How to Persist Hedge Fund Flows and Restore Them for Later Execution

Persist hedge fund flows by POSTing JSON definitions to the /flows endpoint, which stores them in the hedge_fund_flows table via SQLAlchemy models, then restore them later by fetching the flow data and passing its nodes and edges to create_graph in graph.py to rebuild the executable agent graph.

The virattt/ai-hedge-fund repository provides a sophisticated backend for building AI-powered investment strategies through composable agent graphs. When you construct a flow in the UI—connecting analyst agents, portfolio managers, and risk managers—you need a reliable mechanism to persist hedge fund flows and restore them for later execution without manually recreating the node configuration each time.

Understanding Flow Persistence Architecture

The backend stores a complete flow definition as a row in the hedge_fund_flows relational table. This record contains JSON-compatible columns for nodes (agent instances), edges (connections), viewport (UI state), and metadata. The SQLAlchemy ORM model HedgeFundFlow in src/app/backend/database/models.py maps these columns to Python objects.

When persisting, FlowRepository in src/app/backend/repositories/flow_repository.py handles the database transaction. When restoring, the create_graph function in src/app/backend/services/graph.py reconstructs the executable StateGraph from the stored JSON structures.

Persisting a New Flow

The FlowCreateRequest Schema

Before storage, the FastAPI endpoint validates incoming requests against the FlowCreateRequest Pydantic schema in src/app/backend/models/schemas.py. This schema enforces required fields including name, nodes, and edges, ensuring the flow definition is structurally complete before it reaches the database layer.

Repository Layer and Database Insertion

The FlowRepository.create_flow method in src/app/backend/repositories/flow_repository.py converts the validated FlowCreateRequest into a HedgeFundFlow ORM object. It commits the JSON-compatible nodes, edges, viewport, and optional metadata to the relational database, assigning a unique integer id that serves as the persistent identifier for later retrieval.

Example: Creating a Flow via API

You can persist a flow by sending a POST request to the /flows endpoint defined in src/app/backend/routes/flows.py:

curl -X POST https://your-api.com/flows \
  -H "Content-Type: application/json" \
  -d '{
        "name": "My First Hedge Fund Flow",
        "description": "Demo flow with two analyst agents",
        "nodes": [
          {"id": "warren_buffett_abc123", "type": "agent", "data": {"agent_id":"warren_buffett"}},
          {"id": "growth_agent_def456", "type": "agent", "data": {"agent_id":"growth_agent"}}
        ],
        "edges": [
          {"id": "e1", "source": "warren_buffett_abc123", "target": "growth_agent_def456"}
        ],
        "viewport": {"x":0,"y":0,"zoom":1},
        "is_template": false,
        "tags": ["demo","test"]
      }'

Behind the scenes, the request is validated against FlowCreateRequest in src/app/backend/models/schemas.py, and FlowRepository.create_flow inserts the row into hedge_fund_flows.

Restoring and Executing a Stored Flow

Fetching Flow Data

To restore a flow for later execution, send a GET request to /flows/{flow_id} implemented in src/app/backend/routes/flows.py. The endpoint returns a FlowResponse containing the original nodes, edges, and viewport data. You can also list all available flows via GET /flows to locate the specific id you need.

Reconstructing the StateGraph

The create_graph function in src/app/backend/services/graph.py accepts the stored graph_nodes and graph_edges from the FlowResponse, then reconstructs the StateGraph by mapping the JSON definitions back to agent instances and connection logic. This rebuilt graph is then executable via run_graph_async with new parameters such as tickers, date ranges, and model configurations.

Example: Restoring and Running a Flow

import requests

API_URL = "https://your-api.com"
FLOW_ID = 42

# Fetch the stored flow

resp = requests.get(f"{API_URL}/flows/{FLOW_ID}")
resp.raise_for_status()
flow = resp.json()          # contains `nodes`, `edges`, `viewport`, etc.

# Pass the flow data to the graph runner

from app.backend.services.graph import create_graph, run_graph_async

graph = create_graph(flow["graph_nodes"], flow["graph_edges"])
portfolio = {"cash": 100_000, "positions": {}}

result = await run_graph_async(
    graph,
    portfolio,
    tickers=["AAPL", "MSFT"],
    start_date="2024-01-01",
    end_date="2024-03-31",
    model_name="gpt-4.1",
    model_provider="OPENAI",
)
print(result)

The FlowResponse model in src/app/backend/models/schemas.py supplies the exact JSON structure needed by create_graph, ensuring seamless restoration of the agent workflow.

Tracking Flow Executions

Each execution creates a HedgeFundFlowRun record defined in src/app/backend/database/models.py and managed by FlowRunRepository in src/app/backend/repositories/flow_run_repository.py. This stores the initial request_data, execution status (pending, complete, or failed), and final results, enabling you to audit performance across multiple runs of the same persisted flow.

When run_graph_async finishes, the service handling the request (such as the /hedge_fund endpoint in src/app/backend/routes/hedge_fund.py) typically records the outcome:

from app.backend.repositories.flow_run_repository import FlowRunRepository
from app.backend.models.schemas import FlowRunStatus

run_repo = FlowRunRepository(db_session)
flow_run = run_repo.create_flow_run(flow_id=FLOW_ID, request_data=run_request)

# ... after execution completes

run_repo.update_flow_run(
    run_id=flow_run.id,
    status=FlowRunStatus.COMPLETE,
    results=execution_output,
)

This creates a complete audit trail, allowing you to compare results across different executions of the same flow configuration.

Key Files and Components

Understanding the persistence mechanism requires familiarity with these specific files in the virattt/ai-hedge-fund repository:

Summary

Frequently Asked Questions

What database schema stores the flow definitions?

Flow definitions are stored in the hedge_fund_flows table, mapped by the HedgeFundFlow SQLAlchemy ORM class in src/app/backend/database/models.py. This table stores JSON-compatible columns for nodes, edges, viewport, and metadata, along with a unique integer id primary key and timestamps.

How does the system reconstruct the agent graph from stored data?

The create_graph function in src/app/backend/services/graph.py accepts the stored graph_nodes and graph_edges from the FlowResponse, then reconstructs the StateGraph by mapping the JSON definitions back to agent instances and connection logic. This rebuilt graph is then executable via run_graph_async with new parameters such as tickers, date ranges, and model configurations.

Can I track the execution history of a persisted flow?

Yes. Each execution creates a HedgeFundFlowRun record managed by FlowRunRepository in src/app/backend/repositories/flow_run_repository.py. This stores the initial request_data, execution status (pending, complete, or failed), and final results, enabling you to audit performance across multiple runs of the same persisted flow configuration.

What validation occurs when persisting a new flow?

Incoming requests are validated against the FlowCreateRequest Pydantic schema in src/app/backend/models/schemas.py. This schema enforces required fields including name, nodes, and edges, ensuring the flow definition is structurally complete before FlowRepository.create_flow commits it to the database.

Have a question about this repo?

These articles cover the highlights, but your codebase questions are specific. Give your agent direct access to the source. Share this with your agent to get started:

Share the following with your agent to get started:
curl -s "https://instagit.com/install.md"

Works with
Claude Codex Cursor VS Code OpenClaw Any MCP Client

Maintain an open-source project? Get it listed too →