Understanding the Plan-Execute-Report Pattern in FusionGraphRAGAgent
The Plan-Execute-Report pattern in FusionGraphRAGAgent is a three-stage orchestration workflow where a Planner decomposes queries into structured tasks, a WorkerCoordinator executes them with specialized agents, and a Reporter synthesizes evidence into a final markdown report.
FusionGraphRAGAgent implements the Plan-Execute-Report (PER) pattern to transform free-form user queries into comprehensive, evidence-based reports through systematic task decomposition and execution. This architecture separates concerns into distinct planning, execution, and reporting phases coordinated by the MultiAgentOrchestrator. According to the 1517005260/graph-rag-agent source code, this pattern enables complex multi-step reasoning while maintaining full traceability through structured state management.
Overview of the PER Cycle
The Plan-Execute-Report pattern follows a linear pipeline that converts natural language input into structured intelligence:
User Query → Planner → WorkerCoordinator → Reporter → Final Report
- Planner: Transforms ambiguous queries into a structured PlanSpec containing atomic tasks, dependencies, and execution signals.
- WorkerCoordinator: Schedules tasks, resolves dependencies, dispatches specialized executors (retrieval, research, reflection), and collects ExecutionRecord artifacts.
- Reporter: Consumes execution evidence to generate a long-form, structured report with citations and consistency checks.
The MultiAgentOrchestrator class in graphrag_agent/agents/multi_agent/orchestrator.py implements the complete PER lifecycle, returning an OrchestratorResult containing status, metrics, and all intermediate artifacts.
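The linear hand-off can be sketched in a few lines of plain Python. This is a minimal illustration, not the repository's implementation: `plan`, `execute`, `report`, and `run_per` are hypothetical stand-ins for the Planner, WorkerCoordinator, and Reporter, and the result keys only loosely mirror the fields described in this article.

```python
# Hypothetical stand-ins for the three stages; the real classes are far richer.

def plan(query: str) -> list:
    """Split a query into toy 'tasks', one per clause joined by ' and '."""
    return [part.strip() for part in query.split(" and ") if part.strip()]


def execute(tasks: list) -> list:
    """Pretend to run each task and record one piece of evidence for it."""
    return [{"task": task, "evidence": [f"note about {task}"]} for task in tasks]


def report(query: str, records: list) -> str:
    """Fold the collected evidence into a small markdown report."""
    lines = [f"# Report: {query}"]
    for record in records:
        lines.append(f"## {record['task']}")
        lines.extend(f"- {item}" for item in record["evidence"])
    return "\n".join(lines)


def run_per(query: str) -> dict:
    """Plan -> Execute -> Report, keeping every intermediate artifact."""
    tasks = plan(query)
    records = execute(tasks)
    return {
        "status": "completed",
        "plan": tasks,
        "execution_records": records,
        "response": report(query, records),
    }
```

Returning the intermediate artifacts alongside the final text is what makes the run traceable: a caller can inspect which task produced which evidence without re-running anything.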
Core Components of the Pattern
Plan Stage
The planning phase is handled by BasePlanner and its concrete implementations in graphrag_agent/agents/multi_agent/planner/. This stage involves three critical sub-components:
- Clarifier (clarifier.py): Resolves ambiguity in user queries before decomposition.
- TaskDecomposer (task_decomposer.py): Breaks complex problems into atomic, executable tasks with dependency graphs.
- PlanReviewer (plan_reviewer.py): Validates and optimizes the generated plan for efficiency and completeness.
The planner outputs a PlannerResult containing the PlanSpec and a PlanExecutionSignal that determines whether the pipeline proceeds to execution or requests clarification.
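The gate between planning and execution might look roughly like the sketch below. `PlanSketch`, `PlannerResultSketch`, and `next_step` are hypothetical names invented for illustration; the actual fields of PlanSpec, PlannerResult, and PlanExecutionSignal are not shown in this article and may differ.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PlanSketch:
    """Toy plan: task ids plus a map of task id -> prerequisite task ids."""
    tasks: list
    depends_on: dict = field(default_factory=dict)


@dataclass
class PlannerResultSketch:
    """Toy planner output: a plan (if one was produced) and a clarification signal."""
    plan: Optional[PlanSketch]
    needs_clarification: bool = False
    questions: list = field(default_factory=list)


def next_step(result: PlannerResultSketch) -> str:
    """Decide what the pipeline does after planning."""
    if result.needs_clarification or result.plan is None:
        return "ask_user"   # stop and surface the clarifying questions
    return "execute"        # hand the plan to the execution stage
```

Keeping the signal separate from the plan lets the orchestrator stop early without having to inspect the plan itself.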
Execute Stage
Execution is managed by WorkerCoordinator in graphrag_agent/agents/multi_agent/executor/worker_coordinator.py. This component:
- Resolves task dependencies (sequential, parallel, or adaptive scheduling).
- Dispatches tasks to specialized executors including retrieval_executor.py, research_executor.py, and reflector.py.
- Collates ExecutionRecord objects containing tool calls, retrieved evidence, reflection metadata, and execution status.
The coordinator handles retries, reflection loops, and parallelization based on configuration.
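Dependency resolution of this kind is commonly done with a topological sort. The sketch below uses Python's standard-library `graphlib` to group tasks into "waves" whose members have no unmet prerequisites; it illustrates the general technique and is not taken from worker_coordinator.py. The function name `execution_waves` and the task ids in the usage example are hypothetical.

```python
from graphlib import TopologicalSorter


def execution_waves(depends_on: dict) -> list:
    """Group tasks into waves; every task in a wave has all prerequisites done.

    `depends_on` maps each task id to the set of task ids it must wait for.
    """
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to keep output stable
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave finished to unlock successors
    return waves
```

For example, with two independent retrieval tasks feeding a research task that is followed by a reflection step, `execution_waves({"retrieve_a": set(), "retrieve_b": set(), "research": {"retrieve_a", "retrieve_b"}, "reflect": {"research"}})` yields three waves. Tasks inside one wave are candidates for parallel dispatch, while the waves themselves must run in order.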
Report Stage
The reporting phase utilizes BaseReporter and its implementations in graphrag_agent/agents/multi_agent/reporter/. Key sub-modules include:
- OutlineBuilder (outline_builder.py): Constructs a report outline from the plan structure and collected evidence.
- SectionWriter (section_writer.py): Generates content for each outline section using evidence mapping.
- MapReduce Pipeline (mapreduce/): Handles long-document generation through distributed section writing and reduction.
The reporter produces a ReportResult containing the outline, section contents, final markdown, references, and optional consistency-check results.
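The map-reduce idea behind long-document generation can be illustrated without any LLM calls: each section is written independently from its own evidence (map), then the sections are joined into one document (reduce). The function names below are hypothetical, and a real reporter would call a language model where this sketch formats strings.

```python
def write_section(title: str, evidence: list) -> str:
    """Map step: render one section from the evidence assigned to it."""
    bullets = "\n".join(f"- {item}" for item in evidence) or "- (no evidence collected)"
    return f"## {title}\n{bullets}"


def reduce_sections(report_title: str, sections: list) -> str:
    """Reduce step: join independently written sections into one document."""
    return "\n\n".join([f"# {report_title}", *sections])


def build_report(report_title: str, evidence_by_section: dict) -> str:
    """Write every outline section on its own, then merge the results."""
    sections = [
        write_section(title, evidence)
        for title, evidence in evidence_by_section.items()
    ]
    return reduce_sections(report_title, sections)
```

Because each section depends only on its own evidence, the map step can be distributed or parallelized, and only the reduce step needs to see the whole document.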
Integration Facade
MultiAgentFacade in graphrag_agent/agents/multi_agent/integration/legacy_facade.py serves as the compatibility layer that exposes the entire PER pipeline as a single process_query method. This facade is what FusionGraphRAGAgent invokes to run the pattern seamlessly.
Data Flow and State Management
State persistence across the PER pipeline is managed through PlanExecuteState defined in graphrag_agent/agents/multi_agent/core/state.py. This dataclass holds:
- User query and conversation messages.
- Placeholders for plan, execution, and report contexts.
- Session identifiers for tracking multi-turn interactions.
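A state container with these three groups of fields could be modeled as a dataclass along the following lines. The `session_id`, `messages`, and `input` names follow the usage example later in this article; the `*_context` attributes and the `completed_stages` helper are assumptions made for illustration, not the actual definition in state.py.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StateSketch:
    """Toy stand-in for a plan/execute/report state object."""
    session_id: str
    input: str
    messages: List[Dict[str, str]] = field(default_factory=list)
    # Placeholders that each stage fills in as the pipeline advances
    # (hypothetical attribute names).
    plan_context: Optional[Dict[str, Any]] = None
    execution_context: Optional[Dict[str, Any]] = None
    report_context: Optional[Dict[str, Any]] = None

    def completed_stages(self) -> List[str]:
        """List the stages whose placeholder has been filled."""
        stages = [
            ("plan", self.plan_context),
            ("execute", self.execution_context),
            ("report", self.report_context),
        ]
        return [name for name, context in stages if context is not None]
```

Using `default_factory` for `messages` gives every session its own list, so two states never share conversation history by accident.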
The data flow progresses through strongly-typed result objects:
- PlannerResult: Contains the generated PlanSpec and execution signal.
- ExecutionRecord: Captures output from each task executor, including evidence and metadata.
- ReportResult: Encapsulates the final report structure and content.
- OrchestratorResult: Aggregates all stages, exposing status (completed, needs_clarification, failed, partial), error collections, and timing metrics via OrchestratorMetrics.
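One plausible way to derive the four status values from stage outcomes is sketched below. The decision rule is an assumption chosen for illustration, and `ResultSketch` with its fields is hypothetical; the article does not spell out how the orchestrator actually picks a status.

```python
from dataclasses import dataclass, field


@dataclass
class ResultSketch:
    """Toy aggregate of stage outcomes (hypothetical fields)."""
    needs_clarification: bool = False
    task_ok: list = field(default_factory=list)  # one success flag per executed task
    report_ok: bool = False
    errors: list = field(default_factory=list)

    @property
    def status(self) -> str:
        """Assumed rule: clarification wins, then full success, then partial."""
        if self.needs_clarification:
            return "needs_clarification"
        if self.task_ok and all(self.task_ok) and self.report_ok:
            return "completed"
        if any(self.task_ok):
            return "partial"  # some evidence exists even though something failed
        return "failed"
```

Under this rule a run where every task succeeded but report generation failed is reported as partial rather than failed, since the collected evidence is still usable.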
Configuration and Error Handling
All PER-related configuration uses the MULTI_AGENT_* prefix, configurable via environment variables or settings.py:
- MULTI_AGENT_AUTO_GENERATE_REPORT: Automatically triggers the Reporter after execution completion.
- MULTI_AGENT_STOP_ON_CLARIFICATION: Halts the pipeline when the planner requests user clarification.
- MULTI_AGENT_WORKER_EXECUTION_MODE: Sets task scheduling to sequential or parallel.
- MULTI_AGENT_WORKER_MAX_CONCURRENCY: Controls parallel worker limits.
Error handling is centralized in OrchestratorResult, which aggregates exceptions across all three stages and provides granular status reporting for partial failures.
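Reading these variables into a typed settings object might look like the sketch below. The variable names come from the configuration list in this section; the default values, the accepted truthy strings, and the `WorkerSettingsSketch` and `load_settings` names are assumptions, so check settings.py for the real defaults.

```python
import os
from dataclasses import dataclass
from typing import Mapping

TRUTHY = {"1", "true", "yes", "on"}  # assumed spellings for "enabled"


@dataclass(frozen=True)
class WorkerSettingsSketch:
    auto_generate_report: bool
    stop_on_clarification: bool
    execution_mode: str
    max_concurrency: int


def load_settings(env: Mapping[str, str] = os.environ) -> WorkerSettingsSketch:
    """Parse MULTI_AGENT_* variables; every default here is an assumption."""

    def flag(name: str, default: str) -> bool:
        return env.get(name, default).strip().lower() in TRUTHY

    mode = env.get("MULTI_AGENT_WORKER_EXECUTION_MODE", "sequential").strip().lower()
    if mode not in {"sequential", "parallel"}:
        raise ValueError(f"unsupported execution mode: {mode!r}")

    return WorkerSettingsSketch(
        auto_generate_report=flag("MULTI_AGENT_AUTO_GENERATE_REPORT", "true"),
        stop_on_clarification=flag("MULTI_AGENT_STOP_ON_CLARIFICATION", "true"),
        execution_mode=mode,
        # Clamp to at least one worker so a zero or negative value cannot stall execution.
        max_concurrency=max(1, int(env.get("MULTI_AGENT_WORKER_MAX_CONCURRENCY", "1"))),
    )
```

Passing the environment in as a mapping keeps the parser testable: `load_settings({"MULTI_AGENT_WORKER_EXECUTION_MODE": "parallel"})` works without touching the process environment.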
Usage Examples
Basic Usage via FusionGraphRAGAgent
The simplest way to invoke the Plan-Execute-Report pattern is through the high-level agent interface:
from graphrag_agent.agents.fusion_agent import FusionGraphRAGAgent
# The agent internally creates a MultiAgentFacade → PER pipeline
agent = FusionGraphRAGAgent()
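# Run a complex query
# (English: "Analyze the historical trend of China's energy consumption and give a forecast for 2025")
result = agent.ask("请分析中国能源消费的历史趋势并给出2025年的预测")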
# Result is a dict with all PER artefacts
print(result["status"]) # e.g. "completed"
print(result["response"]) # final markdown report
print(result["planner"]["plan_spec"]) # structured task plan
print(result["execution_records"]) # list of ExecutionRecord dicts
print(result["report"]["outline"]) # report outline
print(result["metrics"]) # timing breakdown
Relevant files: agents/fusion_agent.py, agents/multi_agent/integration/legacy_facade.py.
Direct PER Stack Invocation
For advanced use cases, invoke the orchestrator components directly:
from graphrag_agent.agents.multi_agent.integration import MultiAgentFactory, MultiAgentFacade
from graphrag_agent.cache_manager import CacheManager
from graphrag_agent.agents.multi_agent.core.state import PlanExecuteState
# 1️⃣ Prepare cache & component bundle
cache = CacheManager()
bundle = MultiAgentFactory.create_default_bundle(cache_manager=cache)
# 2️⃣ Build the façade
facade = MultiAgentFacade(bundle=bundle, cache_manager=cache)
# 3️⃣ Initialise state (session & user message)
state = PlanExecuteState(
    session_id="sess-001",
    messages=[],
    # English: "Compare the number of global AI patents in 2022-2024 and give the reasons for growth"
    input="请比较2022‑2024年全球AI专利数量,并给出增长原因"
)
# 4️⃣ Run the PER pipeline
result = facade.process_query(
    query=state.input,
    assumptions=["仅考虑公开专利"],  # optional; English: "consider only public patents"
    report_type="long_document"
)
# 5️⃣ Inspect each stage
plan = result["planner"]["plan_spec"]
print("Tasks generated:", len(plan["task_graph"]["nodes"]))
exec_records = result["execution_records"]
print("Evidence collected:", sum(len(r["evidence"]) for r in exec_records))
print("Report title:", result["report"]["outline"]["title"])
Relevant files: agents/multi_agent/integration/multi_agent_factory.py, agents/multi_agent/orchestrator.py, agents/multi_agent/core/state.py.
Configuring Execution Mode
Control parallelization behavior through environment variables:
import os
from graphrag_agent.agents.fusion_agent import FusionGraphRAGAgent
# Force parallel execution with up to 4 workers
os.environ["MULTI_AGENT_WORKER_EXECUTION_MODE"] = "parallel"
os.environ["MULTI_AGENT_WORKER_MAX_CONCURRENCY"] = "4"
agent = FusionGraphRAGAgent()
# English: "Analyze patent trends in the fintech field over the past decade"
result = agent.ask("分析过去十年金融科技领域的专利趋势")
print("Execution time (s):", result["metrics"]["execution_seconds"])
Configuration source: agents/multi_agent/readme.md.
Summary
- Plan-Execute-Report is a three-stage architecture separating query planning, task execution, and report generation into discrete, orchestrated phases.
- MultiAgentOrchestrator in graphrag_agent/agents/multi_agent/orchestrator.py implements the complete lifecycle, returning structured OrchestratorResult objects with full provenance.
- Planner components (Clarifier, TaskDecomposer, PlanReviewer) in agents/multi_agent/planner/ decompose queries into executable PlanSpec objects.
- WorkerCoordinator in agents/multi_agent/executor/worker_coordinator.py handles dependency resolution and dispatches tasks to specialized executors like retrieval_executor.py and reflector.py.
- State management uses PlanExecuteState from agents/multi_agent/core/state.py to maintain context across the pipeline.
- Configuration is controlled via MULTI_AGENT_* environment variables, including execution mode and auto-reporting flags.
Frequently Asked Questions
What is the role of MultiAgentOrchestrator in the Plan-Execute-Report pattern?
MultiAgentOrchestrator serves as the central conductor that implements the complete Plan-Execute-Report lifecycle. It coordinates handoffs between the Planner, WorkerCoordinator, and Reporter while maintaining state in PlanExecuteState and aggregating results into OrchestratorResult. This class handles error propagation, metrics collection, and conditional execution logic based on planner signals.
How does FusionGraphRAGAgent handle task dependencies during execution?
The WorkerCoordinator in graphrag_agent/agents/multi_agent/executor/worker_coordinator.py resolves dependencies defined in the PlanSpec task graph. It supports sequential execution for dependent tasks and parallel execution for independent branches, configurable via MULTI_AGENT_WORKER_EXECUTION_MODE. The coordinator ensures that tasks requiring specific evidence or reflection outputs wait for their prerequisites to complete.
What data structure maintains state across the PER pipeline?
PlanExecuteState defined in graphrag_agent/agents/multi_agent/core/state.py is the central state container. It persists the user query, conversation history, generated plan specifications, execution records, and report context throughout the pipeline. This state object is passed between stages, allowing each component to access previous outputs while maintaining session isolation.
How can I configure parallel execution in the Plan-Execute-Report workflow?
Set the environment variable MULTI_AGENT_WORKER_EXECUTION_MODE to "parallel" and specify concurrency limits via MULTI_AGENT_WORKER_MAX_CONCURRENCY. This configuration directs the WorkerCoordinator to dispatch independent tasks simultaneously using the executor pool, reducing total execution time for plans with parallelizable branches such as multi-source retrieval or independent research tasks.
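The effect of the two settings can be illustrated with a bounded thread pool from the standard library. This is a generic sketch of capped parallel dispatch, not the coordinator's code; `run_tasks` and its parameters are hypothetical names that merely echo the two environment variables.

```python
from concurrent.futures import ThreadPoolExecutor


def run_tasks(tasks, worker, mode="sequential", max_concurrency=1):
    """Run independent tasks and return results in the original task order."""
    if mode == "parallel" and max_concurrency > 1:
        # At most `max_concurrency` tasks are in flight at any time.
        with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
            return list(pool.map(worker, tasks))  # map() preserves input order
    return [worker(task) for task in tasks]
```

Threads suit I/O-bound work such as retrieval calls; the cap keeps a plan with many independent branches from overwhelming downstream services.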