How the SSE Streaming Mechanism Delivers Real-Time Progress Updates in AI Hedge Fund
AI Hedge Fund streams live updates over Server-Sent Events (SSE) by pairing FastAPI's StreamingResponse with an async generator. The generator consumes events from an asyncio.Queue that is populated through the singleton AgentProgress hub, so the analysis runs without blocking the HTTP response.
The virattt/ai-hedge-fund repository uses this SSE mechanism to provide real-time visibility into long-running financial analysis workflows. By decoupling agent execution from HTTP response handling through an asynchronous queue, the system pushes incremental progress updates to clients as AI agents process market data. Clients can therefore observe a computationally intensive hedge fund simulation step by step instead of waiting for a single final response.
Core Components of the SSE Streaming Mechanism
The FastAPI Endpoint and StreamingResponse
The entry point resides in app/backend/routes/hedge_fund.py (lines 62-98), where an async route handler instantiates a StreamingResponse with the MIME type text/event-stream. This response wraps an event_generator async generator function that continuously yields SSE-formatted strings as they become available from the progress hub.
The AgentProgress Hub and Async Queue
Located in src/utils/progress.py (lines 44-62), the AgentProgress singleton acts as the central nervous system for status updates. It maintains an asyncio.Queue that decouples agent progress reporting from HTTP I/O operations. Agents invoke progress.update_status() to record new states with timestamps, which triggers registered handlers to push ProgressUpdateEvent objects onto the queue via progress_queue.put_nowait(event).
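The handler-and-queue relationship described above can be sketched with the standard library alone. This is a minimal illustration, not the repository's code: the class name ProgressHub, the handler signature, and the dictionary payload are assumptions made for the example, and only the method names register_handler, update_status, and put_nowait come from the description above.

```python
import asyncio
from datetime import datetime, timezone
from typing import Callable, Optional

# Hypothetical handler signature: (agent, ticker, status, timestamp)
Handler = Callable[[str, Optional[str], str, str], None]


class ProgressHub:
    """Illustrative stand-in for the AgentProgress singleton."""

    def __init__(self) -> None:
        self._handlers: list = []

    def register_handler(self, handler: Handler) -> None:
        self._handlers.append(handler)

    def unregister_handler(self, handler: Handler) -> None:
        if handler in self._handlers:
            self._handlers.remove(handler)

    def update_status(self, agent: str, ticker: Optional[str], status: str) -> None:
        timestamp = datetime.now(timezone.utc).isoformat()
        # Notify every registered handler synchronously, in registration order.
        for handler in list(self._handlers):
            handler(agent, ticker, status, timestamp)


async def demo_hub() -> list:
    hub = ProgressHub()
    # Unbounded queue, so put_nowait() never raises QueueFull.
    progress_queue: asyncio.Queue = asyncio.Queue()

    def progress_handler(agent, ticker, status, timestamp):
        # The handler only enqueues; it never touches the HTTP response.
        progress_queue.put_nowait(
            {"agent": agent, "ticker": ticker, "status": status, "timestamp": timestamp}
        )

    hub.register_handler(progress_handler)
    hub.update_status("valuation_agent", "AAPL", "Fetching financials")
    hub.update_status("valuation_agent", "AAPL", "Done")
    hub.unregister_handler(progress_handler)
    hub.update_status("valuation_agent", "AAPL", "Not delivered")  # no handler left

    drained = []
    while not progress_queue.empty():
        drained.append(progress_queue.get_nowait())
    return drained
```

Calling asyncio.run(demo_hub()) returns the two updates that were reported while the handler was registered. Unregistering the handler when a stream ends is a design choice of this sketch; it keeps a finished request from receiving updates that belong to later runs.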
Event Models and SSE Serialization
The system defines strict Pydantic models in app/backend/models/events.py (lines 10-14) to ensure protocol compliance. The hierarchy includes BaseEvent, StartEvent, ProgressUpdateEvent, CompleteEvent, and ErrorEvent. Each model implements a to_sse() method that renders the object into proper SSE format: event: <type>\ndata: <json>\n\n.
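To make the wire format concrete, here is a small sketch of a to_sse() method. It swaps in standard-library dataclasses for the Pydantic models so that it runs without dependencies, and the field names (agent, ticker, status) are assumptions for illustration rather than the repository's actual schema.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class BaseEvent:
    type: str

    def to_sse(self) -> str:
        # json.dumps emits a single line, so one `data:` field is enough.
        payload = json.dumps(asdict(self))
        # A blank line (the trailing "\n\n") terminates the SSE message.
        return f"event: {self.type}\ndata: {payload}\n\n"


@dataclass
class StartEvent(BaseEvent):
    type: str = "start"


@dataclass
class ProgressUpdateEvent(BaseEvent):
    type: str = "progress"
    agent: str = ""            # hypothetical field
    ticker: Optional[str] = None  # hypothetical field
    status: str = ""           # hypothetical field


start_frame = StartEvent().to_sse()
progress_frame = ProgressUpdateEvent(agent="demo_agent", ticker="AAPL", status="Done").to_sse()
```

Here start_frame is the string 'event: start\ndata: {"type": "start"}\n\n'. The event: line is what lets a browser client route the message to a named listener such as "start" or "progress".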
Step-by-Step Execution Flow
- Client initiates request: A POST request to /hedge-fund/run or /backtest triggers the SSE endpoint.
- Generator initialization: The route creates the event_generator and registers a progress_handler with the global progress object using progress.register_handler(progress_handler).
- Background task launch: The system starts the heavy-weight graph execution in a background coroutine via run_task = asyncio.create_task(run_graph_async(...)), preventing blocking of the main thread.
- Disconnect detection: A separate coroutine monitors await request.receive() for http.disconnect events, enabling graceful cancellation if the client closes the connection.
- Initial event transmission: The generator immediately yields a StartEvent using yield StartEvent().to_sse() to confirm connection establishment.
- Agent progress reporting: As agents complete analysis steps, they call progress.update_status(), which loops through registered handlers and enqueues ProgressUpdateEvent instances.
- Queue consumption: The generator loop executes await asyncio.wait_for(progress_queue.get(), timeout=1.0) to dequeue events, converting each to SSE format via event.to_sse() and yielding to the client.
- Completion signaling: When run_task finishes, the generator yields a final CompleteEvent (or ErrorEvent on failure) before exiting, closing the stream.
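The steps above can be approximated in a dependency-free sketch. It leaves out FastAPI and disconnect handling, and run_graph_stub, the sse() helper, and the payload shapes are hypothetical stand-ins; in the real route the generator would be passed to StreamingResponse with the text/event-stream media type.

```python
import asyncio
import json
from typing import AsyncIterator


def sse(event: str, data: dict) -> str:
    """Format one SSE message (illustrative helper, not the repo's to_sse)."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


async def run_graph_stub(queue: asyncio.Queue) -> dict:
    """Hypothetical stand-in for run_graph_async: reports progress, returns a result."""
    for status in ("Fetching prices", "Analyzing", "Done"):
        await asyncio.sleep(0.01)
        queue.put_nowait({"agent": "demo_agent", "status": status})
    return {"decisions": {"AAPL": "hold"}}


async def event_generator() -> AsyncIterator[str]:
    progress_queue: asyncio.Queue = asyncio.Queue()
    # Launch the heavy work in the background so the generator can keep yielding.
    run_task = asyncio.create_task(run_graph_stub(progress_queue))
    try:
        yield sse("start", {"type": "start"})
        # Keep polling until the task has finished AND the queue is drained.
        while not (run_task.done() and progress_queue.empty()):
            try:
                update = await asyncio.wait_for(progress_queue.get(), timeout=0.05)
            except asyncio.TimeoutError:
                continue  # nothing new yet; re-check whether the task is done
            yield sse("progress", update)
        try:
            result = run_task.result()
        except Exception as exc:
            yield sse("error", {"message": str(exc)})
        else:
            yield sse("complete", {"data": result})
    finally:
        # If the consumer stops early, do not leave the task running.
        if not run_task.done():
            run_task.cancel()


async def collect() -> list:
    return [chunk async for chunk in event_generator()]
```

The timeout on queue.get() is what lets the loop notice that the background task has finished even when no further progress events arrive. Without it, the generator could wait forever on an empty queue.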
Client Implementation Examples
JavaScript EventSource Integration
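// Listen to a hedge-fund run and log progress updates.
// Note: EventSource always issues a GET request and cannot send a body, so this
// snippet assumes the stream is reachable via GET (for example through a proxy
// route). To call the POST endpoint directly, read the stream with fetch() instead.
const es = new EventSource("/hedge-fund/run");

// Fired when the server sends a "start" event
es.addEventListener("start", (e) => {
  console.log("Run started:", JSON.parse(e.data));
});

// Fired for each progress update
es.addEventListener("progress", (e) => {
  const p = JSON.parse(e.data);
  console.log(`[${p.agent}] ${p.ticker ?? ""} – ${p.status}`);
  if (p.analysis) console.debug("Analysis:", p.analysis);
});

// Final result
es.addEventListener("complete", (e) => {
  console.log("Run completed:", JSON.parse(e.data));
  es.close();
});

// Errors: server-sent "error" events carry JSON data, while connection-level
// errors raised by the browser do not
es.addEventListener("error", (e) => {
  if (e.data) {
    console.error("Server error:", JSON.parse(e.data));
  } else {
    console.error("Connection error");
    es.close(); // avoid automatic reconnects that would start a new run
  }
});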
Python httpx Stream Consumer
import httpx
import json


def stream_hedge_fund(url: str, payload: dict):
    event_type = "message"  # SSE default until an `event:` line arrives
    with httpx.Client(timeout=None) as client:
        with client.stream("POST", url, json=payload) as response:
            # iter_lines() yields decoded str lines, not bytes
            for line in response.iter_lines():
                if not line:
                    continue  # a blank line marks the end of one SSE message
                # Each remaining line is either `event: <type>` or `data: <json>`
                if line.startswith("event:"):
                    event_type = line.split(":", 1)[1].strip()
                elif line.startswith("data:"):
                    data = json.loads(line.split(":", 1)[1].strip())
                    print(f"[{event_type}] →", data)


# Example usage
stream_hedge_fund(
    "http://localhost:8000/hedge-fund/run",
    {"initial_cash": 100_000, "tickers": ["AAPL"]},
)
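The consumer above prints each data: line as soon as it arrives, which works when every message has exactly one data: field. A slightly more general approach accumulates fields until the blank line that ends a message. The sketch below is one way to do that; parse_sse is a hypothetical helper, and it assumes every payload is JSON, as in the events described in this article.

```python
import json
from typing import Iterable, Iterator, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Yield (event_type, payload) pairs from decoded SSE lines."""
    event_type = "message"  # default event name in the SSE format
    data_lines = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the accumulated message, then reset state.
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        elif line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The format strips at most one leading space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)


sample = [
    "event: start",
    'data: {"type": "start"}',
    "",
    ": keep-alive",
    "event: progress",
    'data: {"agent": "demo_agent",',
    'data: "status": "Done"}',
    "",
]
parsed = list(parse_sse(sample))
```

Because the function accepts any iterable of strings, it could be fed response.iter_lines() from the httpx example directly.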
Reliability and Performance Optimizations
Several design choices keep delivery reliable. The asyncio.Queue decouples agent progress from HTTP I/O: handlers enqueue with put_nowait(), so a slow client does not stall the financial computations. The await request.receive() pattern detects client disconnections, allowing the server to cancel the expensive run_graph_async task and reclaim resources. Because every event is serialized through BaseEvent.to_sse(), each message uses the event:/data: framing defined by the SSE specification, which keeps the stream compatible with standard browsers and HTTP clients. Additionally, the AgentProgress class renders a rich live table in the console for developers while streaming the same updates to external clients.
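The disconnect-and-cancel behavior can be illustrated by racing the work against a watcher on the ASGI receive channel. This is a sketch under assumptions: run_until_done_or_disconnect, slow_work, and fake_receive are hypothetical names, and the only detail taken from ASGI itself is the {"type": "http.disconnect"} message.

```python
import asyncio
from typing import Awaitable, Callable

Receive = Callable[[], Awaitable[dict]]


async def wait_for_disconnect(receive: Receive) -> bool:
    """Return True once the receive channel reports http.disconnect."""
    while True:
        message = await receive()
        if message.get("type") == "http.disconnect":
            return True


async def run_until_done_or_disconnect(work: Awaitable, receive: Receive):
    run_task = asyncio.ensure_future(work)
    disconnect_task = asyncio.ensure_future(wait_for_disconnect(receive))
    done, _pending = await asyncio.wait(
        {run_task, disconnect_task}, return_when=asyncio.FIRST_COMPLETED
    )
    if run_task in done:
        # Work finished first: stop watching for a disconnect.
        disconnect_task.cancel()
        await asyncio.gather(disconnect_task, return_exceptions=True)
        return "completed", run_task.result()
    # Client left first: cancel the expensive work and wait for it to unwind.
    run_task.cancel()
    await asyncio.gather(run_task, return_exceptions=True)
    return "cancelled", None


async def demo(work_seconds: float):
    async def slow_work():
        await asyncio.sleep(work_seconds)
        return "result"

    async def fake_receive():
        # Simulates a client that disconnects after 50 ms.
        await asyncio.sleep(0.05)
        return {"type": "http.disconnect"}

    return await run_until_done_or_disconnect(slow_work(), fake_receive)
```

With this stub, asyncio.run(demo(10)) reports a cancellation after roughly 50 ms instead of waiting ten seconds, while asyncio.run(demo(0)) completes normally.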
Summary
- The SSE streaming mechanism in app/backend/routes/hedge_fund.py combines FastAPI's StreamingResponse with async generators to maintain persistent HTTP connections.
- AgentProgress in src/utils/progress.py functions as a singleton hub, managing state through an asyncio.Queue and handler registration system.
- Event models in app/backend/models/events.py enforce type safety and proper SSE serialization via the to_sse() method.
- Automatic disconnect detection through request.receive() monitoring prevents resource leaks from abandoned connections.
- The architecture enables real-time observability of long-running AI computations without blocking the main execution thread or compromising throughput.
Frequently Asked Questions
What file contains the main SSE endpoint implementation?
The primary SSE endpoint is implemented in app/backend/routes/hedge_fund.py (lines 62-98), where the StreamingResponse wraps an async event_generator function that yields formatted Server-Sent Events to the client while managing background task execution.
How does the system handle client disconnections during streaming?
The mechanism monitors request.receive() for http.disconnect events in a dedicated background coroutine, allowing the server to detect when a client closes the connection and immediately cancel the long-running graph execution task to prevent unnecessary computational expenditure.
What event types are supported in the SSE stream?
The stream carries four concrete event types defined in app/backend/models/events.py: StartEvent for initialization, ProgressUpdateEvent for interim status reports, CompleteEvent for successful completion, and ErrorEvent for failure states. All four inherit from BaseEvent, which provides the to_sse() serialization method.
How do agents communicate progress to the SSE stream?
Agents call progress.update_status() on the singleton AgentProgress instance, which broadcasts to all registered handlers; the handler registered by the SSE endpoint then constructs a ProgressUpdateEvent and pushes it onto the asyncio.Queue, where the event generator dequeues and transmits it to the client in real time.