How RAGFlow's Agentic Workflow Leverages the `agent/canvas.py` Module

RAGFlow's agentic workflow treats every agent as a directed graph canvas, using the agent/canvas.py module to parse DSL JSON, manage runtime state, execute components asynchronously, and stream events back to the frontend.

RAGFlow implements agentic workflows as executable directed graphs in which components such as Begin, Retrieval, Generate, and Message process data in a defined sequence. The agent/canvas.py module serves as the core execution engine that transforms declarative JSON DSL into running Python objects, handling everything from variable resolution to citation tracking. This article explores how the Canvas class orchestrates state management, async execution, and event streaming, as implemented in the infiniflow/ragflow source code.

Canvas as Graph Plus Runtime State

The architecture separates static graph structure from dynamic execution state. The Graph class (lines 40-120) handles DSL parsing and component instantiation, while Canvas subclasses it to inject runtime context.

The Graph Foundation

In agent/canvas.py, the Graph class parses the DSL JSON supplied by the frontend and creates concrete component objects using component_class factory methods. It stores these in self.components and maintains the directed graph structure through upstream and downstream relationships defined in the JSON. This layer is responsible for topological ordering and basic graph validation.
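The parsing step described above can be sketched as follows. This is a minimal, self-contained illustration rather than RAGFlow's actual code: the Component classes, the REGISTRY dictionary, and the load_graph function are hypothetical stand-ins, and only the DSL keys (components, obj, component_name, params, upstream, downstream) are taken from the DSL example in this article.

```python
import json


class Component:
    """Hypothetical base class standing in for a real component."""

    def __init__(self, cpn_id, params):
        self.cpn_id = cpn_id
        self.params = params


class Begin(Component):
    pass


class Message(Component):
    pass


# Hypothetical registry mapping DSL component names to classes.
REGISTRY = {"Begin": Begin, "Message": Message}


def component_class(name):
    """Look up a class by its DSL name (illustrative factory only)."""
    try:
        return REGISTRY[name]
    except KeyError:
        raise ValueError(f"Unknown component: {name}") from None


def load_graph(dsl_json):
    """Parse DSL JSON into {id: {"obj", "downstream", "upstream"}}."""
    dsl = json.loads(dsl_json)
    components = {}
    for cpn_id, spec in dsl["components"].items():
        cls = component_class(spec["obj"]["component_name"])
        components[cpn_id] = {
            "obj": cls(cpn_id, spec["obj"].get("params", {})),
            "downstream": list(spec.get("downstream", [])),
            "upstream": list(spec.get("upstream", [])),
        }
    # Basic validation: every edge must point at a known component.
    for cpn_id, node in components.items():
        for target in node["downstream"] + node["upstream"]:
            if target not in components:
                raise ValueError(f"{cpn_id} references unknown node {target}")
    return components
```

Keeping the name-to-class lookup in one place means an unknown component name fails at load time instead of midway through a run.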

Canvas State Injection

Canvas extends Graph by injecting runtime globals, variables, history, retrieval, and memory structures that each step can read or write during execution (constructor lines 81-94). These structures enable components to share state across the workflow:

  • self.variables – Stores intermediate outputs accessible via ${cpn_id@output_key} syntax
  • self.history – Tracks conversation turns for multi-turn agents
  • self.retrieval – Holds retrieved chunks and document aggregates for RAG pipelines
  • self.memory – Persists long-term context across sessions

Loading and Initializing DSL Workflows

Before execution, the canvas must hydrate its state from persisted data.

Canvas.load() DSL Parsing

The Canvas.load() method (lines 95-119) first calls super().load() to initialize the graph structure, then enriches the canvas with persisted history, globals, variables, retrieval, and optional memory from previous runs. This enables workflow resumption and multi-turn conversations where context must survive between invocations.
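The two-stage load can be pictured with the sketch below. GraphSketch and CanvasSketch are hypothetical names, and the default values are assumptions; only the pattern (call super().load() first, then hydrate runtime state from the persisted DSL) comes from the description above.

```python
import json


class GraphSketch:
    """Hypothetical stand-in for the static graph layer."""

    def __init__(self, dsl_json):
        self.dsl = json.loads(dsl_json)
        self.components = {}
        self.path = []

    def load(self):
        # Structure only: components and the execution path.
        self.components = self.dsl.get("components", {})
        self.path = list(self.dsl.get("path", []))


class CanvasSketch(GraphSketch):
    """Hypothetical stand-in for the runtime layer."""

    def load(self):
        super().load()  # build the graph structure first
        # Then hydrate runtime state persisted by earlier runs.
        self.history = list(self.dsl.get("history", []))
        self.globals = dict(self.dsl.get("globals", {}))
        self.retrieval = self.dsl.get("retrieval", {"chunks": [], "doc_aggs": []})
        self.memory = list(self.dsl.get("memory", []))  # optional in the DSL
```

Because the state travels inside the same JSON document as the graph, serializing the canvas after a run is enough to resume the conversation later.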

The Async Execution Engine

The core of RAGFlow's agentic execution lives in the asynchronous run() method (lines 69-463), which walks the execution path in batches and yields an event at each stage.

Workflow Preparation and Cancellation Checks

Before processing begins, run() performs several setup steps (lines 70-100):

  1. Records start time and generates a unique message_id
  2. Injects the user query into self.variables["sys.query"] and initializes the path with self.path.append("begin")
  3. Loads webhook payloads if present in the DSL
  4. Checks self.is_canceled() (lines 14-15), which queries Redis for a cancellation flag and raises TaskCanceledException if the user requested termination

Event Emission Architecture

The workflow yields structured events consumed by the frontend UI. Using a decorate helper (lines 99-108), run() emits:

  • workflow_started – Signals that execution has begun
  • node_started – Before a component executes
  • message – Streaming output chunks
  • node_finished – After component completion
  • workflow_finished – Final completion with outputs and elapsed time
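A helper of this kind could look like the sketch below. The envelope field names (event, message_id, created_at, task_id, data) and the make_decorator and demo_events functions are assumptions made for illustration; the article only states that a decorate helper wraps each emitted event.

```python
import time
import uuid


def make_decorator(message_id, task_id):
    """Return a helper that wraps payloads in a shared event envelope."""
    created_at = int(time.time())

    def decorate(event, data):
        # Field names here are illustrative assumptions.
        return {
            "event": event,
            "message_id": message_id,
            "created_at": created_at,
            "task_id": task_id,
            "data": data,
        }

    return decorate


def demo_events(task_id="demo-task"):
    """Yield the five event types in the order a simple run would emit them."""
    decorate = make_decorator(uuid.uuid4().hex, task_id)
    yield decorate("workflow_started", {"inputs": {}})
    yield decorate("node_started", {"component_id": "begin"})
    yield decorate("message", {"content": "Hello"})
    yield decorate("node_finished", {"component_id": "begin"})
    yield decorate("workflow_finished", {"outputs": "Hello"})
```

Sharing one message_id across all events of a run lets a consumer group streamed chunks back into a single response.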

Batch Component Execution

The _run_batch method (lines 22-70) walks the current slice of the execution path and invokes each component. It supports both synchronous and asynchronous component methods, executing them in a thread pool limited by max_concurrency to prevent resource exhaustion. This batching allows independent branches of the graph to run in parallel.
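The general pattern of mixing sync and async components under a concurrency cap can be sketched as follows. This is not the module's implementation: it swaps in an asyncio.Semaphore plus asyncio.to_thread (Python 3.9+) in place of whatever executor the real code uses, and SyncComponent, AsyncComponent, and run_batch are hypothetical names.

```python
import asyncio
import inspect


class SyncComponent:
    """Hypothetical component exposing only a blocking invoke()."""

    def __init__(self, name):
        self.name = name

    def invoke(self):
        return f"{self.name}:sync"


class AsyncComponent:
    """Hypothetical component exposing a coroutine invoke_async()."""

    def __init__(self, name):
        self.name = name

    async def invoke_async(self):
        await asyncio.sleep(0)
        return f"{self.name}:async"


async def run_batch(components, max_concurrency=2):
    """Run one batch, never exceeding max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(cpn):
        async with semaphore:
            invoke_async = getattr(cpn, "invoke_async", None)
            if invoke_async is not None and inspect.iscoroutinefunction(invoke_async):
                return await invoke_async()
            # Blocking components go to a worker thread so the
            # event loop stays responsive.
            return await asyncio.to_thread(cpn.invoke)

    # gather() returns results in the same order as its inputs.
    return await asyncio.gather(*(run_one(c) for c in components))
```

A call such as asyncio.run(run_batch([SyncComponent("a"), AsyncComponent("b")])) returns the results in submission order regardless of which component finishes first.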

Node Post-Processing and Path Navigation

After each batch completes (lines 100-226), run() examines outputs to determine the next execution phase:

  • Message components – Handles streaming text, TTS (text-to-speech) conversion, and citation extraction
  • Error propagation – Routes exceptions through exception_handler() for graceful degradation
  • Path decisions – Evaluates component types (iteration, switch, loops) to determine the next nodes to append to self.path

Handling User Input Pauses

When the path contains a UserFillup component (lines 268-311), the engine detects that future inputs are required. It yields a user_inputs event containing the required field specifications and returns early, allowing the frontend to prompt the user. The same Canvas instance can later resume execution via run(query="...", inputs={...}) with the missing data supplied.
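The pause-and-resume behaviour maps naturally onto an async generator that yields a request for input and then returns. The sketch below is a simplification under stated assumptions: the run function, its parameters, and the event field names are hypothetical, and unlike the real engine it replays the path from the start on resume instead of continuing from saved state.

```python
import asyncio


async def run(path, required_fields, inputs=None):
    """Walk the path; stop early if a UserFillUp node lacks inputs."""
    inputs = inputs or {}
    for cpn_id in path:
        if cpn_id.lower().startswith("userfillup"):
            missing = [f for f in required_fields if f not in inputs]
            if missing:
                yield {"event": "user_inputs", "data": {"fields": missing}}
                return  # pause: the caller must call run() again with inputs
        yield {"event": "node_finished", "data": {"component_id": cpn_id}}
    yield {"event": "workflow_finished", "data": {"inputs": inputs}}


async def collect(agen):
    """Drain an async generator into a list (helper for the demo)."""
    return [event async for event in agen]
```

Running it once without inputs ends on a user_inputs event; running it again with inputs={"date": "2024-12-25"} proceeds to workflow_finished.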

Workflow Completion

When the path is exhausted or the run is cancelled (lines 442-466), run() emits a workflow_finished event containing the final outputs and elapsed-time metrics, then persists the updated history. This ensures conversation context survives for subsequent turns.

State Management Helpers

canvas.py provides utilities for components to interact with shared state without manual dictionary manipulation.

Variable Resolution

Methods like get_variable_value and set_variable_value (lines 91-140) handle the ${cpn_id@output_key} and ${sys.xxx} template syntax used in prompts. Components reference outputs from previous steps, and the canvas resolves these to actual values at runtime.
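Template resolution of this kind can be approximated with a regular expression. The sketch below follows the ${cpn_id@output_key} and ${sys.xxx} syntax as written in this article; the regex, the render function, and the two-dictionary storage layout are assumptions for illustration and may differ from the real get_variable_value.

```python
import re

# Matches ${name} or ${cpn_id@output_key}; the exact character set is an assumption.
VAR_PATTERN = re.compile(r"\$\{([A-Za-z0-9_:.\-]+(?:@[A-Za-z0-9_.\-]+)?)\}")


def get_variable_value(ref, globals_, outputs):
    """Resolve 'cpn_id@key' from component outputs, anything else from globals."""
    if "@" in ref:
        cpn_id, key = ref.split("@", 1)
        return outputs[cpn_id][key]
    return globals_[ref]


def render(template, globals_, outputs):
    """Replace every ${...} reference in the template with its value."""

    def substitute(match):
        return str(get_variable_value(match.group(1), globals_, outputs))

    return VAR_PATTERN.sub(substitute, template)
```

For example, with globals_ = {"sys.query": "What is RAG?"}, the template "Q: ${sys.query}" renders as "Q: What is RAG?". An unknown reference raises KeyError here, which surfaces typos in prompts early; a production resolver might prefer a default value.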

Reference and Citation Tracking

The add_reference and get_reference methods (lines 403-424) manage retrieved chunks and document aggregates. When the Retrieval component fetches documents, it stores them via add_reference(). Later, Message components automatically attach reference data to message_end events when they detect citation patterns like [ID: ...] in the output (lines 150-166).
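Detecting citation markers and attaching references could be sketched like this. The regex for [ID: n], the cited_ids and message_end_event functions, and the event shape are assumptions for illustration; the article only states that reference data is attached when such patterns appear in the output.

```python
import re

# Assumed marker shape: "[ID: 3]" with optional whitespace after the colon.
CITATION = re.compile(r"\[ID:\s*(\d+)\]")


def cited_ids(text):
    """Return cited chunk indices in first-seen order, without duplicates."""
    seen = []
    for match in CITATION.finditer(text):
        idx = int(match.group(1))
        if idx not in seen:
            seen.append(idx)
    return seen


def message_end_event(text, reference):
    """Attach the reference payload only when the text cites something."""
    data = {}
    if cited_ids(text):
        data["reference"] = reference
    return {"event": "message_end", "data": data}
```

Skipping the reference payload for uncited answers keeps the final event small when the model did not use any retrieved material.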

Memory and Conversation History

Multi-turn conversations rely on add_memory, get_memory, and add_user_input (lines 323-336). These methods enable components to read previous user queries and assistant responses, maintaining context across workflow invocations.

Integration with External Systems

The canvas module interfaces with several external services to provide a complete workflow platform.

Component Definitions and Factory

Component classes defined in agent/component/*.py must implement invoke, invoke_async, get_input, output, and thoughts methods. The factory in agent/component/__init__.py maps DSL component names (like "Retrieval" or "Generate") to these concrete classes during Graph.load().

Redis-Based Task Cancellation

Task cancellation uses Redis via REDIS_CONN (imported at line 33). The has_canceled service checks for a Redis key formatted as {task_id}-cancel. When present, Canvas.run() raises TaskCanceledException at the next cancellation checkpoint, ensuring workflows terminate cleanly even during long-running retrieval or generation operations.
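The flag-and-checkpoint pattern can be tried without a Redis server using the sketch below. FakeRedis is a hypothetical in-memory stand-in for REDIS_CONN, and this has_canceled, checkpoint, and TaskCanceledException are local illustrations; only the {task_id}-cancel key format comes from the description above.

```python
class FakeRedis:
    """In-memory stand-in for a Redis connection (illustration only)."""

    def __init__(self):
        self._store = {}

    def set(self, key, value):
        self._store[key] = value

    def get(self, key):
        return self._store.get(key)


class TaskCanceledException(Exception):
    """Local stand-in for the exception raised on cancellation."""


def has_canceled(conn, task_id):
    """True when the '{task_id}-cancel' flag exists."""
    return conn.get(f"{task_id}-cancel") is not None


def checkpoint(conn, task_id):
    """Call between units of work; raises if cancellation was requested."""
    if has_canceled(conn, task_id):
        raise TaskCanceledException(f"Task {task_id} was cancelled")
```

Because the flag is only polled at checkpoints, a long single operation finishes before the cancellation takes effect; more frequent checkpoints trade a little overhead for faster termination.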

Async File Handling

The get_files_async and get_files methods (lines 511-577) fetch uploaded files from storage, base64-encode them, and provide them to components that require document processing. These methods integrate with file_service.py to handle blob storage and parsing.
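A paired sync and async fetch-and-encode helper might look like the sketch below. The function signatures and the fetch_blob callback are hypothetical (the real methods talk to the file service), and asyncio.to_thread requires Python 3.9+.

```python
import asyncio
import base64


def get_files(file_ids, fetch_blob):
    """Fetch each blob with the supplied callback and base64-encode it."""
    return [base64.b64encode(fetch_blob(fid)).decode("ascii") for fid in file_ids]


async def get_files_async(file_ids, fetch_blob):
    """Same result, but blocking fetches run in worker threads."""

    async def fetch_one(fid):
        blob = await asyncio.to_thread(fetch_blob, fid)
        return base64.b64encode(blob).decode("ascii")

    return await asyncio.gather(*(fetch_one(fid) for fid in file_ids))
```

Passing the storage accessor in as a callback keeps the sketch testable with a plain dictionary, for example get_files(["a"], {"a": b"hello"}.__getitem__).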

Practical Implementation Examples

Basic Async Canvas Execution

import json
import asyncio
from agent.canvas import Canvas

# Minimal DSL: Begin component forwarding to Message

dsl = {
    "components": {
        "begin": {
            "obj": {"component_name": "Begin", "params": {}},
            "downstream": ["msg"],
            "upstream": []
        },
        "msg": {
            "obj": {"component_name": "Message", "params": {"content": "${sys.query}"}},
            "downstream": [],
            "upstream": ["begin"]
        }
    },
    "history": [],
    "path": ["begin"],
    "globals": {"sys.query": ""},
    "retrieval": {"chunks": [], "doc_aggs": []}
}

async def run_demo():
    canvas = Canvas(json.dumps(dsl), tenant_id="demo_tenant")
    async for event in canvas.run(query="Hello RAGFlow!"):
        print(event)  # node_started, message, workflow_finished, etc.

asyncio.run(run_demo())

Accessing Retrieval Results


# Inside a component:

chunks = self.canvas.retrieval[-1]["chunks"]
docs = self.canvas.retrieval[-1]["doc_aggs"]

# After workflow completion:

reference = canvas.get_reference()  # {"chunks": {...}, "doc_aggs": {...}}

Cancelling a Running Workflow

from rag.utils.redis_conn import REDIS_CONN

# ID of the running task (placeholder value for illustration)

task_id = "your-task-id"

# Set cancellation flag from external process

REDIS_CONN.set(f"{task_id}-cancel", "x")

# Canvas.run() will raise TaskCanceledException at next checkpoint

Resuming After User Input


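# "async for" is only valid inside a coroutine, so wrap both runs in one

async def book_flight(canvas):
    # First run yields a user_inputs event and pauses
    # (the event name is assumed to sit under the "event" key)
    async for event in canvas.run(query="Book a flight"):
        if event.get("event") == "user_inputs":
            print("Need more info:", event["data"])

    # Resume with filled values
    async for event in canvas.run(query="Book a flight", inputs={"date": "2024-12-25"}):
        print(event)  # replace with your own event handling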

Summary

  • agent/canvas.py implements the core execution engine that transforms JSON DSL into running agentic workflows through the Canvas and Graph classes.
  • State management combines static graph structure with dynamic runtime variables, history, retrieval results, and memory to enable complex multi-turn interactions.
  • Async execution uses _run_batch with concurrency limits and supports both sync and async component methods, yielding events for real-time frontend updates.
  • Cancellation support integrates with Redis to enable graceful termination of long-running workflows via TaskCanceledException.
  • Citation tracking automatically aggregates retrieval results and attaches them to message events when patterns like [ID: ...] appear in generated text.

Frequently Asked Questions

How does Canvas handle async component execution?

The _run_batch method (lines 22-70) checks each component for an invoke_async method and uses it when available. Components that only expose a synchronous method run in executor threads to avoid blocking the event loop, with max_concurrency capping how many run at once. The main run() coroutine manages the execution path and event emission.

What happens when a workflow needs to pause for user input?

When the execution path contains a UserFillup component downstream, Canvas.run() detects this requirement (lines 268-311), yields a user_inputs event specifying the required fields, and returns early. The frontend collects the missing data, and the workflow resumes when run() is called again with the inputs parameter populated.

How does RAGFlow enable workflow cancellation?

Cancellation checks occur at the start of run() and within _run_batch via self.is_canceled(), which queries Redis for a {task_id}-cancel key (lines 14-15). If present, the method raises TaskCanceledException, causing the workflow to emit a cancellation event and terminate cleanly without corrupting state.

Where does Canvas store retrieval results for citations?

Retrieval components store chunks and document aggregates in self.retrieval using add_reference() (lines 403-424). The Message component later extracts citation markers from generated text and automatically attaches the reference data to message_end events, enabling the frontend to display source documents alongside generated answers.
