How LangGraph Orchestrates Analyst Agents in ai-hedge-fund: Dynamic Workflow Architecture
LangGraph orchestrates analyst agents in ai-hedge-fund by dynamically constructing a StateGraph at runtime, where each selected analyst node runs in parallel from a shared start node, feeds signals into a centralized risk-management agent, and culminates in a portfolio-manager decision node before terminating.
The virattt/ai-hedge-fund repository implements a modular AI hedge fund system where LangGraph serves as the orchestration backbone. Instead of hard-coding agent connections, the system dynamically wires analyst agents into a directed graph based on runtime configuration. This architecture allows LangGraph to orchestrate analyst agents through a shared state object that propagates market data and trading signals through every node in the workflow.
Dynamic Graph Construction Architecture
The AgentState Shared Memory
At the core of the orchestration lies AgentState, defined in src/graph/state.py (lines 14-19). This typed dictionary maintains messages, data, and metadata fields that every analyst reads and writes during execution. By passing the same mutable state object through the graph, LangGraph enables seamless data sharing without explicit message passing between agents.
Analyst Configuration and Node Generation
The system decouples agent definitions from graph construction through ANALYST_CONFIG in src/utils/analysts.py (lines 23-70). This configuration dictionary maps analyst keys—such as "warren_buffett" or "ben_graham"—to their display names, investment philosophies, and Python implementations. The helper function get_analyst_nodes() (lines 75-78) transforms this configuration into a mapping of {key: (node_name, agent_func)} tuples, allowing create_workflow() in src/main.py to programmatically instantiate nodes.
Wiring the Execution Flow
From Start Node to Analyst Nodes
The workflow begins with a start_node that simply returns the incoming state unchanged. In src/main.py (lines 100-130), create_workflow() iterates through selected_analysts and adds each as a distinct node using workflow.add_node(node_name, node_func). Each analyst node connects via workflow.add_edge("start_node", node_name), enabling parallel execution of multiple investment strategies on the same market data.
Coordination Through Risk Management and Portfolio Manager
After analysts complete their signal generation, execution flows through fixed coordination nodes. The graph wires every analyst node to risk_management_agent (defined in src/agents/risk_manager.py), which aggregates and filters signals. The workflow then connects risk_management_agent to portfolio_manager (from src/agents/portfolio_manager.py) via workflow.add_edge("risk_management_agent", "portfolio_manager"), culminating in workflow.add_edge("portfolio_manager", END).
Code Walkthrough: Building the Workflow
The following implementation demonstrates how LangGraph dynamically assembles the analyst pipeline:
from langgraph.graph import END, StateGraph
from src.graph.state import AgentState
from src.utils.analysts import get_analyst_nodes
from src.agents.risk_manager import risk_management_agent
from src.agents.portfolio_manager import portfolio_management_agent
def start(state: AgentState) -> AgentState:
"""Entry point that passes state to all analyst nodes."""
return state
# Select analysts dynamically at runtime
selected = ["warren_buffett", "ben_graham"]
nodes = get_analyst_nodes()
# Initialize the StateGraph with our shared state schema
workflow = StateGraph(AgentState)
workflow.add_node("start_node", start)
# Dynamically add analyst nodes based on configuration
for key in selected:
node_name, func = nodes[key]
workflow.add_node(node_name, func)
workflow.add_edge("start_node", node_name)
# Add coordination agents
workflow.add_node("risk_management_agent", risk_management_agent)
workflow.add_node("portfolio_manager", portfolio_management_agent)
# Wire the flow: analysts → risk manager → portfolio manager → END
for key in selected:
workflow.add_edge(nodes[key][0], "risk_management_agent")
workflow.add_edge("risk_management_agent", "portfolio_manager")
workflow.add_edge("portfolio_manager", END)
workflow.set_entry_point("start_node")
graph = workflow.compile()
# Execute with initial market data
initial_state = {
"messages": [],
"data": {"tickers": ["AAPL"], "portfolio": {}, "start_date": "2024-01-01"},
"metadata": {"show_reasoning": True},
}
final_state = graph.invoke(initial_state)
Key Implementation Files
Understanding how LangGraph orchestrates analyst agents requires examining these specific source files:
src/graph/state.py: DefinesAgentStateand reasoning visualization helpers.src/utils/analysts.py: ContainsANALYST_CONFIGandget_analyst_nodes()for dynamic node generation.src/main.py: Housescreate_workflow()with complete LangGraph wiring logic.src/agents/risk_manager.py: Implements the risk-management coordination node.src/agents/portfolio_manager.py: Implements the final portfolio decision node.app/backend/services/agent_service.py: Providescreate_agent_functionto wrap Python callables into LangGraph-compatible nodes.
Summary
- Dynamic Graph Construction: The workflow builds itself at runtime using
get_analyst_nodes()andANALYST_CONFIG, allowing flexible analyst selection without code changes. - Shared State Architecture:
AgentStatepropagates through all nodes, enabling analysts to read market data and append signals to a unified message history. - Hierarchical Coordination: Analyst nodes feed into
risk_management_agentfor signal filtering, thenportfolio_managerfor final trade decisions, creating a deterministic execution path toEND. - LangGraph Integration: The system leverages
StateGraph.add_node(),add_edge(), andcompile()to transform configuration into executable agent orchestration.
Frequently Asked Questions
How does LangGraph handle parallel execution of analyst agents?
LangGraph executes analyst nodes in parallel when they share the same parent edge from start_node. In src/main.py, each selected analyst connects individually to start_node via workflow.add_edge("start_node", node_name), allowing simultaneous signal generation on the shared AgentState before converging at the risk-management node.
What determines which analysts are included in the workflow?
The selected_analysts list passed to create_workflow() in src/main.py filters the available analysts defined in ANALYST_CONFIG (src/utils/analysts.py). Only analysts whose keys exist in both the configuration and the selection list receive nodes in the compiled graph.
How is the AgentState structured to support multiple analyst outputs?
AgentState in src/graph/state.py implements a typed dictionary with a messages field that accumulates output from every node. Analyst agents append their JSON-encoded signals to this list, while the data field maintains immutable market information, and metadata controls execution flags like show_reasoning.
Can custom analysts be added without modifying the graph construction code?
Yes. Adding an analyst requires only updating ANALYST_CONFIG in src/utils/analysts.py with a unique key, display metadata, and agent function. The get_analyst_nodes() helper automatically picks up new entries, and create_workflow() dynamically wires them into the graph without explicit branching logic for each analyst.
Have a question about this repo?
These articles cover the highlights, but your codebase questions are specific. Give your agent direct access to the source. Share this with your agent to get started:
curl -s "https://instagit.com/install.md" Maintain an open-source project? Get it listed too →