# How to Build a Real-Time Video Processing Pipeline with FPS Monitoring in Roboflow Supervision

> Build a real-time video processing pipeline with FPS monitoring using Roboflow Supervision. Learn to orchestrate frame I/O and benchmark performance with `sv.process_video` and `sv.FPSMonitor`.

- Repository: [Roboflow/supervision](https://github.com/roboflow/supervision)
- Tags: tutorial
- Published: 2026-04-06

---

**Use `sv.process_video()` to orchestrate multi-threaded frame I/O while `sv.FPSMonitor` benchmarks performance via a moving-average tick counter.**

Roboflow Supervision provides a production-ready framework for real-time video processing that decouples I/O from computation using background threads. The pipeline centers on `process_video` in [`src/supervision/utils/video.py`](https://github.com/roboflow/supervision/blob/main/src/supervision/utils/video.py), which spreads frame reading, user-defined processing, and video writing across three concurrent threads, while the `FPSMonitor` class tracks throughput with minimal overhead.

## Core Architecture Components

The pipeline consists of five coordinated primitives designed to maintain throughput on high-resolution or high-frame-rate sources. Understanding how `VideoInfo`, frame generators, and the threaded orchestration interact ensures you can customize the flow without dropping frames.

### Video Source and VideoInfo Extraction

Before processing, the framework inspects the source to configure decoders and sinks. `VideoInfo.from_video_path` opens the video with OpenCV, extracts width, height, FPS, and total frame count, and validates that the source is readable.

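```python
import supervision as sv

video_info = sv.VideoInfo.from_video_path("input.mp4")
print(video_info.width, video_info.height, video_info.fps, video_info.total_frames)
```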

*(source: [`VideoInfo` class](https://github.com/roboflow/supervision/blob/develop/src/supervision/utils/video.py#L21-L52))*

For iteration, `get_video_frames_generator` yields each frame as a BGR `numpy.ndarray`. It supports frame skipping via the `stride` parameter and robust seeking for sources that misbehave with direct `set` calls.

*(source: [`get_video_frames_generator`](https://github.com/roboflow/supervision/blob/develop/src/supervision/utils/video.py#L59-L92))*
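
As a rough illustration of what `stride` does, the sketch below computes which source-frame indices would be kept, assuming the generator yields every `stride`-th frame between `start` and `end`. `sampled_indices` and `count_sampled_frames` are hypothetical helpers written for this article, not part of Supervision; only `sv.get_video_frames_generator` and its `source_path` and `stride` parameters come from the library.

```python
from typing import List, Optional


def sampled_indices(
    total_frames: int,
    stride: int = 1,
    start: int = 0,
    end: Optional[int] = None,
) -> List[int]:
    """Source-frame indices kept when reading every `stride`-th frame."""
    if stride < 1:
        raise ValueError("stride must be >= 1")
    stop = total_frames if end is None else min(end, total_frames)
    return list(range(max(start, 0), stop, stride))


def count_sampled_frames(path: str, stride: int) -> int:
    """Count the frames actually yielded for a real file (requires supervision)."""
    import supervision as sv  # imported lazily so the helper above runs without it

    return sum(1 for _ in sv.get_video_frames_generator(source_path=path, stride=stride))
```

For a 10-frame clip, `sampled_indices(10, stride=3)` keeps frames 0, 3, 6, and 9, so a callback would run on four frames instead of ten.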

### Threaded Processing with `process_video`

The `process_video` function implements a **three-stage producer-consumer pattern** that prevents I/O blocking from stalling inference:

1. **Reader thread** — Pulls frames from the generator and pushes `(index, frame)` tuples into `frame_read_queue` (bounded by `prefetch` size).
2. **Main thread** — Dequeues frames, executes your callback (e.g., object detection), and pushes processed frames into `frame_write_queue` (bounded by `writer_buffer` size).
3. **Writer thread** — Consumes from the write queue and persists frames via the `VideoSink` context manager.

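This design keeps the heavy compute on the main thread, where most ML libraries expect to run, and moves OpenCV I/O to background threads. OpenCV's Python bindings generally release the GIL during decode and encode calls, so that I/O can overlap with processing instead of competing with it.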

*(source: [`process_video` implementation](https://github.com/roboflow/supervision/blob/develop/src/supervision/utils/video.py#L111-L170))*
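
The three stages above can be sketched with the standard library alone. This is a minimal illustration of the pattern, not the library's implementation: `run_pipeline` is a hypothetical name, a plain list stands in for the video file, and error handling is omitted. The `prefetch` and `writer_buffer` arguments mirror the queue bounds described above.

```python
import queue
import threading
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
_STOP = object()  # sentinel marking the end of the stream


def run_pipeline(
    frames: Iterable[T],
    callback: Callable[[T, int], T],
    prefetch: int = 4,
    writer_buffer: int = 4,
) -> List[T]:
    """Read, process, and write frames using two background threads."""
    read_q: queue.Queue = queue.Queue(maxsize=prefetch)
    write_q: queue.Queue = queue.Queue(maxsize=writer_buffer)
    written: List[T] = []  # stands in for the output video

    def reader() -> None:
        for index, frame in enumerate(frames):
            read_q.put((index, frame))  # blocks while the queue is full
        read_q.put(_STOP)

    def writer() -> None:
        while True:
            item = write_q.get()
            if item is _STOP:
                break
            written.append(item)

    threads = [
        threading.Thread(target=reader, daemon=True),
        threading.Thread(target=writer, daemon=True),
    ]
    for thread in threads:
        thread.start()

    # The calling thread runs the callback, as in the description above.
    while True:
        item = read_q.get()
        if item is _STOP:
            break
        index, frame = item
        write_q.put(callback(frame, index))
    write_q.put(_STOP)

    for thread in threads:
        thread.join()
    return written
```

Because both queues are bounded, a slow callback eventually makes `read_q.put` block, which throttles the reader rather than letting decoded frames pile up in memory.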

### FPS Monitoring

`FPSMonitor` maintains a **deque of timestamps** (default length 30) to compute a rolling average frame rate. Instantiate it before the pipeline starts, call `tick()` after each frame finishes processing, and read the `fps` property for the current moving average.

*(source: [`FPSMonitor` class](https://github.com/roboflow/supervision/blob/develop/src/supervision/utils/video.py#L71-L122))*
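
To make the rolling-average idea concrete, here is a minimal sketch of a timestamp-deque counter. `RollingFps` is a hypothetical class written for illustration; the library's exact formula and clock may differ, and the injectable `now` argument exists only to make the example deterministic.

```python
import time
from collections import deque
from typing import Optional


class RollingFps:
    """Frame rate averaged over the most recent `sample_size` ticks."""

    def __init__(self, sample_size: int = 30) -> None:
        # A bounded deque drops the oldest timestamp automatically.
        self._timestamps: deque = deque(maxlen=sample_size)

    def tick(self, now: Optional[float] = None) -> None:
        # `now` is injectable for testing; defaults to a monotonic clock.
        self._timestamps.append(time.monotonic() if now is None else now)

    @property
    def fps(self) -> float:
        if len(self._timestamps) < 2:
            return 0.0
        elapsed = self._timestamps[-1] - self._timestamps[0]
        # n timestamps span n - 1 frame intervals.
        return (len(self._timestamps) - 1) / elapsed if elapsed > 0 else 0.0
```

Ticking this counter every 0.1 seconds yields a reading of about 10 FPS once at least two ticks have been recorded, and older ticks stop influencing the result after they fall out of the window.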

## Complete Implementation Example

Below is a self-contained script that reads a video file, draws the frame index on each frame, writes the result to disk, and prints the moving-average FPS over the last frames processed.

```python
import time  # used by the FPS logger later in this tutorial

import cv2
import numpy as np
import supervision as sv

# 1. Initialize metadata and performance counter
video_info = sv.VideoInfo.from_video_path("input.mp4")
fps_monitor = sv.FPSMonitor(sample_size=30)


# 2. Define per-frame processing logic
def process_frame(frame: np.ndarray, idx: int) -> np.ndarray:
    # Draw the zero-based frame index in the top-left corner
    cv2.putText(
        frame,
        f"Frame: {idx}",
        (10, 30),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,
        (0, 255, 0),
        2,
    )
    fps_monitor.tick()  # Record a timestamp once this frame is done
    return frame


# 3. Run threaded pipeline
sv.process_video(
    source_path="input.mp4",
    target_path="output.mp4",
    callback=process_frame,
    show_progress=True,
)

# 4. Report the moving average over the most recent ticks
print(f"FPS over last 30 frames: {fps_monitor.fps:.2f}")
```

**Key implementation details:**

- The callback receives the zero-based frame index and must return the processed `numpy` array.
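- `fps_monitor.tick()` is called once per frame at the end of the callback, so `fps` reflects the rate at which the main thread completes frames. The interval between ticks also includes any time the main thread spends waiting on the read or write queue, not only the callback's compute time.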
- `VideoSink` (used internally by `process_video`) handles codec selection and graceful resource cleanup automatically.
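
If you also want the time spent inside the callback alone, one option is to wrap it with a timer. The sketch below is an assumption-labeled helper, not a Supervision API: `timed` is a hypothetical name, and it works with any callable that takes `(frame, index)`.

```python
import time
from typing import Callable, List, TypeVar

F = TypeVar("F")


def timed(
    callback: Callable[[F, int], F],
    durations: List[float],
) -> Callable[[F, int], F]:
    """Wrap a per-frame callback and record each call's duration in seconds."""

    def wrapper(frame: F, index: int) -> F:
        start = time.perf_counter()
        result = callback(frame, index)
        durations.append(time.perf_counter() - start)
        return result

    return wrapper
```

Passing `callback=timed(process_frame, durations)` to the pipeline fills `durations` with per-frame compute times. Comparing their mean with `1 / fps_monitor.fps` gives a rough sense of how much of each frame interval is spent outside the callback.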

## Handling Real-Time Streams (Webcam and RTSP)

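For live sources such as webcams or RTSP feeds, pass the device index or URL string to `source_path`. The parameter is documented as a path to a video file, and live captures often report no total frame count, so confirm that your installed version accepts these sources before relying on this. When it does, the generator reads until the stream ends or you terminate the process.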

```python
sv.process_video(
    source_path=0,  # 0 for default webcam; use "rtsp://..." for IP cameras
    target_path="webcam_capture.mp4",
    callback=process_frame,
    max_frames=500,  # Optional safety limit for testing
    show_progress=False,
)
```

When `max_frames` is omitted, the pipeline runs until `KeyboardInterrupt` or stream termination.

## Continuous FPS Reporting

For long-running deployments, log FPS periodically rather than waiting for completion. Spawn a daemon thread that reads `fps_monitor.fps` at fixed intervals:

```python
import threading
import time


def log_fps(monitor: sv.FPSMonitor, interval: float = 1.0) -> None:
    # Runs forever; the daemon flag lets the process exit without joining it
    while True:
        time.sleep(interval)
        print(f"[FPS] Current: {monitor.fps:.2f}")


# Start background reporter
threading.Thread(
    target=log_fps,
    args=(fps_monitor,),
    daemon=True,
).start()

sv.process_video(
    source_path=0,
    target_path="live_output.mp4",
    callback=process_frame,
)
```

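`FPSMonitor` stores its timestamps in a `collections.deque`, whose appends are atomic in CPython, so occasional reads from the reporting thread should not corrupt the moving-average calculation. The class does not advertise a thread-safety guarantee, however, so treat a reading taken mid-update as approximate.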
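
For deployments where the reporter should stop cleanly once processing ends, a `threading.Event` can replace the infinite loop. The sketch below is one possible shape under that assumption; `start_reporter` is a hypothetical helper, and it takes a zero-argument `read_fps` callable so it does not depend on any particular monitor class.

```python
import threading
from typing import Callable


def start_reporter(
    read_fps: Callable[[], float],
    interval: float,
    sink: Callable[[float], None],
) -> threading.Event:
    """Call `sink(read_fps())` every `interval` seconds until the event is set."""
    stop = threading.Event()

    def loop() -> None:
        # Event.wait returns False on timeout and True once `stop` is set.
        while not stop.wait(interval):
            sink(read_fps())

    threading.Thread(target=loop, daemon=True).start()
    return stop
```

A typical call would be `stop = start_reporter(lambda: fps_monitor.fps, 1.0, print)` before starting the pipeline, followed by `stop.set()` after `sv.process_video(...)` returns.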

## Summary

- **`sv.VideoInfo.from_video_path`**, defined in [`src/supervision/utils/video.py`](https://github.com/roboflow/supervision/blob/main/src/supervision/utils/video.py), extracts width, height, FPS, and frame count from the source so sinks can be configured automatically.
- **`sv.process_video`** orchestrates three threads (reader, processor, writer) with bounded queues to decode, process, and encode frames without blocking the main inference thread.
- **`sv.FPSMonitor`** provides lightweight, moving-average throughput tracking via the `tick()` method and `fps` property.
- The pipeline supports file-based, webcam, and RTSP inputs interchangeably via the same `source_path` interface.
- For real-time dashboards, access `fps_monitor.fps` from a secondary thread to stream metrics while processing continues.

## Frequently Asked Questions

### How do I install the supervision library to access these video utilities?

Install via pip: `pip install supervision`. The video pipeline requires OpenCV as a dependency, which is installed automatically. Ensure you have version **0.18.0** or later to access the threaded `process_video` API and `FPSMonitor` class.

### Can I use `FPSMonitor` outside of `process_video`?

Yes. `FPSMonitor` is decoupled from the video pipeline. You can instantiate it in any loop where you need throughput metrics—such as benchmarking batch inference on static images—by calling `tick()` once per iteration and reading the `fps` property.
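
A standalone benchmark loop might look like the sketch below. `benchmark` is a hypothetical helper that accepts any object exposing `tick()` and an `fps` attribute, which is the interface described above; the function being timed is a placeholder for your own inference call.

```python
from typing import Any, Callable, Iterable


def benchmark(fn: Callable[[Any], Any], items: Iterable[Any], monitor: Any) -> float:
    """Run `fn` over `items`, tick `monitor` once per item, and return its FPS reading."""
    for item in items:
        fn(item)
        monitor.tick()
    return monitor.fps
```

With the library installed, `benchmark(run_model, images, sv.FPSMonitor(sample_size=30))` would report throughput over the last 30 images, where `run_model` and `images` are stand-ins for your own model call and data.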

### Why is my FPS lower than the source video's native frame rate?

`FPSMonitor` measures **processing throughput**, meaning how many frames per second your pipeline completes, not the playback rate stored in the file. If your callback runs a heavy model (e.g., YOLOv8), the bottleneck is usually compute time rather than I/O. To improve throughput, use a GPU-accelerated model, reduce input resolution, or, if you iterate frames yourself with `get_video_frames_generator`, increase `stride` to skip frames.

### Is the `process_video` callback thread-safe?

The callback executes on the **main thread**, so you do not need thread-safe models inside it. However, avoid blocking operations (like synchronous HTTP requests) inside the callback, as they stall the entire pipeline and can cause the bounded queues to fill, triggering back-pressure on the reader thread.