# How the SSE Streaming Mechanism Delivers Real-Time Progress Updates in AI Hedge Fund

> Discover how the SSE streaming mechanism delivers real-time progress updates for AI hedge funds using FastAPI StreamingResponse and async generators for efficient event handling.

- Repository: [Virat Singh/ai-hedge-fund](https://github.com/virattt/ai-hedge-fund)
- Tags: internals
- Published: 2026-03-09

---

**The AI Hedge Fund leverages FastAPI's StreamingResponse with an async generator that consumes events from an asyncio.Queue, populated by a singleton AgentProgress hub, to stream live updates via Server-Sent Events without blocking the main execution thread.**

The virattt/ai-hedge-fund repository implements a sophisticated SSE streaming mechanism to provide real-time visibility into long-running financial analysis workflows. By decoupling agent execution from HTTP response handling through an asynchronous queue-based architecture, the system pushes incremental progress updates to clients as AI agents process market data. This approach transforms computationally intensive hedge fund simulations into observable, interactive experiences.

## Core Components of the SSE Streaming Mechanism

### The FastAPI Endpoint and StreamingResponse

The entry point resides in [`app/backend/routes/hedge_fund.py`](https://github.com/virattt/ai-hedge-fund/blob/main/app/backend/routes/hedge_fund.py) (lines 62-98), where an async route handler instantiates a `StreamingResponse` with the MIME type `text/event-stream`. This response wraps an `event_generator` async generator function that continuously yields SSE-formatted strings as they become available from the progress hub.

### The AgentProgress Hub and Async Queue

Located in [`src/utils/progress.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/utils/progress.py) (lines 44-62), the **AgentProgress** singleton acts as the central nervous system for status updates. It maintains an `asyncio.Queue` that decouples agent progress reporting from HTTP I/O operations. Agents invoke `progress.update_status()` to record new states with timestamps, which triggers registered handlers to push `ProgressUpdateEvent` objects onto the queue via `progress_queue.put_nowait(event)`.

### Event Models and SSE Serialization

The system defines strict Pydantic models in [`app/backend/models/events.py`](https://github.com/virattt/ai-hedge-fund/blob/main/app/backend/models/events.py) (lines 10-14) to ensure protocol compliance. The hierarchy includes `BaseEvent`, `StartEvent`, `ProgressUpdateEvent`, `CompleteEvent`, and `ErrorEvent`. Each model implements a `to_sse()` method that renders the object into proper SSE format: `event: <type>\ndata: <json>\n\n`.

## Step-by-Step Execution Flow

1. **Client initiates request** – A POST request to `/hedge-fund/run` or `/backtest` triggers the SSE endpoint.

2. **Generator initialization** – The route creates the `event_generator` and registers a `progress_handler` with the global `progress` object using `progress.register_handler(progress_handler)`.

3. **Background task launch** – The system starts the heavy-weight graph execution in a background coroutine via `run_task = asyncio.create_task(run_graph_async(...))`, preventing blocking of the main thread.

4. **Disconnect detection** – A separate coroutine monitors `await request.receive()` for `http.disconnect` events, enabling graceful cancellation if the client closes the connection.

5. **Initial event transmission** – The generator immediately yields a `StartEvent` using `yield StartEvent().to_sse()` to confirm connection establishment.

6. **Agent progress reporting** – As agents complete analysis steps, they call `progress.update_status()`, which loops through registered handlers and enqueues `ProgressUpdateEvent` instances.

7. **Queue consumption** – The generator loop executes `await asyncio.wait_for(progress_queue.get(), timeout=1.0)` to dequeue events, converting each to SSE format via `event.to_sse()` and yielding to the client.

8. **Completion signaling** – When `run_task` finishes, the generator yields a final `CompleteEvent` (or `ErrorEvent` on failure) before exiting, closing the stream.

## Client Implementation Examples

### JavaScript EventSource Integration

```javascript
// Listen to a hedge-fund run and log progress updates
const es = new EventSource("/hedge-fund/run");

// Event fired when the server sends a "start" event
es.addEventListener("start", (e) => {
  console.log("Run started:", JSON.parse(e.data));
});

// Event fired for each progress update
es.addEventListener("progress", (e) => {
  const p = JSON.parse(e.data);
  console.log(`[${p.agent}] ${p.ticker ?? ""} – ${p.status}`);
  if (p.analysis) console.debug("Analysis:", p.analysis);
});

// Final result
es.addEventListener("complete", (e) => {
  console.log("Run completed:", JSON.parse(e.data));
  es.close();
});

// Errors
es.addEventListener("error", (e) => {
  console.error("Server error:", JSON.parse(e.data));
});

```

### Python httpx Stream Consumer

```python
import httpx
import json

def stream_hedge_fund(url: str, payload: dict):
    with httpx.Client(timeout=None) as client:
        with client.stream("POST", url, json=payload) as response:
            for line in response.iter_lines():
                if not line:
                    continue
                # Each line is either `event: <type>` or `data: <json>`

                # Accumulate until a blank line signals the end of one SSE message

                if line.startswith(b"event:"):
                    event_type = line.split(b":", 1)[1].strip().decode()
                elif line.startswith(b"data:"):
                    data = json.loads(line.split(b":", 1)[1].strip())
                    print(f"[{event_type}] →", data)

# Example usage

stream_hedge_fund(
    "http://localhost:8000/hedge-fund/run",
    {"initial_cash": 100_000, "tickers": ["AAPL"]}
)

```

## Reliability and Performance Optimizations

The SSE streaming mechanism ensures robust delivery through several architectural safeguards. The **asyncio.Queue** decouples agent progress from HTTP I/O, preventing backpressure from slowing down financial computations. The `await request.receive()` pattern detects client disconnections, allowing the server to cancel expensive `run_graph_async` tasks and reclaim resources. The `BaseEvent.to_sse()` implementation guarantees strict adherence to the SSE specification, ensuring compatibility with standard browsers and HTTP clients. Additionally, the `AgentProgress` class provides a rich live table visualization in the console for developers while simultaneously streaming identical data to external clients.

## Summary

- The SSE streaming mechanism in [`app/backend/routes/hedge_fund.py`](https://github.com/virattt/ai-hedge-fund/blob/main/app/backend/routes/hedge_fund.py) combines FastAPI's `StreamingResponse` with async generators to maintain persistent HTTP connections.
- **AgentProgress** in [`src/utils/progress.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/utils/progress.py) functions as a singleton hub, managing state through an `asyncio.Queue` and handler registration system.
- Event models in [`app/backend/models/events.py`](https://github.com/virattt/ai-hedge-fund/blob/main/app/backend/models/events.py) enforce type safety and proper SSE serialization via the `to_sse()` method.
- Automatic disconnect detection through `request.receive()` monitoring prevents resource leaks from abandoned connections.
- The architecture enables real-time observability of long-running AI computations without blocking the main execution thread or compromising throughput.

## Frequently Asked Questions

### What file contains the main SSE endpoint implementation?

The primary SSE endpoint is implemented in [`app/backend/routes/hedge_fund.py`](https://github.com/virattt/ai-hedge-fund/blob/main/app/backend/routes/hedge_fund.py) (lines 62-98), where the `StreamingResponse` wraps an async `event_generator` function that yields formatted Server-Sent Events to the client while managing background task execution.

### How does the system handle client disconnections during streaming?

The mechanism monitors `request.receive()` for `http.disconnect` events in a dedicated background coroutine, allowing the server to detect when a client closes the connection and immediately cancel the long-running graph execution task to prevent unnecessary computational expenditure.

### What event types are supported in the SSE stream?

The system supports five event types defined in [`app/backend/models/events.py`](https://github.com/virattt/ai-hedge-fund/blob/main/app/backend/models/events.py): `StartEvent` for initialization, `ProgressUpdateEvent` for interim status reports, `CompleteEvent` for successful completion, `ErrorEvent` for failure states, and the abstract `BaseEvent` class that provides the `to_sse()` serialization method used by all concrete implementations.

### How do agents communicate progress to the SSE stream?

Agents call `progress.update_status()` on the singleton `AgentProgress` instance, which broadcasts to all registered handlers; the handler registered by the SSE endpoint then constructs a `ProgressUpdateEvent` and pushes it onto the `asyncio.Queue`, where the event generator dequeues and transmits it to the client in real time.