How Agent Signals Propagate Through the Graph in AI Hedge Fund

Agent signals propagate through the AI Hedge Fund graph via a shared mutable state dictionary (state["data"]["analyst_signals"]) that each agent writes to and downstream agents read from, orchestrated by LangGraph's topological execution order.

The virattt/ai-hedge-fund repository implements a multi-agent financial analysis system where analyst agents, risk managers, and portfolio managers communicate through a centralized state object. This architecture allows signals to flow naturally from data gathering to final trading decisions without explicit message passing between nodes.

The AgentState Structure

Signal propagation begins with the state definition in src/graph/state.py. The AgentState TypedDict declares a mutable data field that persists across graph execution:


# src/graph/state.py

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    data: Annotated[dict[str, any], merge_dicts]  # holds `analyst_signals`
    metadata: Annotated[dict[str, any], merge_dicts]

The analyst_signals key within state["data"] serves as the central broadcast channel. Every agent receives the full state object, writes its analysis results to this dictionary using its unique agent ID as the key (e.g., "warren_buffett_agent"), and passes the modified state to the next node in the graph.
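To make the reducer annotations more concrete, here is a minimal sketch of how one node's update could be folded into the state. It assumes that merge_dicts is a shallow dictionary merge (the excerpt above does not show its body), and apply_update is a hypothetical helper standing in for what LangGraph does with the Annotated reducers; it is not the library's actual code.

```python
import operator


def merge_dicts(a: dict, b: dict) -> dict:
    # Assumption: a shallow merge in which keys from `b` win.
    return {**a, **b}


# Hypothetical reducer table mirroring the Annotated fields of AgentState.
REDUCERS = {"messages": operator.add, "data": merge_dicts, "metadata": merge_dicts}


def apply_update(state: dict, update: dict) -> dict:
    """Fold one node's returned update into the current state."""
    merged = dict(state)
    for key, value in update.items():
        merged[key] = REDUCERS[key](state[key], value) if key in state else value
    return merged


state = {
    "messages": ["start"],
    "data": {"tickers": ["AAPL"], "analyst_signals": {}},
    "metadata": {},
}

# An analyst adds an entry under its own agent ID and returns the signals.
signals = dict(state["data"]["analyst_signals"])
signals["warren_buffett_agent"] = {"AAPL": {"signal": "bullish", "confidence": 80}}
update = {"messages": ["buffett done"], "data": {"analyst_signals": signals}}

state = apply_update(state, update)
print(state["data"]["analyst_signals"])
```

Under this assumption, keys that the update does not mention (such as tickers) survive the merge, while messages are concatenated rather than replaced.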

Graph Construction and Node Registration

The signal pathway is constructed dynamically in app/backend/services/graph.py. The create_graph function builds the LangGraph workflow by mapping analyst configurations to executable nodes:


# app/backend/services/graph.py

def create_graph(graph_nodes, graph_edges) -> StateGraph:
    graph = StateGraph(AgentState)
    graph.add_node("start_node", start)

    # agent_ids, agent_ids_set, and portfolio_manager_nodes are defined
    # elsewhere in the function (not shown in this excerpt).

    # Build mapping of analyst config → (node_name, func)
    analyst_nodes = {key: (f"{key}_agent", cfg["agent_func"])
                     for key, cfg in ANALYST_CONFIG.items()}

    # Add analyst nodes (portfolio managers are collected and handled later)
    for unique_id in agent_ids:
        base_key = extract_base_agent_key(unique_id)
        if base_key == "portfolio_manager":
            portfolio_manager_nodes.add(unique_id)
            continue
        if base_key not in ANALYST_CONFIG:
            continue
        node_name, node_func = analyst_nodes[base_key]
        agent_fn = create_agent_function(node_func, unique_id)
        graph.add_node(unique_id, agent_fn)

    # Add a paired risk-manager for each portfolio manager
    ...

    # Add edges from the React-Flow definition
    for edge in graph_edges:
        if edge.source in agent_ids_set and edge.target in agent_ids_set:
            graph.add_edge(edge.source, edge.target)

Key architectural details:

  • Analyst nodes derive from ANALYST_CONFIG defined in src/utils/analysts.py
  • Each portfolio manager automatically receives a paired risk-management agent (risk_management_agent_<suffix>)
  • Direct analyst-to-portfolio-manager edges are intercepted and rewired through the risk manager to enforce risk checks before trading decisions
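The rewiring described in the last bullet can be sketched in a few lines. This is an illustration only: the Edge tuple and rewire_through_risk_manager are hypothetical names, and the assumption that the risk manager reuses the portfolio manager's ID suffix is inferred from the risk_management_agent_<suffix> naming rather than taken from the source.

```python
from typing import NamedTuple


class Edge(NamedTuple):
    source: str
    target: str


def rewire_through_risk_manager(edges, portfolio_manager_ids):
    """Route every edge into a portfolio manager through its paired risk manager."""
    rewired = set()
    for edge in edges:
        if edge.target not in portfolio_manager_ids:
            rewired.add(edge)
            continue
        # Assumption: the paired risk manager shares the portfolio manager's suffix.
        suffix = edge.target.split("_")[-1]
        risk_id = f"risk_management_agent_{suffix}"
        if edge.source != risk_id:
            rewired.add(Edge(edge.source, risk_id))
        rewired.add(Edge(risk_id, edge.target))
    return sorted(rewired)


edges = [
    Edge("warren_buffett_123abc", "portfolio_manager_789ghi"),
    Edge("technical_analyst_456def", "portfolio_manager_789ghi"),
]
rewired_edges = rewire_through_risk_manager(edges, {"portfolio_manager_789ghi"})
for edge in rewired_edges:
    print(f"{edge.source} -> {edge.target}")
```

Using a set means that several analysts pointing at the same portfolio manager produce only one risk manager to portfolio manager edge.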

The Signal Flow Execution Pipeline

When run_graph executes, LangGraph processes nodes in topological order, ensuring that signals propagate forward automatically through state mutation:

1. Initialization

The workflow begins with an empty signals dictionary:


# app/backend/services/graph.py

return graph.invoke(
    {
        "messages": [HumanMessage(...)],
        "data": {
            "tickers": tickers,
            "portfolio": portfolio,
            "start_date": start_date,
            "end_date": end_date,
            "analyst_signals": {},  # ← empty at start
        },
        "metadata": {...},
    },
)

2. Analyst Execution

Each analyst agent computes its signal and writes directly to the shared state:


# Example: src/agents/warren_buffett.py

state["data"]["analyst_signals"][agent_id] = buffett_analysis

Because the state is passed by reference and mutated in place, and each agent also returns state["data"] as its update, these writes are visible to every node that runs afterwards.

3. Risk Management Intervention

The risk-management agent reads the existing analyst signals, calculates position limits based on volatility and correlation data, and writes its own constraints to the same dictionary under its own agent ID:


# src/agents/risk_manager.py

state["data"]["analyst_signals"][agent_id] = risk_analysis

4. Portfolio Aggregation

Finally, the portfolio-management agent in src/agents/portfolio_manager.py consumes the complete signal set:


# src/agents/portfolio_manager.py

analyst_signals = state["data"]["analyst_signals"]
risk_data = analyst_signals.get(risk_manager_id, {}).get(ticker, {})

# … compress signals from every other analyst …

At this stage, the portfolio manager has access to every upstream signal, including risk constraints, enabling informed trade sizing decisions.
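The four steps can be simulated end to end without LangGraph. The sketch below uses the standard library's graphlib.TopologicalSorter as a stand-in for the scheduler; the node functions, the flat position limit, and the seen_by_portfolio_manager key are hypothetical placeholders, not the repository's logic.

```python
from graphlib import TopologicalSorter


def make_analyst(agent_id, signal, confidence):
    """Build a node that writes one signal per ticker under its agent ID."""
    def node(state):
        state["data"]["analyst_signals"][agent_id] = {
            ticker: {"signal": signal, "confidence": confidence}
            for ticker in state["data"]["tickers"]
        }
    return node


def risk_manager(state):
    # Hypothetical flat limit; a real agent would derive it from market data.
    state["data"]["analyst_signals"]["risk_management_agent"] = {
        ticker: {"remaining_position_limit": 10_000.0}
        for ticker in state["data"]["tickers"]
    }


def portfolio_manager(state):
    # Record which upstream entries are visible at this point.
    state["data"]["seen_by_portfolio_manager"] = sorted(state["data"]["analyst_signals"])


NODES = {
    "warren_buffett_agent": make_analyst("warren_buffett_agent", "bullish", 80),
    "technical_analyst_agent": make_analyst("technical_analyst_agent", "bearish", 55),
    "risk_management_agent": risk_manager,
    "portfolio_manager": portfolio_manager,
}

# Each key maps a node to the set of nodes that must run before it.
PREDECESSORS = {
    "risk_management_agent": {"warren_buffett_agent", "technical_analyst_agent"},
    "portfolio_manager": {"risk_management_agent"},
}

state = {"data": {"tickers": ["AAPL"], "analyst_signals": {}}}
for name in TopologicalSorter(PREDECESSORS).static_order():
    NODES[name](state)

print(state["data"]["seen_by_portfolio_manager"])
```

Because the portfolio manager is ordered after the risk manager, which in turn waits for both analysts, all three upstream entries are present when it reads the dictionary.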

Signal Routing Behaviors

The graph handles several edge cases through specific routing logic:

| Scenario | Routing Mechanism | Signal Flow Result |
|---|---|---|
| Direct Analyst → Portfolio Manager | Edges are rewired through direct_to_portfolio_managers logic | Analyst → Risk Manager → Portfolio Manager |
| Multiple Analysts Converging | All incoming edges route to the paired risk manager first | Signals aggregate in analyst_signals before portfolio evaluation |
| Isolated Analyst Nodes | Automatically connected to the start node if no incoming edges exist | Analyst runs early, making signals available to all downstream consumers |
| Risk Manager Bypass | Prohibited by construction: every portfolio manager has a mandatory risk check | Ensures risk data always precedes trading decisions |
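As an illustration of the isolated-node case, the following sketch finds nodes that no edge points to and attaches them to the start node. The function name connect_isolated_nodes and the tuple-based edge format are assumptions made for this example; the repository may implement the check differently.

```python
def connect_isolated_nodes(node_ids, edges, start="start_node"):
    """Return (start, node) edges for every node without an incoming edge."""
    has_incoming = {target for _source, target in edges}
    return [(start, node) for node in node_ids if node not in has_incoming]


node_ids = [
    "warren_buffett_123abc",
    "technical_analyst_456def",
    "risk_management_agent_789ghi",
]
edges = [
    ("warren_buffett_123abc", "risk_management_agent_789ghi"),
    ("technical_analyst_456def", "risk_management_agent_789ghi"),
]

start_edges = connect_isolated_nodes(node_ids, edges)
print(start_edges)
```

Here both analysts are attached to start_node, while the risk manager is left alone because it already has incoming edges.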

Implementation Guide: Adding Custom Agents

To extend the signal propagation pipeline with a new analyst, you must implement the write-to-state pattern and register the agent in the configuration.

Step 1: Define the Agent Function

Create a new agent that follows the state mutation contract:


# src/agents/my_new_analyst.py

from typing import Literal

from pydantic import BaseModel, Field

from src.graph.state import AgentState


class MySignal(BaseModel):
    signal: Literal["bullish", "bearish", "neutral"]
    confidence: int = Field(description="0-100")
    reasoning: str


def my_new_analyst(state: AgentState, agent_id: str = "my_new_analyst_agent"):
    analysis = {}
    for ticker in state["data"]["tickers"]:
        # … compute signal logic for this ticker …
        signal = MySignal(signal="bullish", confidence=80, reasoning="Strong earnings growth")
        # Keyed by ticker, since downstream agents look signals up per ticker.
        analysis[ticker] = signal.model_dump()  # model_dump() requires pydantic v2

    # Write under this agent's ID so other nodes can find the result.
    state["data"]["analyst_signals"][agent_id] = analysis
    return {"messages": state["messages"], "data": state["data"]}

Step 2: Register in Analyst Configuration

Add the agent to the central registry in src/utils/analysts.py:

ANALYST_CONFIG["my_new_analyst"] = {
    "display_name": "My New Analyst",
    "description": "Custom signal generator",
    "investing_style": "Focused on earnings momentum",
    "agent_func": my_new_analyst,
    "type": "analyst",
    "order": 17,
}

Step 3: Configure Graph Edges

Wire the agent into the execution flow via the React Flow UI or JSON payload:

{
  "source": "my_new_analyst_ab12cd",
  "target": "risk_management_agent_ef34gh"
}

When the graph executes, your agent's output will automatically propagate to the risk manager and subsequently to the portfolio manager through the shared state dictionary.
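Edge endpoints such as my_new_analyst_ab12cd carry a suffix that has to be stripped before the registry lookup. The sketch below shows one way that lookup could work; the body of extract_base_agent_key is an assumption (the source only shows the call), resolve_agent is a hypothetical helper, and the two registry entries are stubs.

```python
def stub_agent(state, agent_id):
    """Placeholder agent function used only for this illustration."""
    return agent_id


# Hypothetical registry with the same shape as ANALYST_CONFIG.
ANALYST_CONFIG = {
    "warren_buffett": {"display_name": "Warren Buffett", "agent_func": stub_agent},
    "my_new_analyst": {"display_name": "My New Analyst", "agent_func": stub_agent},
}


def extract_base_agent_key(unique_id: str) -> str:
    # Assumption: IDs look like "<base_key>_<suffix>" and the suffix has no underscore.
    base, _sep, _suffix = unique_id.rpartition("_")
    return base or unique_id


def resolve_agent(unique_id: str):
    """Return the registered agent function for a node ID, or None if unknown."""
    config = ANALYST_CONFIG.get(extract_base_agent_key(unique_id))
    return None if config is None else config["agent_func"]


print(extract_base_agent_key("my_new_analyst_ab12cd"))
print(resolve_agent("unknown_agent_xyz"))
```

A node whose base key is missing from the registry resolves to None, which mirrors the "skip if not in ANALYST_CONFIG" branch in create_graph.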

Debugging Signal Propagation

To inspect how signals propagate during execution, capture the final state after running the graph:

from app.backend.services.graph import run_graph, create_graph

# Build graph with specific analysts and edges

graph = create_graph(
    graph_nodes=[
        type('Node', (), {'id': 'warren_buffett_123abc'}),
        type('Node', (), {'id': 'technical_analyst_456def'}),
        type('Node', (), {'id': 'portfolio_manager_789ghi'})
    ],
    graph_edges=[
        type('Edge', (), {'source': 'warren_buffett_123abc', 'target': 'risk_management_agent_abc'}),
        type('Edge', (), {'source': 'technical_analyst_456def', 'target': 'risk_management_agent_abc'}),
        type('Edge', (), {'source': 'risk_management_agent_abc', 'target': 'portfolio_manager_789ghi'}),
    ]
)

result = run_graph(
    graph,
    portfolio={"cash": 100_000, "positions": {}},
    tickers=["AAPL"],
    start_date="2024-01-01",
    end_date="2024-06-30",
    model_name="gpt-4o",
    model_provider="openai"
)

# Inspect propagated signals

import json
print(json.dumps(result["data"]["analyst_signals"], indent=2))

The output should contain one entry for each agent that ran, keyed by the agent ID its node was registered under (here the two analysts and the paired risk manager), demonstrating the accumulated state after topological execution.
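Raw JSON becomes hard to scan once several agents and tickers are involved. The helper below is a small sketch that flattens the dictionary into rows; summarize_signals is a hypothetical name, the sample data is invented for illustration, and it assumes each agent's entry is keyed by ticker.

```python
def summarize_signals(analyst_signals):
    """Flatten {agent_id: {ticker: payload}} into sorted rows for printing."""
    rows = []
    for agent_id, per_ticker in sorted(analyst_signals.items()):
        for ticker, payload in sorted(per_ticker.items()):
            # Risk entries may carry limits instead of a signal, so fall back gracefully.
            rows.append((agent_id, ticker, payload.get("signal", "n/a"), payload.get("confidence")))
    return rows


# Invented sample data in the assumed per-ticker layout.
sample_signals = {
    "warren_buffett_123abc": {"AAPL": {"signal": "bullish", "confidence": 80}},
    "risk_management_agent_789ghi": {"AAPL": {"remaining_position_limit": 10_000.0}},
}

rows = summarize_signals(sample_signals)
for agent_id, ticker, signal, confidence in rows:
    print(f"{agent_id:<32} {ticker:<6} {signal:<8} {confidence}")
```

The same function can be pointed at result["data"]["analyst_signals"] from a real run, provided the per-ticker assumption holds.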

Summary

  • State-based propagation occurs through state["data"]["analyst_signals"], a mutable dictionary that lives in the AgentState defined in src/graph/state.py and is shared across all nodes
  • Topological execution ensures upstream agents complete before downstream consumers read their signals
  • Mandatory risk interception routes all analyst signals through the risk manager before reaching the portfolio manager
  • Automatic aggregation allows the portfolio manager to view the complete signal history without explicit message passing
  • Extensibility follows a simple pattern: write to the shared state dictionary using your unique agent ID, register in ANALYST_CONFIG, and wire edges in the graph definition

Frequently Asked Questions

How does the portfolio manager access signals from multiple analysts?

The portfolio manager retrieves all accumulated signals by reading state["data"]["analyst_signals"] as implemented in src/agents/portfolio_manager.py. Because LangGraph executes nodes in topological order, by the time the portfolio manager runs, every upstream analyst and the risk manager have already written their entries to this dictionary. The manager iterates through this dictionary to compress signals into trading decisions.

What happens if two analysts produce conflicting signals?

The graph does not resolve conflicts during propagation—it simply aggregates all signals into analyst_signals. Conflict resolution occurs in the portfolio management layer, where the agent weighs signals based on confidence scores, historical performance, and risk constraints. The risk manager may also override position sizes based on volatility limits regardless of analyst bullishness.
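To show what weighing conflicting signals by confidence could look like, here is a deliberately simple sketch. It is a hypothetical scoring rule written for this article, not the portfolio manager's actual logic, and weighted_consensus is an invented name.

```python
DIRECTION = {"bullish": 1, "bearish": -1, "neutral": 0}


def weighted_consensus(signals):
    """Return a score in [-1, 1]: positive leans bullish, negative leans bearish."""
    score = 0.0
    total_weight = 0.0
    for payload in signals.values():
        weight = payload["confidence"] / 100
        score += DIRECTION[payload["signal"]] * weight
        total_weight += weight
    return score / total_weight if total_weight else 0.0


# Two analysts disagree on the same ticker with different confidence.
aapl_signals = {
    "warren_buffett_agent": {"signal": "bullish", "confidence": 80},
    "technical_analyst_agent": {"signal": "bearish", "confidence": 40},
}

consensus = weighted_consensus(aapl_signals)
print(round(consensus, 3))
```

In this example the more confident bullish view outweighs the bearish one, so the score is positive but well below 1.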

Can agents communicate directly without going through the state object?

No. The architecture enforces state-based communication exclusively. Agents cannot send direct messages to each other; they must write to state["data"]["analyst_signals"] or state["messages"]. This design pattern ensures reproducibility and allows LangGraph to track the complete execution history for debugging purposes.

Why are analyst signals stored in a dictionary rather than a list?

The dictionary structure keyed by agent_id prevents duplicate entries and enables O(1) lookups when downstream agents need specific signals (such as when the portfolio manager retrieves risk data using analyst_signals.get(risk_manager_id, {})). The merge_dicts reducer declared on the data field in src/graph/state.py then merges each node's returned data into the existing state rather than replacing it wholesale.
