How Agent Signals Propagate Through the Graph in AI Hedge Fund
Agent signals propagate through the AI Hedge Fund graph via a shared mutable state dictionary (state["data"]["analyst_signals"]) that each agent writes to and downstream agents read from, orchestrated by LangGraph's topological execution order.
The virattt/ai-hedge-fund repository implements a multi-agent financial analysis system where analyst agents, risk managers, and portfolio managers communicate through a centralized state object. This architecture allows signals to flow naturally from data gathering to final trading decisions without explicit message passing between nodes.
The AgentState Structure
Signal propagation begins with the state definition in src/graph/state.py. The AgentState TypedDict declares a mutable data field that persists across graph execution:
# src/graph/state.py
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    data: Annotated[dict[str, any], merge_dicts]  # holds `analyst_signals`
    metadata: Annotated[dict[str, any], merge_dicts]
The analyst_signals key within state["data"] serves as the central broadcast channel. Every agent receives the full state object, writes its analysis results to this dictionary using its unique agent ID as the key (e.g., "warren_buffett_agent"), and passes the modified state to the next node in the graph.
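The write-by-agent-ID pattern can be sketched in plain Python without LangGraph. This is a minimal illustration, not the repository's code: the `merge_dicts` body below is an assumed shallow merge (the source only names the reducer), and `write_signal` is a hypothetical helper.

```python
from typing import Any


def merge_dicts(a: dict[str, Any], b: dict[str, Any]) -> dict[str, Any]:
    """Stand-in reducer (assumed behavior): shallow merge, right-hand side wins."""
    return {**a, **b}


def write_signal(state: dict[str, Any], agent_id: str, signal: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: record one agent's output under its own key."""
    state["data"]["analyst_signals"][agent_id] = signal
    return {"data": state["data"]}


state = {"data": {"tickers": ["AAPL"], "analyst_signals": {}}}

for agent_id, signal in [
    ("warren_buffett_agent", {"AAPL": {"signal": "bullish", "confidence": 80}}),
    ("technical_analyst_agent", {"AAPL": {"signal": "bearish", "confidence": 55}}),
]:
    update = write_signal(state, agent_id, signal)
    # A graph runtime would apply the reducer; here it is applied by hand.
    state["data"] = merge_dicts(state["data"], update["data"])

print(sorted(state["data"]["analyst_signals"]))
```

Because each agent writes under its own key, a second write never clobbers the first, and unrelated keys such as `tickers` survive the merge.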
Graph Construction and Node Registration
The signal pathway is constructed dynamically in app/backend/services/graph.py. The create_graph function builds the LangGraph workflow by mapping analyst configurations to executable nodes:
# app/backend/services/graph.py
def create_graph(graph_nodes, graph_edges) -> StateGraph:
    graph = StateGraph(AgentState)
    graph.add_node("start_node", start)

    # Build mapping of analyst config → (node_name, func)
    analyst_nodes = {key: (f"{key}_agent", cfg["agent_func"])
                     for key, cfg in ANALYST_CONFIG.items()}

    # Add analyst nodes (skip portfolio manager for now)
    for unique_id in agent_ids:
        base_key = extract_base_agent_key(unique_id)
        if base_key == "portfolio_manager":
            portfolio_manager_nodes.add(unique_id)
            continue
        if base_key not in ANALYST_CONFIG:
            continue
        node_name, node_func = analyst_nodes[base_key]
        agent_fn = create_agent_function(node_func, unique_id)
        graph.add_node(unique_id, agent_fn)

    # Add a paired risk-manager for each portfolio manager
    ...

    # Add edges from the React-Flow definition
    for edge in graph_edges:
        if edge.source in agent_ids_set and edge.target in agent_ids_set:
            graph.add_edge(edge.source, edge.target)
Key architectural details:
- Analyst nodes derive from ANALYST_CONFIG, defined in src/utils/analysts.py.
- Each portfolio manager automatically receives a paired risk-management agent (risk_management_agent_<suffix>).
- Direct analyst-to-portfolio-manager edges are intercepted and rewired through the risk manager to enforce risk checks before trading decisions.
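The rewiring rule can be sketched as a small function over edge tuples. This is a hypothetical illustration rather than the repository's implementation: `rewire_through_risk_manager` and the `risk_manager_for` mapping are invented names, and the example assumes the risk manager shares its suffix with the portfolio manager it guards.

```python
def rewire_through_risk_manager(edges, portfolio_manager_ids, risk_manager_for):
    """Route every edge that targets a portfolio manager through its risk manager.

    edges: list of (source, target) tuples
    portfolio_manager_ids: set of portfolio-manager node IDs
    risk_manager_for: dict mapping a portfolio-manager ID to its paired risk-manager ID
    """
    rewired = []
    for source, target in edges:
        if target in portfolio_manager_ids:
            risk_id = risk_manager_for[target]
            if source != risk_id:
                rewired.append((source, risk_id))  # analyst -> risk manager
            rewired.append((risk_id, target))      # risk manager -> portfolio manager
        else:
            rewired.append((source, target))
    # Remove duplicate edges while preserving first-seen order.
    return list(dict.fromkeys(rewired))


edges = [
    ("warren_buffett_ab12cd", "portfolio_manager_ef34gh"),
    ("technical_analyst_zz99yy", "portfolio_manager_ef34gh"),
]
rewired = rewire_through_risk_manager(
    edges,
    portfolio_manager_ids={"portfolio_manager_ef34gh"},
    risk_manager_for={"portfolio_manager_ef34gh": "risk_management_agent_ef34gh"},
)
for edge in rewired:
    print(edge)
```

After rewiring, no analyst points at the portfolio manager directly; the only edge into it comes from the risk manager.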
The Signal Flow Execution Pipeline
When run_graph executes, LangGraph processes nodes in topological order, ensuring that signals propagate forward automatically through state mutation:
1. Initialization
The workflow begins with an empty signals dictionary:
# app/backend/services/graph.py
return graph.invoke(
    {
        "messages": [HumanMessage(... )],
        "data": {
            "tickers": tickers,
            "portfolio": portfolio,
            "start_date": start_date,
            "end_date": end_date,
            "analyst_signals": {},  # ← empty at start
        },
        "metadata": {...},
    },
)
2. Analyst Execution
Each analyst agent computes its signal and writes directly to the shared state:
# Example: src/agents/warren_buffett.py
state["data"]["analyst_signals"][agent_id] = buffett_analysis
The agent mutates this dictionary in place and also returns the updated data from the node, so the write is part of the state that every subsequent node receives.
3. Risk Management Intervention
The risk-management agent reads existing analyst signals, calculates position limits based on volatility and correlation data, and appends its own constraints:
# src/agents/risk_manager.py
state["data"]["analyst_signals"][agent_id] = risk_analysis
4. Portfolio Aggregation
Finally, the portfolio-management agent in src/agents/portfolio_manager.py consumes the complete signal set:
# src/agents/portfolio_manager.py
analyst_signals = state["data"]["analyst_signals"]
risk_data = analyst_signals.get(risk_manager_id, {}).get(ticker, {})
# … compress signals from every other analyst …
At this stage, the portfolio manager has access to every upstream signal, including risk constraints, enabling informed trade sizing decisions.
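The four stages above can be simulated with plain functions run in a fixed order. This is a sketch under stated assumptions: the node functions, the `RISK_ID` constant, the `remaining_position_limit` field, and the one-fifth cash cap are all hypothetical and stand in for the real agents.

```python
RISK_ID = "risk_management_agent"  # hypothetical ID used only in this sketch


def analyst(state):
    """Stage 2: write a signal under the analyst's own key."""
    state["data"]["analyst_signals"]["warren_buffett_agent"] = {
        "AAPL": {"signal": "bullish", "confidence": 80}
    }
    return state


def risk_manager(state):
    """Stage 3: add constraints. Hypothetical rule: cap at one fifth of cash."""
    limit = state["data"]["portfolio"]["cash"] // 5
    state["data"]["analyst_signals"][RISK_ID] = {
        ticker: {"remaining_position_limit": limit}
        for ticker in state["data"]["tickers"]
    }
    return state


def portfolio_manager(state):
    """Stage 4: read risk data plus every other analyst's view."""
    signals = state["data"]["analyst_signals"]
    decisions = {}
    for ticker in state["data"]["tickers"]:
        risk_data = signals.get(RISK_ID, {}).get(ticker, {})
        views = {
            agent_id: per_ticker[ticker]["signal"]
            for agent_id, per_ticker in signals.items()
            if agent_id != RISK_ID and ticker in per_ticker
        }
        decisions[ticker] = {
            "limit": risk_data.get("remaining_position_limit", 0),
            "views": views,
        }
    return decisions


# Stage 1: the signals dictionary starts empty.
state = {
    "data": {
        "tickers": ["AAPL"],
        "portfolio": {"cash": 100_000, "positions": {}},
        "analyst_signals": {},
    }
}
for node in (analyst, risk_manager):
    state = node(state)
decisions = portfolio_manager(state)
print(decisions)
```

The order of the loop plays the role of the graph's execution order: if `risk_manager` ran after `portfolio_manager`, the `.get(..., {})` chain would silently return an empty dict and the limit would fall back to 0.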
Signal Routing Behaviors
The graph handles several edge cases through specific routing logic:
| Scenario | Routing Mechanism | Signal Flow Result |
|---|---|---|
| Direct Analyst → Portfolio Manager | Edges are rewired through direct_to_portfolio_managers logic | Analyst → Risk Manager → Portfolio Manager |
| Multiple Analysts Converging | All incoming edges route to the paired risk manager first | Signals aggregate in analyst_signals before portfolio evaluation |
| Isolated Analyst Nodes | Automatically connected to the start node if no incoming edges exist | Analyst runs early, making signals available to all downstream consumers |
| Risk Manager Bypass | Prohibited by construction—every portfolio manager has a mandatory risk check | Ensures risk data always precedes trading decisions |
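The "Isolated Analyst Nodes" row can be illustrated with a short helper that finds analysts with no incoming edge and attaches them to the start node. The function name and the tuple-based edge format are assumptions made for this sketch; the repository's own logic may differ in detail.

```python
def connect_isolated_analysts(analyst_ids, edges, start_node="start_node"):
    """Prepend a start edge for every analyst that no other node points to."""
    has_incoming = {target for _source, target in edges}
    start_edges = [
        (start_node, analyst_id)
        for analyst_id in analyst_ids
        if analyst_id not in has_incoming
    ]
    return start_edges + list(edges)


analysts = ["warren_buffett_ab12cd", "technical_analyst_zz99yy"]
edges = [
    ("warren_buffett_ab12cd", "technical_analyst_zz99yy"),
    ("technical_analyst_zz99yy", "risk_management_agent_ef34gh"),
]
wired = connect_isolated_analysts(analysts, edges)
for edge in wired:
    print(edge)
```

Only `warren_buffett_ab12cd` gains a start edge here, because `technical_analyst_zz99yy` already has an upstream node.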
Implementation Guide: Adding Custom Agents
To extend the signal propagation pipeline with a new analyst, you must implement the write-to-state pattern and register the agent in the configuration.
Step 1: Define the Agent Function
Create a new agent that follows the state mutation contract:
# src/agents/my_new_analyst.py
from src.graph.state import AgentState
from pydantic import BaseModel, Field
from typing_extensions import Literal


class MySignal(BaseModel):
    signal: Literal["bullish", "bearish", "neutral"]
    confidence: int = Field(description="0-100")
    reasoning: str


def my_new_analyst(state: AgentState, agent_id: str = "my_new_analyst_agent"):
    # … compute signal logic …
    analysis = {
        "signal": "bullish",
        "confidence": 80,
        "reasoning": "Strong earnings growth",
    }
    state["data"]["analyst_signals"][agent_id] = analysis
    # `messages` uses the operator.add reducer, so returning the full existing
    # list would append it to itself; return only the updated data here.
    return {"data": state["data"]}
Step 2: Register in Analyst Configuration
Add the agent to the central registry in src/utils/analysts.py:
ANALYST_CONFIG["my_new_analyst"] = {
    "display_name": "My New Analyst",
    "description": "Custom signal generator",
    "investing_style": "Focused on earnings momentum",
    "agent_func": my_new_analyst,
    "type": "analyst",
    "order": 17,
}
Step 3: Configure Graph Edges
Wire the agent into the execution flow via the React Flow UI or JSON payload:
{
  "source": "my_new_analyst_ab12cd",
  "target": "risk_management_agent_ef34gh"
}
When the graph executes, your agent's output will automatically propagate to the risk manager and subsequently to the portfolio manager through the shared state dictionary.
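Since create_graph only adds an edge when both endpoints are known node IDs, a mistyped ID in the payload is dropped rather than reported. A small pre-flight check, sketched below, can surface such edges before the graph is built. `filter_valid_edges` is a hypothetical helper, and the dict-based edge format mirrors the JSON payload above rather than any repository type.

```python
def filter_valid_edges(edges, node_ids):
    """Split edges into those whose endpoints both exist and those that do not."""
    known = set(node_ids)
    kept, dropped = [], []
    for edge in edges:
        if edge["source"] in known and edge["target"] in known:
            kept.append(edge)
        else:
            dropped.append(edge)
    return kept, dropped


node_ids = ["my_new_analyst_ab12cd", "risk_management_agent_ef34gh"]
payload = [
    {"source": "my_new_analyst_ab12cd", "target": "risk_management_agent_ef34gh"},
    {"source": "my_new_analyst_ab12cd", "target": "risk_management_agent_TYPO"},
]
kept, dropped = filter_valid_edges(payload, node_ids)
print(len(kept), "kept;", len(dropped), "dropped")
```

Logging the dropped list makes it obvious when an agent will never receive upstream signals because its edge was ignored.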
Debugging Signal Propagation
To inspect how signals propagate during execution, capture the final state after running the graph:
import json

from app.backend.services.graph import run_graph, create_graph

# Build graph with specific analysts and edges
graph = create_graph(
    graph_nodes=[
        type('Node', (), {'id': 'warren_buffett_123abc'}),
        type('Node', (), {'id': 'technical_analyst_456def'}),
        type('Node', (), {'id': 'portfolio_manager_789ghi'}),
    ],
    graph_edges=[
        type('Edge', (), {'source': 'warren_buffett_123abc', 'target': 'risk_management_agent_abc'}),
        type('Edge', (), {'source': 'technical_analyst_456def', 'target': 'risk_management_agent_abc'}),
        type('Edge', (), {'source': 'risk_management_agent_abc', 'target': 'portfolio_manager_789ghi'}),
    ],
)

# create_graph is annotated as returning a StateGraph; assuming run_graph calls
# graph.invoke directly, the graph has to be compiled before it can be invoked.
graph = graph.compile()

result = run_graph(
    graph,
    portfolio={"cash": 100_000, "positions": {}},
    tickers=["AAPL"],
    start_date="2024-01-01",
    end_date="2024-06-30",
    model_name="gpt-4o",
    model_provider="openai",
)

# Inspect propagated signals
print(json.dumps(result["data"]["analyst_signals"], indent=2))
The output will show entries for warren_buffett_agent, technical_analyst_agent, and risk_management_agent, demonstrating the accumulated state after topological execution.
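When the dump is large, a per-ticker tally is easier to scan than raw JSON. The helper below is a sketch that assumes each agent's entry is shaped as ticker → {"signal": ..., "confidence": ...}; entries without a "signal" field (such as risk-manager output) are skipped. `summarize_signals` and the sample data are hypothetical.

```python
from collections import Counter


def summarize_signals(analyst_signals, ticker):
    """Count bullish/bearish/neutral votes for one ticker across all agents."""
    counts = Counter()
    for per_ticker in analyst_signals.values():
        entry = per_ticker.get(ticker, {})
        signal = entry.get("signal")
        if signal is not None:
            counts[signal] += 1
    return dict(counts)


sample = {
    "warren_buffett_agent": {"AAPL": {"signal": "bullish", "confidence": 80}},
    "technical_analyst_agent": {"AAPL": {"signal": "bearish", "confidence": 55}},
    "risk_management_agent": {"AAPL": {"remaining_position_limit": 20000}},
}
summary = summarize_signals(sample, "AAPL")
print(summary)
```

An agent that is missing from the tally either did not run or wrote its output under an unexpected key, which narrows down where propagation broke.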
Summary
- State-based propagation occurs through state["data"]["analyst_signals"], a mutable dictionary shared across all nodes; the state type is defined in src/graph/state.py
- Topological execution ensures upstream agents complete before downstream consumers read their signals
- Mandatory risk interception routes all analyst signals through the risk manager before reaching the portfolio manager
- Automatic aggregation allows the portfolio manager to view the complete signal history without explicit message passing
- Extensibility follows a simple pattern: write to the shared state dictionary using your unique agent ID, register in ANALYST_CONFIG, and wire edges in the graph definition
Frequently Asked Questions
How does the portfolio manager access signals from multiple analysts?
The portfolio manager retrieves all accumulated signals by reading state["data"]["analyst_signals"] as implemented in src/agents/portfolio_manager.py. Because LangGraph executes nodes in topological order, by the time the portfolio manager runs, every upstream analyst and the risk manager have already written their entries to this dictionary. The manager iterates through this dictionary to compress signals into trading decisions.
What happens if two analysts produce conflicting signals?
The graph does not resolve conflicts during propagation—it simply aggregates all signals into analyst_signals. Conflict resolution occurs in the portfolio management layer, where the agent weighs signals based on confidence scores, historical performance, and risk constraints. The risk manager may also override position sizes based on volatility limits regardless of analyst bullishness.
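To make the idea of weighing conflicting signals concrete, here is one simple scheme: a signed, confidence-weighted sum per ticker. This is purely illustrative and is not the portfolio manager's actual method; the `SIGN` mapping, the function name, and the per-ticker entry shape are assumptions.

```python
SIGN = {"bullish": 1, "bearish": -1, "neutral": 0}


def net_conviction(analyst_signals, ticker):
    """Sum confidence scores, counting bearish views as negative."""
    total = 0
    for per_ticker in analyst_signals.values():
        entry = per_ticker.get(ticker, {})
        if "signal" in entry:
            total += SIGN[entry["signal"]] * entry.get("confidence", 0)
    return total


conflicting = {
    "warren_buffett_agent": {"AAPL": {"signal": "bullish", "confidence": 80}},
    "technical_analyst_agent": {"AAPL": {"signal": "bearish", "confidence": 55}},
    "risk_management_agent": {"AAPL": {"remaining_position_limit": 20000}},
}
score = net_conviction(conflicting, "AAPL")
print(score)
```

A positive score means the bullish views outweigh the bearish ones; any position derived from it would still need to respect the risk manager's limit.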
Can agents communicate directly without going through the state object?
No. The architecture enforces state-based communication exclusively. Agents cannot send direct messages to each other; they must write to state["data"]["analyst_signals"] or state["messages"]. This design pattern ensures reproducibility and allows LangGraph to track the complete execution history for debugging purposes.
Why are analyst signals stored in a dictionary rather than a list?
The dictionary structure keyed by agent_id prevents duplicate entries and enables O(1) lookups when downstream agents need specific signals (such as when the portfolio manager retrieves risk data using analyst_signals.get(risk_manager_id, {})). The merge_dicts reducer in src/graph/state.py then merges the data each node returns into the shared state.