How to Persist Hedge Fund Flows and Restore Them for Later Execution
To persist a hedge fund flow, POST its JSON definition to the /flows endpoint, which stores it in the hedge_fund_flows table via SQLAlchemy models. To restore it later, fetch the flow and pass its nodes and edges to create_graph in graph.py to rebuild the executable agent graph.
The virattt/ai-hedge-fund repository provides a backend for building AI-powered investment strategies from composable agent graphs. When you construct a flow in the UI by connecting analyst agents, portfolio managers, and risk managers, you need a reliable way to persist that flow and restore it for later execution without manually recreating the node configuration each time.
Understanding Flow Persistence Architecture
The backend stores a complete flow definition as a row in the hedge_fund_flows relational table. This record contains JSON-compatible columns for nodes (agent instances), edges (connections), viewport (UI state), and metadata. The SQLAlchemy ORM model HedgeFundFlow in src/app/backend/database/models.py maps these columns to Python objects.
When persisting, FlowRepository in src/app/backend/repositories/flow_repository.py handles the database transaction. When restoring, the create_graph function in src/app/backend/services/graph.py reconstructs the executable StateGraph from the stored JSON structures.
Persisting a New Flow
The FlowCreateRequest Schema
Before storage, the FastAPI endpoint validates incoming requests against the FlowCreateRequest Pydantic schema in src/app/backend/models/schemas.py. This schema enforces required fields including name, nodes, and edges, ensuring the flow definition is structurally complete before it reaches the database layer.
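The kind of structural check described above can be sketched without any dependencies. This is not the repository's FlowCreateRequest (which is a Pydantic model); validate_flow_payload is a hypothetical stand-in, and the edge-to-node cross-check at the end is an extra illustration that the real schema is not claimed to perform.

```python
from typing import Any

# Required fields named in the text above.
REQUIRED_FIELDS = ("name", "nodes", "edges")


def validate_flow_payload(payload: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the payload looks complete."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in payload]
    if problems:
        return problems

    if not isinstance(payload["name"], str) or not payload["name"].strip():
        problems.append("name must be a non-empty string")
    for name in ("nodes", "edges"):
        if not isinstance(payload[name], list):
            problems.append(f"{name} must be a list")
    if problems:
        return problems

    # Hypothetical extra check: every edge should reference node ids in this payload.
    node_ids = {node.get("id") for node in payload["nodes"] if isinstance(node, dict)}
    for edge in payload["edges"]:
        if not isinstance(edge, dict):
            problems.append("each edge must be an object")
            continue
        for end in ("source", "target"):
            if edge.get(end) not in node_ids:
                problems.append(f"edge {edge.get('id')!r} has unknown {end}: {edge.get(end)!r}")
    return problems


example = {
    "name": "My First Hedge Fund Flow",
    "nodes": [{"id": "a", "type": "agent"}, {"id": "b", "type": "agent"}],
    "edges": [{"id": "e1", "source": "a", "target": "b"}],
}
print(validate_flow_payload(example))
```

Returning a list of problems instead of raising on the first one makes it easier to report every issue to the client in a single response.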
Repository Layer and Database Insertion
The FlowRepository.create_flow method in src/app/backend/repositories/flow_repository.py converts the validated FlowCreateRequest into a HedgeFundFlow ORM object. It commits the JSON-compatible nodes, edges, viewport, and optional metadata to the relational database, assigning a unique integer id that serves as the persistent identifier for later retrieval.
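As a rough illustration of this insert-and-assign-id pattern, the sketch below uses the standard library's sqlite3 module rather than SQLAlchemy. SketchFlowRepository, its column set, and its get_flow method are assumptions made for the example; the real FlowRepository and HedgeFundFlow model may differ in columns and behavior.

```python
import json
import sqlite3
from typing import Any, Optional


class SketchFlowRepository:
    """Hypothetical stand-in for FlowRepository, backed by sqlite3."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS hedge_fund_flows (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                nodes TEXT NOT NULL,
                edges TEXT NOT NULL,
                viewport TEXT
            )
            """
        )

    def create_flow(self, name: str, nodes: list, edges: list, viewport: Any = None) -> int:
        """Serialize the JSON-compatible fields and return the new integer id."""
        with self.conn:  # commits on success, rolls back on error
            cursor = self.conn.execute(
                "INSERT INTO hedge_fund_flows (name, nodes, edges, viewport) VALUES (?, ?, ?, ?)",
                (name, json.dumps(nodes), json.dumps(edges), json.dumps(viewport)),
            )
        return cursor.lastrowid

    def get_flow(self, flow_id: int) -> Optional[dict]:
        """Load one flow and decode its JSON columns, or return None if absent."""
        row = self.conn.execute(
            "SELECT id, name, nodes, edges, viewport FROM hedge_fund_flows WHERE id = ?",
            (flow_id,),
        ).fetchone()
        if row is None:
            return None
        return {
            "id": row[0],
            "name": row[1],
            "nodes": json.loads(row[2]),
            "edges": json.loads(row[3]),
            "viewport": json.loads(row[4]),
        }


repo = SketchFlowRepository(sqlite3.connect(":memory:"))
flow_id = repo.create_flow("demo", [{"id": "a"}], [], {"x": 0, "y": 0, "zoom": 1})
print(flow_id, repo.get_flow(flow_id))
```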
Example: Creating a Flow via API
You can persist a flow by sending a POST request to the /flows endpoint defined in src/app/backend/routes/flows.py:
```bash
curl -X POST https://your-api.com/flows \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My First Hedge Fund Flow",
    "description": "Demo flow with two analyst agents",
    "nodes": [
      {"id": "warren_buffett_abc123", "type": "agent", "data": {"agent_id": "warren_buffett"}},
      {"id": "growth_agent_def456", "type": "agent", "data": {"agent_id": "growth_agent"}}
    ],
    "edges": [
      {"id": "e1", "source": "warren_buffett_abc123", "target": "growth_agent_def456"}
    ],
    "viewport": {"x": 0, "y": 0, "zoom": 1},
    "is_template": false,
    "tags": ["demo", "test"]
  }'
```
Behind the scenes, the request is validated against FlowCreateRequest in src/app/backend/models/schemas.py, and FlowRepository.create_flow inserts the row into hedge_fund_flows.
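For scripts that already run in Python, the same request can be assembled with the standard library. The sketch below only builds the request object and does not send it; build_create_flow_request is a hypothetical helper, and the host is the same placeholder used in the curl example.

```python
import json
import urllib.request

API_URL = "https://your-api.com"  # placeholder host, as in the curl example


def build_create_flow_request(payload: dict) -> urllib.request.Request:
    """Build (but do not send) the POST /flows request for a flow definition."""
    return urllib.request.Request(
        url=f"{API_URL}/flows",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


payload = {
    "name": "My First Hedge Fund Flow",
    "nodes": [
        {"id": "warren_buffett_abc123", "type": "agent", "data": {"agent_id": "warren_buffett"}},
        {"id": "growth_agent_def456", "type": "agent", "data": {"agent_id": "growth_agent"}},
    ],
    "edges": [
        {"id": "e1", "source": "warren_buffett_abc123", "target": "growth_agent_def456"}
    ],
    "viewport": {"x": 0, "y": 0, "zoom": 1},
}
request = build_create_flow_request(payload)
print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen would send it. The response is assumed to carry the new flow's id, which is the value to keep for later retrieval.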
Restoring and Executing a Stored Flow
Fetching Flow Data
To restore a flow for later execution, send a GET request to /flows/{flow_id} implemented in src/app/backend/routes/flows.py. The endpoint returns a FlowResponse containing the original nodes, edges, and viewport data. You can also list all available flows via GET /flows to locate the specific id you need.
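If you only remember a flow's name, the list response can be searched for its id. The sketch below assumes GET /flows returns a JSON array of objects that each carry id and name; that response shape and the find_flow_id helper are assumptions for illustration, not confirmed details of the API.

```python
from typing import Optional


def find_flow_id(flows: list[dict], name: str) -> Optional[int]:
    """Return the id of the flow with the given name, or None if there is no match."""
    matches = [flow["id"] for flow in flows if flow.get("name") == name]
    if len(matches) > 1:
        # Names are not assumed to be unique, so refuse to guess.
        raise ValueError(f"{len(matches)} flows are named {name!r}; pick an id explicitly")
    return matches[0] if matches else None


# Sample data standing in for the decoded body of GET /flows.
listed_flows = [
    {"id": 41, "name": "Value Screen"},
    {"id": 42, "name": "My First Hedge Fund Flow"},
]
print(find_flow_id(listed_flows, "My First Hedge Fund Flow"))
```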
Reconstructing the StateGraph
The create_graph function in src/app/backend/services/graph.py takes the stored nodes and edges from the FlowResponse as its graph_nodes and graph_edges inputs, then reconstructs the StateGraph by mapping the JSON definitions back to agent instances and connection logic. The rebuilt graph can then be executed via run_graph_async with new parameters such as tickers, date ranges, and model configurations.
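The mapping step can be pictured with a dependency-free sketch. This is not create_graph itself, which builds a StateGraph; plan_execution is a hypothetical function that only shows how stored node and edge JSON can be turned into an agent lookup and a valid execution order, assuming each node carries data.agent_id as in the example payload.

```python
from graphlib import TopologicalSorter


def plan_execution(nodes: list[dict], edges: list[dict]) -> tuple[dict[str, str], list[str]]:
    """Map node ids to agent keys and compute an order that respects the edges."""
    agent_by_node = {node["id"]: node["data"]["agent_id"] for node in nodes}

    # For each node, collect the nodes that must run before it.
    predecessors: dict[str, set[str]] = {node_id: set() for node_id in agent_by_node}
    for edge in edges:
        if edge["source"] not in agent_by_node or edge["target"] not in agent_by_node:
            raise ValueError(f"edge {edge['id']!r} references an unknown node")
        predecessors[edge["target"]].add(edge["source"])

    # static_order() raises graphlib.CycleError if the stored edges form a cycle.
    order = list(TopologicalSorter(predecessors).static_order())
    return agent_by_node, order


stored_nodes = [
    {"id": "warren_buffett_abc123", "type": "agent", "data": {"agent_id": "warren_buffett"}},
    {"id": "growth_agent_def456", "type": "agent", "data": {"agent_id": "growth_agent"}},
]
stored_edges = [
    {"id": "e1", "source": "warren_buffett_abc123", "target": "growth_agent_def456"}
]
agents, order = plan_execution(stored_nodes, stored_edges)
print([agents[node_id] for node_id in order])
```

Failing fast on unknown node ids or cycles catches a corrupted stored flow before any agent runs.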
Example: Restoring and Running a Flow
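```python
import asyncio

import requests

# Assumes the backend package is importable from where this script runs
from app.backend.services.graph import create_graph, run_graph_async

API_URL = "https://your-api.com"
FLOW_ID = 42


async def main() -> None:
    # Fetch the stored flow
    resp = requests.get(f"{API_URL}/flows/{FLOW_ID}", timeout=30)
    resp.raise_for_status()
    flow = resp.json()  # contains `nodes`, `edges`, `viewport`, etc.

    # Rebuild the graph from the stored nodes and edges
    graph = create_graph(flow["nodes"], flow["edges"])
    # If create_graph returns an uncompiled StateGraph, compile it before running:
    # graph = graph.compile()

    portfolio = {"cash": 100_000, "positions": {}}
    result = await run_graph_async(
        graph,
        portfolio,
        tickers=["AAPL", "MSFT"],
        start_date="2024-01-01",
        end_date="2024-03-31",
        model_name="gpt-4.1",
        model_provider="OPENAI",
    )
    print(result)


# `await` is only valid inside a coroutine, so drive it with asyncio.run
asyncio.run(main())
```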
The FlowResponse model in src/app/backend/models/schemas.py returns nodes and edges in the same JSON structure that was stored, so they can be passed to create_graph without reshaping.
Tracking Flow Executions
Each execution creates a HedgeFundFlowRun record defined in src/app/backend/database/models.py and managed by FlowRunRepository in src/app/backend/repositories/flow_run_repository.py. This stores the initial request_data, execution status (pending, complete, or failed), and final results, enabling you to audit performance across multiple runs of the same persisted flow.
When run_graph_async finishes, the service handling the request (such as the /hedge_fund endpoint in src/app/backend/routes/hedge_fund.py) typically records the outcome:
```python
from app.backend.repositories.flow_run_repository import FlowRunRepository
from app.backend.models.schemas import FlowRunStatus

# db_session, FLOW_ID, and run_request are assumed to be provided by the caller
run_repo = FlowRunRepository(db_session)
flow_run = run_repo.create_flow_run(flow_id=FLOW_ID, request_data=run_request)

# ... after execution completes and execution_output holds the result
run_repo.update_flow_run(
    run_id=flow_run.id,
    status=FlowRunStatus.COMPLETE,
    results=execution_output,
)
```
This creates a complete audit trail, allowing you to compare results across different executions of the same flow configuration.
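The run lifecycle described above (a pending record that ends up complete or failed) can be sketched in plain Python. RunStatus, RunRecord, and execute_and_record are hypothetical names for illustration, and the error field is an addition of this sketch; the repository's HedgeFundFlowRun and FlowRunStatus may use different fields and values.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Optional


class RunStatus(str, Enum):
    PENDING = "pending"
    COMPLETE = "complete"
    FAILED = "failed"


@dataclass
class RunRecord:
    flow_id: int
    request_data: dict
    status: RunStatus = RunStatus.PENDING
    results: Optional[Any] = None
    error: Optional[str] = None  # hypothetical field, not taken from the repository
    started_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def execute_and_record(record: RunRecord, runner: Callable[[dict], Any]) -> RunRecord:
    """Run the flow and store either its results or the failure on the record."""
    try:
        record.results = runner(record.request_data)
        record.status = RunStatus.COMPLETE
    except Exception as exc:  # keep the failure on the record instead of losing it
        record.error = str(exc)
        record.status = RunStatus.FAILED
    return record


def fake_runner(request_data: dict) -> dict:
    """Stand-in for the real graph execution; returns one entry per ticker."""
    return {ticker: "hold" for ticker in request_data["tickers"]}


run = execute_and_record(RunRecord(flow_id=42, request_data={"tickers": ["AAPL", "MSFT"]}), fake_runner)
print(run.status.value, run.results)
```

Recording failures on the same record as successes keeps the history of a flow complete even when a run never produces results.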
Key Files and Components
Understanding the persistence mechanism requires familiarity with these specific files in the virattt/ai-hedge-fund repository:
- src/app/backend/models/schemas.py – Contains Pydantic request/response models including FlowCreateRequest, FlowResponse, and FlowRunCreateRequest that validate API payloads.
- src/app/backend/database/models.py – Defines SQLAlchemy ORM classes HedgeFundFlow and HedgeFundFlowRun that map to the relational database tables.
- src/app/backend/repositories/flow_repository.py – Implements FlowRepository with create_flow and retrieval methods that handle database transactions.
- src/app/backend/repositories/flow_run_repository.py – Manages execution history through FlowRunRepository, storing run metadata and results.
- src/app/backend/routes/flows.py – FastAPI endpoints POST /flows and GET /flows/{id} that expose persistence operations to clients.
- src/app/backend/services/graph.py – Contains create_graph and run_graph_async functions that reconstruct and execute stored flows.
- src/app/backend/routes/hedge_fund.py – Example usage of flow execution and run tracking in the main hedge fund endpoint.
Summary
- Persistence: POST JSON flow definitions to /flows to store them in the hedge_fund_flows table via FlowRepository.create_flow in src/app/backend/repositories/flow_repository.py.
- Validation: The FlowCreateRequest schema in src/app/backend/models/schemas.py enforces required fields (name, nodes, edges) before database insertion.
- Restoration: Fetch stored flows via GET /flows/{flow_id} and pass the returned nodes and edges to create_graph in src/app/backend/services/graph.py to rebuild the executable StateGraph.
- Execution Tracking: Each run creates a HedgeFundFlowRun record managed by FlowRunRepository in src/app/backend/repositories/flow_run_repository.py, storing request data, status, and results for audit trails.
Frequently Asked Questions
What database schema stores the flow definitions?
Flow definitions are stored in the hedge_fund_flows table, mapped by the HedgeFundFlow SQLAlchemy ORM class in src/app/backend/database/models.py. This table stores JSON-compatible columns for nodes, edges, viewport, and metadata, along with a unique integer id primary key and timestamps.
How does the system reconstruct the agent graph from stored data?
The create_graph function in src/app/backend/services/graph.py takes the stored nodes and edges from the FlowResponse as its graph_nodes and graph_edges inputs, then reconstructs the StateGraph by mapping the JSON definitions back to agent instances and connection logic. The rebuilt graph can then be executed via run_graph_async with new parameters such as tickers, date ranges, and model configurations.
Can I track the execution history of a persisted flow?
Yes. Each execution creates a HedgeFundFlowRun record managed by FlowRunRepository in src/app/backend/repositories/flow_run_repository.py. This stores the initial request_data, execution status (pending, complete, or failed), and final results, enabling you to audit performance across multiple runs of the same persisted flow configuration.
What validation occurs when persisting a new flow?
Incoming requests are validated against the FlowCreateRequest Pydantic schema in src/app/backend/models/schemas.py. This schema enforces required fields including name, nodes, and edges, ensuring the flow definition is structurally complete before FlowRepository.create_flow commits it to the database.