# How to Persist Hedge Fund Flows and Restore Them for Later Execution

> Learn how to persist hedge fund flows for later execution. Save your flow definitions to the database and restore them effortlessly to rebuild your executable agent graph.

- Repository: [Virat Singh/ai-hedge-fund](https://github.com/virattt/ai-hedge-fund)
- Tags: how-to-guide
- Published: 2026-03-09

---

**Persist hedge fund flows by POSTing JSON definitions to the `/flows` endpoint, which stores them in the `hedge_fund_flows` table via SQLAlchemy models, then restore them later by fetching the flow data and passing its nodes and edges to `create_graph` in [`graph.py`](https://github.com/virattt/ai-hedge-fund/blob/main/graph.py) to rebuild the executable agent graph.**

The virattt/ai-hedge-fund repository provides a sophisticated backend for building AI-powered investment strategies through composable agent graphs. When you construct a flow in the UI—connecting analyst agents, portfolio managers, and risk managers—you need a reliable mechanism to persist hedge fund flows and restore them for later execution without manually recreating the node configuration each time.

## Understanding Flow Persistence Architecture

The backend stores a complete flow definition as a row in the `hedge_fund_flows` relational table. This record contains JSON-compatible columns for `nodes` (agent instances), `edges` (connections), `viewport` (UI state), and metadata. The SQLAlchemy ORM model `HedgeFundFlow` in [`src/app/backend/database/models.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/database/models.py) maps these columns to Python objects.

When persisting, `FlowRepository` in [`src/app/backend/repositories/flow_repository.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/repositories/flow_repository.py) handles the database transaction. When restoring, the `create_graph` function in [`src/app/backend/services/graph.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/services/graph.py) reconstructs the executable `StateGraph` from the stored JSON structures.

## Persisting a New Flow

### The FlowCreateRequest Schema

Before storage, the FastAPI endpoint validates incoming requests against the `FlowCreateRequest` Pydantic schema in [`src/app/backend/models/schemas.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/models/schemas.py). This schema enforces required fields including `name`, `nodes`, and `edges`, ensuring the flow definition is structurally complete before it reaches the database layer.

### Repository Layer and Database Insertion

The `FlowRepository.create_flow` method in [`src/app/backend/repositories/flow_repository.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/repositories/flow_repository.py) converts the validated `FlowCreateRequest` into a `HedgeFundFlow` ORM object. It commits the JSON-compatible `nodes`, `edges`, `viewport`, and optional metadata to the relational database, assigning a unique integer `id` that serves as the persistent identifier for later retrieval.

### Example: Creating a Flow via API

You can persist a flow by sending a POST request to the `/flows` endpoint defined in [`src/app/backend/routes/flows.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/routes/flows.py):

```bash
curl -X POST https://your-api.com/flows \
  -H "Content-Type: application/json" \
  -d '{
        "name": "My First Hedge Fund Flow",
        "description": "Demo flow with two analyst agents",
        "nodes": [
          {"id": "warren_buffett_abc123", "type": "agent", "data": {"agent_id":"warren_buffett"}},
          {"id": "growth_agent_def456", "type": "agent", "data": {"agent_id":"growth_agent"}}
        ],
        "edges": [
          {"id": "e1", "source": "warren_buffett_abc123", "target": "growth_agent_def456"}
        ],
        "viewport": {"x":0,"y":0,"zoom":1},
        "is_template": false,
        "tags": ["demo","test"]
      }'

```

Behind the scenes, the request is validated against `FlowCreateRequest` in [`src/app/backend/models/schemas.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/models/schemas.py), and `FlowRepository.create_flow` inserts the row into `hedge_fund_flows`.

## Restoring and Executing a Stored Flow

### Fetching Flow Data

To restore a flow for later execution, send a GET request to `/flows/{flow_id}` implemented in [`src/app/backend/routes/flows.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/routes/flows.py). The endpoint returns a `FlowResponse` containing the original `nodes`, `edges`, and viewport data. You can also list all available flows via `GET /flows` to locate the specific `id` you need.

### Reconstructing the StateGraph

The `create_graph` function in [`src/app/backend/services/graph.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/services/graph.py) accepts the stored `graph_nodes` and `graph_edges` from the `FlowResponse`, then reconstructs the `StateGraph` by mapping the JSON definitions back to agent instances and connection logic. This rebuilt graph is then executable via `run_graph_async` with new parameters such as tickers, date ranges, and model configurations.

### Example: Restoring and Running a Flow

```python
import requests

API_URL = "https://your-api.com"
FLOW_ID = 42

# Fetch the stored flow

resp = requests.get(f"{API_URL}/flows/{FLOW_ID}")
resp.raise_for_status()
flow = resp.json()          # contains `nodes`, `edges`, `viewport`, etc.

# Pass the flow data to the graph runner

from app.backend.services.graph import create_graph, run_graph_async

graph = create_graph(flow["graph_nodes"], flow["graph_edges"])
portfolio = {"cash": 100_000, "positions": {}}

result = await run_graph_async(
    graph,
    portfolio,
    tickers=["AAPL", "MSFT"],
    start_date="2024-01-01",
    end_date="2024-03-31",
    model_name="gpt-4.1",
    model_provider="OPENAI",
)
print(result)

```

The `FlowResponse` model in [`src/app/backend/models/schemas.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/models/schemas.py) supplies the exact JSON structure needed by `create_graph`, ensuring seamless restoration of the agent workflow.

## Tracking Flow Executions

Each execution creates a `HedgeFundFlowRun` record defined in [`src/app/backend/database/models.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/database/models.py) and managed by `FlowRunRepository` in [`src/app/backend/repositories/flow_run_repository.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/repositories/flow_run_repository.py). This stores the initial `request_data`, execution status (pending, complete, or failed), and final `results`, enabling you to audit performance across multiple runs of the same persisted flow.

When `run_graph_async` finishes, the service handling the request (such as the `/hedge_fund` endpoint in [`src/app/backend/routes/hedge_fund.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/routes/hedge_fund.py)) typically records the outcome:

```python
from app.backend.repositories.flow_run_repository import FlowRunRepository
from app.backend.models.schemas import FlowRunStatus

run_repo = FlowRunRepository(db_session)
flow_run = run_repo.create_flow_run(flow_id=FLOW_ID, request_data=run_request)

# ... after execution completes

run_repo.update_flow_run(
    run_id=flow_run.id,
    status=FlowRunStatus.COMPLETE,
    results=execution_output,
)

```

This creates a complete audit trail, allowing you to compare results across different executions of the same flow configuration.

## Key Files and Components

Understanding the persistence mechanism requires familiarity with these specific files in the virattt/ai-hedge-fund repository:

- **[`src/app/backend/models/schemas.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/models/schemas.py)** – Contains Pydantic request/response models including `FlowCreateRequest`, `FlowResponse`, and `FlowRunCreateRequest` that validate API payloads.
- **[`src/app/backend/database/models.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/database/models.py)** – Defines SQLAlchemy ORM classes `HedgeFundFlow` and `HedgeFundFlowRun` that map to the relational database tables.
- **[`src/app/backend/repositories/flow_repository.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/repositories/flow_repository.py)** – Implements `FlowRepository` with `create_flow` and retrieval methods that handle database transactions.
- **[`src/app/backend/repositories/flow_run_repository.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/repositories/flow_run_repository.py)** – Manages execution history through `FlowRunRepository`, storing run metadata and results.
- **[`src/app/backend/routes/flows.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/routes/flows.py)** – FastAPI endpoints `POST /flows` and `GET /flows/{id}` that expose persistence operations to clients.
- **[`src/app/backend/services/graph.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/services/graph.py)** – Contains `create_graph` and `run_graph_async` functions that reconstruct and execute stored flows.
- **[`src/app/backend/routes/hedge_fund.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/routes/hedge_fund.py)** – Example usage of flow execution and run tracking in the main hedge fund endpoint.

## Summary

- **Persistence**: POST JSON flow definitions to `/flows` to store them in the `hedge_fund_flows` table via `FlowRepository.create_flow` in [`src/app/backend/repositories/flow_repository.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/repositories/flow_repository.py).
- **Validation**: The `FlowCreateRequest` schema in [`src/app/backend/models/schemas.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/models/schemas.py) enforces required fields (`name`, `nodes`, `edges`) before database insertion.
- **Restoration**: Fetch stored flows via `GET /flows/{flow_id}` and pass the returned `nodes` and `edges` to `create_graph` in [`src/app/backend/services/graph.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/services/graph.py) to rebuild the executable `StateGraph`.
- **Execution Tracking**: Each run creates a `HedgeFundFlowRun` record managed by `FlowRunRepository` in [`src/app/backend/repositories/flow_run_repository.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/repositories/flow_run_repository.py), storing request data, status, and results for audit trails.

## Frequently Asked Questions

### What database schema stores the flow definitions?

Flow definitions are stored in the `hedge_fund_flows` table, mapped by the `HedgeFundFlow` SQLAlchemy ORM class in [`src/app/backend/database/models.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/database/models.py). This table stores JSON-compatible columns for `nodes`, `edges`, `viewport`, and metadata, along with a unique integer `id` primary key and timestamps.

### How does the system reconstruct the agent graph from stored data?

The `create_graph` function in [`src/app/backend/services/graph.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/services/graph.py) accepts the stored `graph_nodes` and `graph_edges` from the `FlowResponse`, then reconstructs the `StateGraph` by mapping the JSON definitions back to agent instances and connection logic. This rebuilt graph is then executable via `run_graph_async` with new parameters such as tickers, date ranges, and model configurations.

### Can I track the execution history of a persisted flow?

Yes. Each execution creates a `HedgeFundFlowRun` record managed by `FlowRunRepository` in [`src/app/backend/repositories/flow_run_repository.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/repositories/flow_run_repository.py). This stores the initial `request_data`, execution status (pending, complete, or failed), and final `results`, enabling you to audit performance across multiple runs of the same persisted flow configuration.

### What validation occurs when persisting a new flow?

Incoming requests are validated against the `FlowCreateRequest` Pydantic schema in [`src/app/backend/models/schemas.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/app/backend/models/schemas.py). This schema enforces required fields including `name`, `nodes`, and `edges`, ensuring the flow definition is structurally complete before `FlowRepository.create_flow` commits it to the database.