# How Agent Signals Propagate Through the Graph in AI Hedge Fund

> Discover how agent signals propagate through the AI hedge fund graph. Learn about shared mutable state and LangGraph's topological execution for efficient signal sharing.

- Repository: [Virat Singh/ai-hedge-fund](https://github.com/virattt/ai-hedge-fund)
- Tags: internals
- Published: 2026-03-09

---

**Agent signals propagate through the AI Hedge Fund graph via a shared mutable state dictionary (`state["data"]["analyst_signals"]`) that each agent writes to and downstream agents read from, orchestrated by LangGraph's topological execution order.**

The `virattt/ai-hedge-fund` repository implements a multi-agent financial analysis system where analyst agents, risk managers, and portfolio managers communicate through a centralized state object. This architecture allows signals to flow naturally from data gathering to final trading decisions without explicit message passing between nodes.

## The AgentState Structure

Signal propagation begins with the state definition in [`src/graph/state.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/graph/state.py). The `AgentState` TypedDict declares a mutable `data` field that persists across graph execution:

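```python
# src/graph/state.py
import operator
from typing import Annotated, Sequence, TypedDict

from langchain_core.messages import BaseMessage

# `merge_dicts` is the reducer defined in this same module.

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]  # new messages are appended
    data: Annotated[dict[str, any], merge_dicts]              # holds `analyst_signals`
    metadata: Annotated[dict[str, any], merge_dicts]
```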

The `analyst_signals` key within `state["data"]` serves as the central broadcast channel. Every agent receives the full state object, writes its analysis results to this dictionary using its unique agent ID as the key (e.g., `"warren_buffett_agent"`), and passes the modified state to the next node in the graph.
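
To make the reducer annotations concrete, the sketch below applies the two reducers by hand, without LangGraph. The body of `merge_dicts` shown here is an assumption (a shallow merge where later keys win); check `src/graph/state.py` for the actual definition. `REDUCERS` and `apply_update` are hypothetical names that only imitate how a reducer-annotated field absorbs a node's return value.

```python
import operator
from typing import Any, Callable


def merge_dicts(a: dict[str, Any], b: dict[str, Any]) -> dict[str, Any]:
    # Assumed implementation: shallow merge, keys in `b` win.
    return {**a, **b}


# Hypothetical stand-in for the reducers attached via Annotated[...]
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {
    "messages": operator.add,
    "data": merge_dicts,
    "metadata": merge_dicts,
}


def apply_update(state: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
    """Fold a node's partial update into the state, one reducer per key."""
    merged = dict(state)
    for key, value in update.items():
        merged[key] = REDUCERS[key](state[key], value)
    return merged


state = {
    "messages": ["start"],
    "data": {"tickers": ["AAPL"], "analyst_signals": {}},
    "metadata": {},
}

# A node writes its signal in place, then returns `data` as its update.
state["data"]["analyst_signals"]["warren_buffett_agent"] = {"AAPL": {"signal": "bullish"}}
state = apply_update(state, {"messages": ["buffett done"], "data": state["data"]})

print(state["messages"])
print(sorted(state["data"]["analyst_signals"]))
```

Under this assumed shallow merge, a node that returned a brand-new `analyst_signals` dictionary would replace the existing one, which is one reason the agents add their entry to the existing dictionary instead.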

## Graph Construction and Node Registration

The signal pathway is constructed dynamically in [`app/backend/services/graph.py`](https://github.com/virattt/ai-hedge-fund/blob/main/app/backend/services/graph.py). The `create_graph` function builds the LangGraph workflow by mapping analyst configurations to executable nodes:

```python
# app/backend/services/graph.py (abridged)
def create_graph(graph_nodes, graph_edges) -> StateGraph:
    graph = StateGraph(AgentState)
    graph.add_node("start_node", start)

    # Map each analyst config key to (node_name, agent function)
    analyst_nodes = {
        key: (f"{key}_agent", cfg["agent_func"])
        for key, cfg in ANALYST_CONFIG.items()
    }

    # Assumed setup for the names used below: the unique node IDs
    # requested by the caller, e.g. "warren_buffett_123abc"
    agent_ids = [node.id for node in graph_nodes]
    agent_ids_set = set(agent_ids)
    portfolio_manager_nodes = set()

    # Add analyst nodes; portfolio managers are collected and added later
    for unique_id in agent_ids:
        base_key = extract_base_agent_key(unique_id)
        if base_key == "portfolio_manager":
            portfolio_manager_nodes.add(unique_id)
            continue
        if base_key not in ANALYST_CONFIG:
            continue
        _node_name, node_func = analyst_nodes[base_key]
        agent_fn = create_agent_function(node_func, unique_id)
        graph.add_node(unique_id, agent_fn)

    # Add a paired risk manager for each portfolio manager
    ...

    # Add edges from the React Flow definition
    for edge in graph_edges:
        if edge.source in agent_ids_set and edge.target in agent_ids_set:
            graph.add_edge(edge.source, edge.target)
```

**Key architectural details:**
- Analyst nodes derive from `ANALYST_CONFIG` defined in [`src/utils/analysts.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/utils/analysts.py)
- Each portfolio manager automatically receives a paired risk-management agent (`risk_management_agent_<suffix>`)
- Direct analyst-to-portfolio-manager edges are intercepted and rewired through the risk manager to enforce risk checks before trading decisions
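
The pairing and rewiring described in the last two bullets can be sketched in isolation. This is a simplified illustration, not the repository's code: `Edge`, `risk_manager_id_for`, and `rewire_through_risk_managers` are hypothetical names, and taking the suffix from the text after the last underscore is an assumption.

```python
from collections import namedtuple

Edge = namedtuple("Edge", ["source", "target"])


def risk_manager_id_for(portfolio_manager_id: str) -> str:
    # Assumption: reuse the portfolio manager's suffix, e.g. "789ghi".
    suffix = portfolio_manager_id.split("_")[-1]
    return f"risk_management_agent_{suffix}"


def rewire_through_risk_managers(edges, portfolio_manager_ids):
    """Replace analyst -> PM edges with analyst -> risk manager -> PM."""
    rewired = []
    for edge in edges:
        if edge.target in portfolio_manager_ids:
            risk_id = risk_manager_id_for(edge.target)
            rewired.append(Edge(edge.source, risk_id))
            pm_edge = Edge(risk_id, edge.target)
            if pm_edge not in rewired:  # add the final hop only once
                rewired.append(pm_edge)
        else:
            rewired.append(edge)
    return rewired


edges = [
    Edge("warren_buffett_123abc", "portfolio_manager_789ghi"),
    Edge("technical_analyst_456def", "portfolio_manager_789ghi"),
]
rewired = rewire_through_risk_managers(edges, {"portfolio_manager_789ghi"})
for edge in rewired:
    print(f"{edge.source} -> {edge.target}")
```

Both analysts end up pointing at the same risk manager, and a single edge connects that risk manager to the portfolio manager.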

## The Signal Flow Execution Pipeline

When `run_graph` executes, LangGraph processes nodes in topological order, ensuring that **signals propagate forward automatically** through state mutation:

### 1. Initialization

The workflow begins with an empty signals dictionary:

```python
# app/backend/services/graph.py
return graph.invoke(
    {
        "messages": [HumanMessage(...)],
        "data": {
            "tickers": tickers,
            "portfolio": portfolio,
            "start_date": start_date,
            "end_date": end_date,
            "analyst_signals": {},  # ← empty at start
        },
        "metadata": {...},
    },
)
```

### 2. Analyst Execution

Each analyst agent computes its signal and writes directly to the shared state:

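```python
# Example: src/agents/warren_buffett.py
state["data"]["analyst_signals"][agent_id] = buffett_analysis
```

The agent adds its entry to the existing `analyst_signals` dictionary in place and returns `data` as part of its update, which LangGraph folds back into the state through the `merge_dicts` reducer. Nodes that run later therefore see every entry written before them.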

### 3. Risk Management Intervention

The risk-management agent reads existing analyst signals, calculates position limits based on volatility and correlation data, and appends its own constraints:

```python
# src/agents/risk_manager.py
state["data"]["analyst_signals"][agent_id] = risk_analysis
```

### 4. Portfolio Aggregation

Finally, the portfolio-management agent in [`src/agents/portfolio_manager.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/agents/portfolio_manager.py) consumes the complete signal set:

```python
# src/agents/portfolio_manager.py
analyst_signals = state["data"]["analyst_signals"]
risk_data = analyst_signals.get(risk_manager_id, {}).get(ticker, {})
# … compress signals from every other analyst …
```

At this stage, the portfolio manager has access to every upstream signal, including risk constraints, enabling informed trade sizing decisions.
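
The four stages above can be imitated with the standard library alone. In this sketch `graphlib.TopologicalSorter` stands in for LangGraph's scheduler, which works differently internally; the node functions, the `seen_by_pm` key, and all payload values are hypothetical placeholders.

```python
from graphlib import TopologicalSorter


def analyst(state, agent_id):
    # Placeholder signal; a real analyst would compute this from market data.
    state["data"]["analyst_signals"][agent_id] = {
        "AAPL": {"signal": "bullish", "confidence": 80}
    }


def risk_manager(state, agent_id):
    # Placeholder constraint written next to the analyst entries.
    state["data"]["analyst_signals"][agent_id] = {"AAPL": {"position_limit": 10_000}}


def portfolio_manager(state, agent_id):
    # Record which upstream entries are visible when this node runs.
    state["data"]["seen_by_pm"] = sorted(state["data"]["analyst_signals"])


# Each node maps to the set of nodes that must finish before it.
dependencies = {
    "analyst_a": set(),
    "analyst_b": set(),
    "risk_manager": {"analyst_a", "analyst_b"},
    "portfolio_manager": {"risk_manager"},
}
node_funcs = {
    "analyst_a": analyst,
    "analyst_b": analyst,
    "risk_manager": risk_manager,
    "portfolio_manager": portfolio_manager,
}

state = {"data": {"analyst_signals": {}}}
order = list(TopologicalSorter(dependencies).static_order())
for node in order:
    node_funcs[node](state, node)

print(order)
print(state["data"]["seen_by_pm"])
```

The two analysts may run in either order, but the risk manager always follows both and the portfolio manager always runs last, so it sees all three entries.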

## Signal Routing Behaviors

The graph handles several edge cases through specific routing logic:

| Scenario | Routing Mechanism | Signal Flow Result |
|----------|------------------|-------------------|
| **Direct Analyst → Portfolio Manager** | Edges are rewired through `direct_to_portfolio_managers` logic | Analyst → Risk Manager → Portfolio Manager |
| **Multiple Analysts Converging** | All incoming edges route to the paired risk manager first | Signals aggregate in `analyst_signals` before portfolio evaluation |
| **Isolated Analyst Nodes** | Automatically connected to the start node if no incoming edges exist | Analyst runs early, making signals available to all downstream consumers |
| **Risk Manager Bypass** | Prohibited by construction—every portfolio manager has a mandatory risk check | Ensures risk data always precedes trading decisions |
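
The "Isolated Analyst Nodes" row amounts to a small set computation. The following is a minimal sketch under the assumption that edges are plain `(source, target)` pairs; `connect_isolated_nodes` is a hypothetical helper, not a function from the repository.

```python
def connect_isolated_nodes(node_ids, edges, start_node="start_node"):
    """Return extra (start_node, node) edges for nodes that nothing points at."""
    has_incoming = {target for _source, target in edges}
    return [(start_node, node) for node in node_ids if node not in has_incoming]


node_ids = [
    "warren_buffett_123abc",
    "technical_analyst_456def",
    "risk_management_agent_789ghi",
]
edges = [("warren_buffett_123abc", "risk_management_agent_789ghi")]

extra_edges = connect_isolated_nodes(node_ids, edges)
print(extra_edges)
```

Only the two analysts receive an edge from the start node here, because the risk manager already has an incoming edge.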

## Implementation Guide: Adding Custom Agents

To extend the signal propagation pipeline with a new analyst, you must implement the write-to-state pattern and register the agent in the configuration.

### Step 1: Define the Agent Function

Create a new agent that follows the state mutation contract:

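```python
# src/agents/my_new_analyst.py
from typing import Literal

from pydantic import BaseModel, Field

from src.graph.state import AgentState


class MySignal(BaseModel):
    signal: Literal["bullish", "bearish", "neutral"]
    confidence: int = Field(description="0-100")
    reasoning: str


def my_new_analyst(state: AgentState, agent_id: str = "my_new_analyst_agent"):
    data = state["data"]
    analysis = {}
    for ticker in data["tickers"]:
        # … compute signal logic for this ticker …
        signal = MySignal(signal="bullish", confidence=80, reasoning="Strong earnings growth")
        # Key by ticker, matching how the portfolio manager reads entries
        analysis[ticker] = signal.model_dump()

    # Publish under this node's ID so downstream agents can find it
    data["analyst_signals"][agent_id] = analysis
    # Return only `data`: `messages` uses an additive reducer, so echoing
    # the existing list back would append a duplicate of every message.
    return {"data": data}
```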

### Step 2: Register in Analyst Configuration

Add the agent to the central registry in [`src/utils/analysts.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/utils/analysts.py):

```python
# src/utils/analysts.py
from src.agents.my_new_analyst import my_new_analyst

ANALYST_CONFIG["my_new_analyst"] = {
    "display_name": "My New Analyst",
    "description": "Custom signal generator",
    "investing_style": "Focused on earnings momentum",
    "agent_func": my_new_analyst,
    "type": "analyst",
    "order": 17,
}
```

### Step 3: Configure Graph Edges

Wire the agent into the execution flow via the React Flow UI or JSON payload:

```json
{
  "source": "my_new_analyst_ab12cd",
  "target": "portfolio_manager_ef34gh"
}
```

When the graph executes, your agent's output will automatically propagate to the risk manager and subsequently to the portfolio manager through the shared state dictionary.

## Debugging Signal Propagation

To inspect how signals propagate during execution, capture the final state after running the graph:

```python
import json
from types import SimpleNamespace

from app.backend.services.graph import create_graph, run_graph

# Minimal stand-ins exposing only the attributes used in this article
# (`id` on nodes, `source` and `target` on edges).
nodes = [
    SimpleNamespace(id="warren_buffett_123abc"),
    SimpleNamespace(id="technical_analyst_456def"),
    SimpleNamespace(id="portfolio_manager_789ghi"),
]
# Direct analyst -> portfolio manager edges; create_graph reroutes them
# through the paired risk manager as described above.
edges = [
    SimpleNamespace(source="warren_buffett_123abc", target="portfolio_manager_789ghi"),
    SimpleNamespace(source="technical_analyst_456def", target="portfolio_manager_789ghi"),
]

graph = create_graph(graph_nodes=nodes, graph_edges=edges)
graph = graph.compile()  # a StateGraph must be compiled before it can be invoked

result = run_graph(
    graph,
    portfolio={"cash": 100_000, "positions": {}},
    tickers=["AAPL"],
    start_date="2024-01-01",
    end_date="2024-06-30",
    model_name="gpt-4o",
    model_provider="openai",
)

# Inspect propagated signals
print(json.dumps(result["data"]["analyst_signals"], indent=2))
```

Assuming each agent stores its output under the unique ID passed to it by `create_agent_function`, the output contains entries keyed by `warren_buffett_123abc` and `technical_analyst_456def`, plus one for the paired risk manager, showing the state accumulated over the run.
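
When an expected entry is absent from the final state, a small check can narrow down which agent never wrote. The helper below is a hypothetical sketch that assumes each agent's entry is a dictionary keyed by ticker; the sample data is made up for illustration.

```python
def missing_signals(analyst_signals, expected_agent_ids, tickers):
    """Map each expected agent to the tickers it never wrote a signal for."""
    gaps = {}
    for agent_id in expected_agent_ids:
        written = analyst_signals.get(agent_id, {})
        absent = [ticker for ticker in tickers if ticker not in written]
        if absent:
            gaps[agent_id] = absent
    return gaps


# Made-up final state: one analyst covered only AAPL, another never ran.
sample_signals = {
    "warren_buffett_123abc": {"AAPL": {"signal": "bullish", "confidence": 80}},
    "risk_management_agent_789ghi": {
        "AAPL": {"position_limit": 10_000},
        "MSFT": {"position_limit": 5_000},
    },
}
expected = [
    "warren_buffett_123abc",
    "technical_analyst_456def",
    "risk_management_agent_789ghi",
]

gaps = missing_signals(sample_signals, expected, ["AAPL", "MSFT"])
print(gaps)
```

An agent that is missing for every ticker usually points to a wiring problem (the node was never reached), while a partial gap points to the agent's own per-ticker logic.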

## Summary

- **State-based propagation** occurs through `state["data"]["analyst_signals"]`, a mutable dictionary inside the `AgentState` defined in [`src/graph/state.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/graph/state.py) and shared across all nodes
- **Topological execution** ensures upstream agents complete before downstream consumers read their signals
- **Mandatory risk interception** routes all analyst signals through the risk manager before reaching the portfolio manager
- **Automatic aggregation** allows the portfolio manager to view the complete signal history without explicit message passing
- **Extensibility** follows a simple pattern: write to the shared state dictionary using your unique agent ID, register in `ANALYST_CONFIG`, and wire edges in the graph definition

## Frequently Asked Questions

### How does the portfolio manager access signals from multiple analysts?

The portfolio manager retrieves all accumulated signals by reading `state["data"]["analyst_signals"]` as implemented in [`src/agents/portfolio_manager.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/agents/portfolio_manager.py). Because LangGraph executes nodes in topological order, by the time the portfolio manager runs, every upstream analyst and the risk manager have already written their entries to this dictionary. The manager iterates through this dictionary to compress signals into trading decisions.
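
The "compress" step mentioned above could look roughly like the sketch below. It is an illustration under assumptions, not the repository's implementation: `compress_signals` and the short `sig`/`conf` keys are hypothetical, and each agent's entry is assumed to be keyed by ticker.

```python
def compress_signals(analyst_signals, tickers, risk_manager_id):
    """Build {ticker: {agent_id: {"sig", "conf"}}}, skipping the risk manager."""
    compressed = {ticker: {} for ticker in tickers}
    for agent_id, per_ticker in analyst_signals.items():
        if agent_id == risk_manager_id:
            continue  # risk data is read separately, not treated as a signal
        for ticker in tickers:
            payload = per_ticker.get(ticker)
            if payload and "signal" in payload:
                compressed[ticker][agent_id] = {
                    "sig": payload["signal"],
                    "conf": payload.get("confidence"),
                }
    return compressed


analyst_signals = {
    "warren_buffett_123abc": {
        "AAPL": {"signal": "bullish", "confidence": 80, "reasoning": "Strong earnings growth"}
    },
    "technical_analyst_456def": {"AAPL": {"signal": "bearish", "confidence": 55}},
    "risk_management_agent_789ghi": {"AAPL": {"position_limit": 10_000}},
}

compressed = compress_signals(analyst_signals, ["AAPL"], "risk_management_agent_789ghi")
print(compressed)
```

Dropping the free-text reasoning and keeping only the signal and confidence keeps the per-ticker summary small before it is used for a decision.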

### What happens if two analysts produce conflicting signals?

The graph does not resolve conflicts during propagation; it simply collects every signal in `analyst_signals`. Conflict resolution happens in the portfolio management layer, where the agent weighs the signals and their confidence scores against the risk constraints. The risk manager's position limits can also cap trade size regardless of how bullish the analysts are.

### Can agents communicate directly without going through the state object?

No. The architecture enforces state-based communication exclusively. Agents cannot send direct messages to each other; they must write to `state["data"]["analyst_signals"]` or `state["messages"]`. This design pattern ensures reproducibility and allows LangGraph to track the complete execution history for debugging purposes.

### Why are analyst signals stored in a dictionary rather than a list?

Keying the dictionary by `agent_id` means each agent has exactly one entry and downstream agents can look up a specific signal in O(1) time, as when the portfolio manager retrieves risk data with `analyst_signals.get(risk_manager_id, {})`. The `merge_dicts` reducer in [`src/graph/state.py`](https://github.com/virattt/ai-hedge-fund/blob/main/src/graph/state.py) then combines the `data` updates returned by each node into the shared state.