How to Integrate Async Inference with Supervision Detection Tools
You can run asynchronous inference in Supervision by wrapping async model calls inside the synchronous callback function of process_video, which runs in a threaded pipeline that separates frame I/O from inference processing.
Supervision (roboflow/supervision) separates video handling, model inference, and post-processing into independent, interchangeable components. By plugging async-compatible inference calls into the library’s callback-based architecture, you achieve full pipeline parallelism without modifying internal source code.
Supervision’s Video Pipeline Architecture
The library implements a producer-consumer pattern through three core utilities in src/supervision/utils/video.py:
- VideoInfo (line 21): Stores resolution, FPS, and frame count metadata required for both reading and writing video streams.
- get_video_frames_generator (line 59): A lazy generator yielding raw np.uint8 frames, keeping the reading side synchronous so you can drive it from any async loop.
- process_video (line 22): Orchestrates a three-stage threaded pipeline: a reader thread fills a bounded queue, the main thread executes your inference callback, and a writer thread persists results. This design removes the need for explicit locking while maintaining high throughput.
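To make the reader, processor, and writer split concrete, here is a minimal standard-library sketch of a three-stage pipeline with two bounded queues. It is an illustration of the pattern only, not Supervision's actual implementation; `run_pipeline`, `_SENTINEL`, and the list standing in for a video writer are hypothetical names chosen for this example.

```python
import queue
import threading

_SENTINEL = object()  # marks the end of the stream


def run_pipeline(frames, callback, prefetch=4, writer_buffer=4):
    """Read -> process -> write using two bounded queues (illustrative only)."""
    read_q = queue.Queue(maxsize=prefetch)
    write_q = queue.Queue(maxsize=writer_buffer)
    written = []  # stand-in for a video file on disk

    def reader():
        for item in frames:
            read_q.put(item)  # blocks while the prefetch buffer is full
        read_q.put(_SENTINEL)

    def writer():
        while True:
            item = write_q.get()
            if item is _SENTINEL:
                break
            written.append(item)

    threads = [threading.Thread(target=reader), threading.Thread(target=writer)]
    for t in threads:
        t.start()

    # The calling thread plays the role of the processor stage.
    index = 0
    while True:
        frame = read_q.get()
        if frame is _SENTINEL:
            break
        write_q.put(callback(frame, index))
        index += 1
    write_q.put(_SENTINEL)

    for t in threads:
        t.join()
    return written


result = run_pipeline(range(5), lambda frame, idx: frame * 10 + idx)
```

Because each queue has a single producer and a single consumer, the stages need no explicit locks, and output order matches input order.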
The callback parameter in process_video expects a callable with the signature defined at line 14: Callable[[NDArray, int], NDArray], accepting a frame and index, and returning a processed frame.
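As a quick illustration of that contract, the sketch below defines a callback that takes a frame and its index and returns a frame of the same shape and dtype. The pixel inversion is a placeholder for real inference and annotation, and `invert_callback` is a hypothetical name.

```python
import numpy as np


def invert_callback(frame: np.ndarray, index: int) -> np.ndarray:
    """Return a processed frame with the same shape and dtype as the input."""
    # Placeholder processing: invert pixel intensities instead of running a model.
    return 255 - frame


# A small black test frame in height x width x channels layout.
frame = np.zeros((4, 6, 3), dtype=np.uint8)
processed = invert_callback(frame, 0)
```

Any function with this shape can be passed as the callback, whatever it does internally.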
The Async Integration Pattern
Supervision already provides the concurrency primitives; you only need to bridge sync and async contexts. The pattern works as follows:
- Use sv.get_video_frames_generator to create a synchronous frame source.
- Wrap your async inference engine (HTTP API, async SDK, or GIL-releasing model) in a synchronous wrapper using asyncio.run or loop.run_until_complete.
- Convert raw inference outputs to sv.Detections using Detections.from_inference (line 613 of src/supervision/detection/core.py), ensuring uniform output regardless of the underlying model.
- Return the annotated frame to the pipeline.
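By calling asyncio.run inside the callback, you spin up a temporary event loop for each frame. The callback still handles one frame at a time, so throughput is bounded by per-frame inference latency, but the surrounding reader and writer threads keep decoding and encoding in the background, so video I/O time overlaps with network or inference waits rather than adding to them.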
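The bridge itself can be sketched without any video or model dependencies. In the example below, `fake_async_infer` is a hypothetical stand-in for an async client call, and frames are plain lists so the snippet runs anywhere.

```python
import asyncio


async def fake_async_infer(frame):
    """Hypothetical stand-in for an async model or HTTP client call."""
    await asyncio.sleep(0.01)  # simulates network or inference latency
    return {"frame_sum": sum(frame)}


def sync_callback(frame, index):
    """Synchronous callback that blocks until the coroutine finishes."""
    # asyncio.run creates a fresh event loop, runs the coroutine, and closes the loop.
    result = asyncio.run(fake_async_infer(frame))
    return index, result["frame_sum"]


outputs = [sync_callback(frame, i) for i, frame in enumerate([[1, 2], [3, 4]])]
```

Note that asyncio.run raises a RuntimeError if the calling thread already has a running event loop, so this bridge only works from plain synchronous code.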
Implementation Examples
Threaded Pipeline with a Blocking Model
For CPU-bound or GPU-bound models that release the GIL, run the model synchronously inside the callback while process_video handles I/O concurrency:
    import numpy as np
    import supervision as sv
    from ultralytics import YOLO

    model = YOLO("yolov8n.pt")
    box_annotator = sv.BoxAnnotator()


    def sync_callback(frame: np.ndarray, idx: int) -> np.ndarray:
        # Model runs synchronously; I/O threads handle frame reading/writing
        results = model.predict(frame, conf=0.4, device=0)[0]
        # Ultralytics results go through the dedicated from_ultralytics converter
        detections = sv.Detections.from_ultralytics(results)
        # Detections has no draw method; an annotator renders onto a copy of the frame
        return box_annotator.annotate(scene=frame.copy(), detections=detections)


    sv.process_video(
        source_path="input.mp4",
        target_path="output.mp4",
        callback=sync_callback,
        show_progress=True,
    )
This leverages the three-stage pipeline (reader → processor → writer) defined at lines 22–70 in src/supervision/utils/video.py.
Asyncio Inference with HTTP Services
For I/O-bound inference (e.g., REST API calls), wrap the async client in a sync callback:
    import asyncio

    import aiohttp
    import cv2
    import numpy as np
    import supervision as sv

    box_annotator = sv.BoxAnnotator()


    async def async_infer(frame: np.ndarray) -> dict:
        # Encode the frame as JPEG before opening the connection
        ok, jpeg = cv2.imencode(".jpg", frame)
        if not ok:
            raise ValueError("JPEG encoding failed")
        # A new session per frame keeps the example short but forgoes connection reuse
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.example.com/v1/predict",
                data=jpeg.tobytes(),
                headers={"Content-Type": "image/jpeg"},
            ) as resp:
                resp.raise_for_status()
                return await resp.json()


    def async_wrapper(frame: np.ndarray, idx: int) -> np.ndarray:
        # Bridge async to sync for the callback contract
        inference_result = asyncio.run(async_infer(frame))
        # Assumes the service responds in the Roboflow inference schema
        detections = sv.Detections.from_inference(inference_result)
        detections = detections[detections.confidence > 0.5]
        return box_annotator.annotate(scene=frame.copy(), detections=detections)


    sv.process_video(
        source_path="input.mp4",
        target_path="output.mp4",
        callback=async_wrapper,
        prefetch=64,  # Hide network latency
        writer_buffer=64,
        show_progress=True,
    )
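The prefetch parameter increases the buffer size in the reader thread, so decoded frames are already waiting when the callback returns from a network request; writer_buffer plays the same role for frames waiting to be written.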
Direct Generator Control for Streaming
For real-time streams (webcam or RTSP) where you need full control, bypass process_video and drive the loop manually:
    import asyncio

    import cv2
    import supervision as sv

    box_annotator = sv.BoxAnnotator()


    async def async_infer(frame):
        # Your async inference logic here
        ...


    async def main():
        cap = cv2.VideoCapture(0)  # Webcam
        fps_monitor = sv.FPSMonitor()
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Async inference directly awaited
                detections = sv.Detections.from_inference(await async_infer(frame))
                annotated = box_annotator.annotate(scene=frame.copy(), detections=detections)
                cv2.imshow("Live", annotated)
                fps_monitor.tick()
                if cv2.waitKey(1) == 27:  # ESC
                    break
        finally:
            # Release the camera and close the preview window on exit
            cap.release()
            cv2.destroyAllWindows()
        print(f"Average FPS: {fps_monitor.fps:.2f}")


    asyncio.run(main())
FPSMonitor (lines 71–110 in src/supervision/utils/video.py) reports the average frames per second across its recent tick() calls, which gives a quick end-to-end throughput figure for your async setup.
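The loop above awaits each inference before reading the next frame, and the blocking cap.read() call holds the event loop while it waits. One possible refinement, sketched here with hypothetical stand-ins (`blocking_read` for cap.read() and `fake_async_infer` for the model call), is to move the blocking read to a worker thread with asyncio.to_thread (Python 3.9+) so that reading frame N+1 overlaps with inference on frame N.

```python
import asyncio
import time


def blocking_read(source):
    """Hypothetical stand-in for cap.read(); returns None when the stream ends."""
    time.sleep(0.005)  # simulates camera or decode latency
    return next(source, None)


async def fake_async_infer(frame):
    """Hypothetical stand-in for an async inference call."""
    await asyncio.sleep(0.01)
    return frame * 2


async def stream(frames):
    source = iter(frames)
    results = []
    pending = None  # inference task for the previous frame
    while True:
        # Run the blocking read in a worker thread so the event loop stays free
        # to make progress on the pending inference task.
        frame = await asyncio.to_thread(blocking_read, source)
        if pending is not None:
            results.append(await pending)
        if frame is None:
            break
        pending = asyncio.create_task(fake_async_infer(frame))
    return results


results = asyncio.run(stream([1, 2, 3]))
```

Results still arrive in frame order because at most one inference task is in flight at a time.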
Key Source Files
Understanding these files helps when customizing the integration:
- src/supervision/utils/video.py: Contains process_video, get_video_frames_generator, VideoInfo, and FPSMonitor. This is the core infrastructure for threaded video processing.
- src/supervision/detection/core.py: Implements Detections.from_inference (line 613), the canonical factory method that converts raw inference dictionaries into Supervision’s unified Detections class.
- src/supervision/detection/tools/inference_slicer.py: Demonstrates how to swap custom inference callbacks for tiled processing, useful when running async inference on large image patches.
The public API exposed in src/supervision/__init__.py allows you to import these components as sv.process_video, sv.Detections, etc.
Summary
- Supervision decouples video I/O, inference, and annotation into separate stages via the process_video threaded pipeline.
- Async inference integrates by wrapping coroutines in synchronous callbacks; asyncio.run blocks only the callback for the current frame while the reader and writer threads keep running.
- Detections.from_inference (line 613 of core.py) converts Roboflow-style inference results into a standard sv.Detections object; other frameworks have dedicated factories such as Detections.from_ultralytics.
- VideoInfo and get_video_frames_generator provide the metadata and frame sources needed to drive custom async loops when bypassing process_video.
- Prefetch buffers in process_video (set via the prefetch and writer_buffer parameters) keep frame I/O from stalling while the callback waits on slow inference or network calls.
Frequently Asked Questions
How does Supervision handle thread safety when I use async inference in the callback?
Supervision’s process_video function implements a bounded queue between the reader thread and the main processor thread, and another between the processor and writer thread. Your callback runs in the main thread, so calling asyncio.run or other blocking operations inside it is safe; the reader and writer threads continue operating concurrently without explicit locks (see lines 22–70 in src/supervision/utils/video.py).
Can I use async/await syntax directly in the callback instead of asyncio.run?
No. The callback signature required by process_video is Callable[[NDArray, int], NDArray], which is synchronous. You must bridge async code using asyncio.run or loop.run_until_complete, or by running an event loop in a separate thread and submitting coroutines to it from the synchronous callback with asyncio.run_coroutine_threadsafe. Avoid sharing an asyncio.Queue across threads for this purpose, because asyncio.Queue is not thread-safe.
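A minimal sketch of the separate-thread option follows. It keeps one long-lived event loop in a daemon thread and hands coroutines to it with asyncio.run_coroutine_threadsafe; `fake_async_infer` is a hypothetical stand-in for a real async client, and frames are strings only to keep the snippet dependency-free.

```python
import asyncio
import threading

# A single event loop running for the lifetime of the pipeline in a daemon thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def fake_async_infer(frame):
    """Hypothetical stand-in for an async inference call."""
    await asyncio.sleep(0.01)
    return len(frame)


def sync_callback(frame, index):
    # Schedule the coroutine on the background loop from this synchronous thread.
    future = asyncio.run_coroutine_threadsafe(fake_async_infer(frame), loop)
    # future.result blocks the caller until the coroutine completes or times out.
    return index, future.result(timeout=5)


outputs = [sync_callback(frame, i) for i, frame in enumerate(["ab", "cde"])]

# Ask the background loop to stop once processing is finished.
loop.call_soon_threadsafe(loop.stop)
```

Compared with calling asyncio.run per frame, this avoids creating and closing a loop each time and lets long-lived resources such as an HTTP session stay bound to one loop.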
What is the performance impact of calling asyncio.run for every frame?
While asyncio.run creates and closes an event loop per call, the overhead is negligible compared to model inference or network latency. For maximum efficiency with high-frequency frames, consider using process_video with large prefetch and writer_buffer values (e.g., 64–128) to keep the pipeline full while the callback handles async I/O.
Does Detections.from_inference support custom model outputs or only specific formats?
Detections.from_inference (line 613 of src/supervision/detection/core.py) accepts raw dictionaries following the standard inference schema or objects from the Roboflow Inference SDK. As long as your async model returns bounding boxes, confidences, and class IDs in the expected format, the factory method will create a valid sv.Detections object compatible with all Supervision annotators and metrics.
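For a custom service that returns a similar but not identical payload, it can help to see what the conversion involves. The sketch below assumes a Roboflow-style response in which each prediction carries a box center (x, y), width, height, confidence, and class_id; that schema is an assumption for illustration, and `to_arrays` is a hypothetical helper rather than part of Supervision.

```python
import numpy as np

# Assumed response shape: box centers plus width/height, one dict per prediction.
response = {
    "predictions": [
        {"x": 50.0, "y": 40.0, "width": 20.0, "height": 10.0,
         "confidence": 0.9, "class_id": 0, "class": "person"},
        {"x": 10.0, "y": 10.0, "width": 4.0, "height": 6.0,
         "confidence": 0.3, "class_id": 2, "class": "car"},
    ]
}


def to_arrays(response, min_confidence=0.5):
    """Convert center-format predictions to corner-format (xyxy) arrays."""
    preds = [p for p in response["predictions"] if p["confidence"] >= min_confidence]
    xyxy = np.array(
        [[p["x"] - p["width"] / 2, p["y"] - p["height"] / 2,
          p["x"] + p["width"] / 2, p["y"] + p["height"] / 2] for p in preds],
        dtype=np.float32,
    ).reshape(-1, 4)  # keeps shape (0, 4) when nothing passes the threshold
    confidence = np.array([p["confidence"] for p in preds], dtype=np.float32)
    class_id = np.array([p["class_id"] for p in preds], dtype=int)
    return xyxy, confidence, class_id


xyxy, confidence, class_id = to_arrays(response)
```

Arrays in this layout can be passed to the sv.Detections constructor through its xyxy, confidence, and class_id arguments when a response does not match what from_inference expects.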