How the SSE Streaming Mechanism Delivers Real-Time Progress Updates in AI Hedge Fund

The AI Hedge Fund streams live updates over Server-Sent Events by returning a FastAPI StreamingResponse whose async generator consumes events from an asyncio.Queue. A singleton AgentProgress hub populates that queue, so progress reaches the client without blocking the event loop.

The virattt/ai-hedge-fund repository uses SSE streaming to give real-time visibility into long-running financial analysis workflows. By decoupling agent execution from HTTP response handling through an asynchronous queue, the system pushes incremental progress updates to clients as AI agents process market data, so a long simulation can be observed step by step instead of only at completion.

Core Components of the SSE Streaming Mechanism

The FastAPI Endpoint and StreamingResponse

The entry point resides in app/backend/routes/hedge_fund.py (lines 62-98), where an async route handler instantiates a StreamingResponse with the MIME type text/event-stream. This response wraps an event_generator async generator function that continuously yields SSE-formatted strings as they become available from the progress hub.

The AgentProgress Hub and Async Queue

Located in src/utils/progress.py (lines 44-62), the AgentProgress singleton acts as the central nervous system for status updates. It maintains an asyncio.Queue that decouples agent progress reporting from HTTP I/O operations. Agents invoke progress.update_status() to record new states with timestamps, which triggers registered handlers to push ProgressUpdateEvent objects onto the queue via progress_queue.put_nowait(event).
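The handler-and-queue pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the repository's code: the ProgressHub class, the handler signature, the unregister_handler method, and the dictionary payload are all assumptions made for the example, as is the choice to create the queue on the consumer side.

```python
import asyncio
from datetime import datetime, timezone
from typing import Callable, Optional

# Hypothetical handler signature: (agent, ticker, status, timestamp) -> None
Handler = Callable[[str, Optional[str], str, str], None]


class ProgressHub:
    """Illustrative stand-in for the AgentProgress singleton."""

    def __init__(self) -> None:
        self._handlers: list[Handler] = []

    def register_handler(self, handler: Handler) -> None:
        self._handlers.append(handler)

    def unregister_handler(self, handler: Handler) -> None:
        # Hypothetical helper so a finished stream stops receiving updates.
        if handler in self._handlers:
            self._handlers.remove(handler)

    def update_status(self, agent: str, ticker: Optional[str], status: str) -> None:
        timestamp = datetime.now(timezone.utc).isoformat()
        # Fan the update out to every registered handler.
        for handler in list(self._handlers):
            handler(agent, ticker, status, timestamp)


progress = ProgressHub()  # module-level instance shared by all agents


async def demo() -> list[dict]:
    progress_queue: asyncio.Queue = asyncio.Queue()

    def progress_handler(agent, ticker, status, timestamp):
        # put_nowait never blocks on an unbounded queue, so the caller
        # returns immediately regardless of how fast the client reads.
        progress_queue.put_nowait(
            {"agent": agent, "ticker": ticker, "status": status, "timestamp": timestamp}
        )

    progress.register_handler(progress_handler)
    try:
        progress.update_status("valuation_agent", "AAPL", "Fetching financials")
        progress.update_status("valuation_agent", "AAPL", "Done")
    finally:
        progress.unregister_handler(progress_handler)

    return [progress_queue.get_nowait() for _ in range(progress_queue.qsize())]


events = asyncio.run(demo())
```

Because the handler only enqueues, the agent that calls update_status() never waits on network I/O; draining the queue is left entirely to the consumer.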

Event Models and SSE Serialization

The system defines strict Pydantic models in app/backend/models/events.py (lines 10-14) to ensure protocol compliance. The hierarchy includes BaseEvent, StartEvent, ProgressUpdateEvent, CompleteEvent, and ErrorEvent. Each model implements a to_sse() method that renders the object into proper SSE format: event: <type>\ndata: <json>\n\n.
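To make the wire format concrete, here is a small sketch of a to_sse() method. It uses standard-library dataclasses as a stand-in for the Pydantic models, and the field names (agent, ticker, status) and the event names "start" and "progress" are assumptions taken from the client examples in this article, not from the models themselves.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class BaseEvent:
    type: str

    def to_sse(self) -> str:
        # One SSE message: an event field, a data field, then a blank line.
        # json.dumps emits a single line, so the payload fits one data field.
        payload = json.dumps(asdict(self))
        return f"event: {self.type}\ndata: {payload}\n\n"


@dataclass
class StartEvent(BaseEvent):
    type: str = "start"


@dataclass
class ProgressUpdateEvent(BaseEvent):
    type: str = "progress"
    agent: str = ""
    ticker: Optional[str] = None
    status: str = ""


frame = ProgressUpdateEvent(agent="risk_manager", ticker="AAPL", status="Done").to_sse()
print(frame, end="")
```

Keeping the formatting in the base class means every subclass is framed identically, and the trailing blank line is what tells the client that one message has ended.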

Step-by-Step Execution Flow

  1. Client initiates request – A POST request to /hedge-fund/run or /backtest triggers the SSE endpoint.

  2. Generator initialization – The route creates the event_generator and registers a progress_handler with the global progress object using progress.register_handler(progress_handler).

  3. Background task launch – The system starts the heavy-weight graph execution as a background task via run_task = asyncio.create_task(run_graph_async(...)), so the generator can keep yielding events while the graph runs.

  4. Disconnect detection – A separate coroutine monitors await request.receive() for http.disconnect events, enabling graceful cancellation if the client closes the connection.

  5. Initial event transmission – The generator immediately yields a StartEvent using yield StartEvent().to_sse() to confirm connection establishment.

  6. Agent progress reporting – As agents complete analysis steps, they call progress.update_status(), which loops through registered handlers and enqueues ProgressUpdateEvent instances.

  7. Queue consumption – The generator loop executes await asyncio.wait_for(progress_queue.get(), timeout=1.0) to dequeue events, converting each to SSE format via event.to_sse() and yielding to the client.

  8. Completion signaling – When run_task finishes, the generator yields a final CompleteEvent (or ErrorEvent on failure) before exiting, closing the stream.
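The steps above can be sketched as a single async generator using only asyncio. This is a simplified illustration under stated assumptions: run_graph_stub is a hypothetical stand-in for run_graph_async that writes straight to the queue, the sse() helper replaces the event models, and the timeout is shortened so the demo finishes quickly. Disconnect handling (step 4) is left out here.

```python
import asyncio
import json
from typing import AsyncIterator


def sse(event_type: str, payload: dict) -> str:
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


async def run_graph_stub(queue: asyncio.Queue) -> dict:
    # Hypothetical stand-in for run_graph_async: reports three steps.
    for status in ("Fetching prices", "Analyzing", "Done"):
        await asyncio.sleep(0.01)
        queue.put_nowait({"agent": "demo_agent", "status": status})
    return {"decisions": {}}


async def event_generator() -> AsyncIterator[str]:
    progress_queue: asyncio.Queue = asyncio.Queue()
    run_task = asyncio.create_task(run_graph_stub(progress_queue))
    try:
        yield sse("start", {})
        # Keep draining until the task is finished AND the queue is empty.
        while not (run_task.done() and progress_queue.empty()):
            try:
                update = await asyncio.wait_for(progress_queue.get(), timeout=0.05)
            except asyncio.TimeoutError:
                continue  # nothing queued yet; re-check whether the task finished
            yield sse("progress", update)
        try:
            result = run_task.result()
        except Exception as exc:
            yield sse("error", {"message": str(exc)})
        else:
            yield sse("complete", result)
    finally:
        # If the consumer stops early, do not leave the task running.
        if not run_task.done():
            run_task.cancel()


async def collect() -> list[str]:
    return [frame async for frame in event_generator()]


frames = asyncio.run(collect())
```

The timeout on the queue read is what lets the loop notice that the task has finished even when no further progress events arrive.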

Client Implementation Examples

JavaScript EventSource Integration

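// Listen to a hedge-fund run and log progress updates.
// Note: EventSource can only issue GET requests without a body, so this
// example assumes the stream is also reachable via GET (for example through
// a proxy). To call the POST route directly, use fetch() and read
// response.body instead.
const es = new EventSource("/hedge-fund/run");

// Event fired when the server sends a "start" event
es.addEventListener("start", (e) => {
  console.log("Run started:", JSON.parse(e.data));
});

// Event fired for each progress update
es.addEventListener("progress", (e) => {
  const p = JSON.parse(e.data);
  console.log(`[${p.agent}] ${p.ticker ?? ""} – ${p.status}`);
  if (p.analysis) console.debug("Analysis:", p.analysis);
});

// Final result
es.addEventListener("complete", (e) => {
  console.log("Run completed:", JSON.parse(e.data));
  es.close();
});

// Errors: a server-sent "error" event carries JSON in e.data, while a
// connection failure fires the same listener with no data.
es.addEventListener("error", (e) => {
  if (e.data) {
    console.error("Server error:", JSON.parse(e.data));
  } else {
    console.error("Connection error");
  }
  es.close(); // stop EventSource from reconnecting automatically
});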

Python httpx Stream Consumer

import httpx
import json

def stream_hedge_fund(url: str, payload: dict):
    with httpx.Client(timeout=None) as client:
        with client.stream("POST", url, json=payload) as response:
            response.raise_for_status()
            # SSE falls back to "message" when no event: line was sent
            event_type = "message"
            # iter_lines() yields str (not bytes), one line at a time
            for line in response.iter_lines():
                if not line:
                    # A blank line ends one SSE message; reset for the next
                    event_type = "message"
                    continue
                # Each non-blank line is either `event: <type>` or `data: <json>`
                if line.startswith("event:"):
                    event_type = line.split(":", 1)[1].strip()
                elif line.startswith("data:"):
                    data = json.loads(line.split(":", 1)[1].strip())
                    print(f"[{event_type}] →", data)

# Example usage (the payload fields are illustrative)
stream_hedge_fund(
    "http://localhost:8000/hedge-fund/run",
    {"initial_cash": 100_000, "tickers": ["AAPL"]}
)
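The consumer above handles one data line per message, which matches the event: <type>\ndata: <json>\n\n framing described earlier. For a client that should also tolerate comments, keep-alives, and payloads split across several data lines, a more general parser might look like the sketch below. The parse_sse function and the sample lines are hypothetical and written for this example; the parsing rules follow the SSE format (a blank line dispatches the message, lines starting with a colon are comments, multiple data lines are joined with newlines).

```python
import json
from typing import Iterable, Iterator, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, object]]:
    """Yield (event_type, decoded_json) for each complete SSE message."""
    event_type = "message"  # default event type when no event: field is sent
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the accumulated message, then reset.
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)


sample = [
    "event: start", "data: {}", "",
    ": keep-alive",
    "event: progress", 'data: {"agent": "demo_agent",', 'data:  "status": "Done"}', "",
]
messages = list(parse_sse(sample))
```

Because parse_sse accepts any iterable of strings, it can be fed response.iter_lines() from httpx directly or a plain list in a unit test.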

Reliability and Performance Optimizations

Several design choices make the SSE stream robust. The asyncio.Queue decouples agent progress from HTTP I/O, so a slow client does not slow down the financial computations. The await request.receive() pattern detects client disconnections, allowing the server to cancel expensive run_graph_async tasks and reclaim resources. Because BaseEvent.to_sse() is the single place where events are formatted, every message uses the same event/data framing that browsers and standard HTTP clients expect. Additionally, the AgentProgress class renders a rich live table in the console for developers while the same updates are streamed to external clients.
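The disconnect-and-cancel safeguard can be illustrated without a web server by racing two tasks. In this sketch, fake_receive is a hypothetical stand-in for the ASGI receive callable behind request.receive(), and long_running_graph stands in for run_graph_async; only the "http.disconnect" message type is taken from the ASGI protocol.

```python
import asyncio
from typing import Awaitable, Callable

Receive = Callable[[], Awaitable[dict]]


async def wait_for_disconnect(receive: Receive) -> None:
    # Loop until the ASGI server reports that the client went away.
    while True:
        message = await receive()
        if message.get("type") == "http.disconnect":
            return


async def long_running_graph() -> str:
    await asyncio.sleep(10)  # stand-in for expensive agent work
    return "finished"


async def run_until_disconnect(receive: Receive) -> str:
    run_task = asyncio.create_task(long_running_graph())
    disconnect_task = asyncio.create_task(wait_for_disconnect(receive))
    done, _ = await asyncio.wait(
        {run_task, disconnect_task}, return_when=asyncio.FIRST_COMPLETED
    )
    if disconnect_task in done and not run_task.done():
        # Client left first: cancel the work and wait for it to unwind.
        run_task.cancel()
        await asyncio.gather(run_task, return_exceptions=True)
        return "cancelled"
    # Work finished first: stop watching for a disconnect.
    disconnect_task.cancel()
    await asyncio.gather(disconnect_task, return_exceptions=True)
    return run_task.result()


async def fake_receive() -> dict:
    # Hypothetical client that disconnects after 10 ms.
    await asyncio.sleep(0.01)
    return {"type": "http.disconnect"}


outcome = asyncio.run(run_until_disconnect(fake_receive))
```

Awaiting the cancelled task before returning gives it a chance to run its cleanup code, which matters when the work holds resources such as open connections.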

Summary

  • The SSE streaming mechanism in app/backend/routes/hedge_fund.py combines FastAPI's StreamingResponse with async generators to maintain persistent HTTP connections.
  • AgentProgress in src/utils/progress.py functions as a singleton hub, managing state through an asyncio.Queue and handler registration system.
  • Event models in app/backend/models/events.py enforce type safety and proper SSE serialization via the to_sse() method.
  • Automatic disconnect detection through request.receive() monitoring prevents resource leaks from abandoned connections.
  • The architecture enables real-time observability of long-running AI computations without blocking the event loop.

Frequently Asked Questions

What file contains the main SSE endpoint implementation?

The primary SSE endpoint is implemented in app/backend/routes/hedge_fund.py (lines 62-98), where the StreamingResponse wraps an async event_generator function that yields formatted Server-Sent Events to the client while managing background task execution.

How does the system handle client disconnections during streaming?

The mechanism monitors request.receive() for http.disconnect events in a dedicated background coroutine, allowing the server to detect when a client closes the connection and immediately cancel the long-running graph execution task to prevent unnecessary computational expenditure.

What event types are supported in the SSE stream?

The system supports four concrete event types defined in app/backend/models/events.py: StartEvent for initialization, ProgressUpdateEvent for interim status reports, CompleteEvent for successful completion, and ErrorEvent for failure states. All four inherit from BaseEvent, which is not sent on its own but provides the to_sse() serialization method they share.

How do agents communicate progress to the SSE stream?

Agents call progress.update_status() on the singleton AgentProgress instance, which broadcasts to all registered handlers; the handler registered by the SSE endpoint then constructs a ProgressUpdateEvent and pushes it onto the asyncio.Queue, where the event generator dequeues and transmits it to the client in real time.
