# How RAGFlow's Agentic Workflow Leverages the `agent/canvas.py` Module

> Discover how RAGFlow's agentic workflow uses `agent/canvas.py` to parse DSL JSON, manage state, execute components, and stream events for a responsive user experience.

- Repository: [InfiniFlow/ragflow](https://github.com/infiniflow/ragflow)
- Tags: internals
- Published: 2026-02-23

---

**RAGFlow's agentic workflow treats every agent as a directed graph canvas, using the [`agent/canvas.py`](https://github.com/infiniflow/ragflow/blob/main/agent/canvas.py) module to parse DSL JSON, manage runtime state, execute components asynchronously, and stream events back to the frontend.**

RAGFlow implements agentic workflows as executable directed graphs where components like **Begin**, **Retrieval**, **Generate**, and **Message** nodes process data in a defined sequence. The [`agent/canvas.py`](https://github.com/infiniflow/ragflow/blob/main/agent/canvas.py) module serves as the core execution engine that transforms declarative JSON DSL into running Python objects, handling everything from variable resolution to citation tracking. This article explores how the **Canvas** class orchestrates state management, async execution, and event streaming according to the infiniflow/ragflow source code.

## Canvas as Graph Plus Runtime State

The architecture separates static graph structure from dynamic execution state. The `Graph` class handles DSL parsing and component instantiation, while `Canvas` subclasses it to inject runtime context.

### The Graph Foundation

In [`agent/canvas.py`](https://github.com/infiniflow/ragflow/blob/main/agent/canvas.py), the `Graph` class parses the DSL JSON supplied by the frontend and creates concrete component objects using the `component_class` factory. It stores these objects in `self.components` and maintains the directed graph structure through the `upstream` and `downstream` relationships defined in the JSON. This layer is responsible for topological ordering and basic graph validation.
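
To make the idea of topological ordering concrete, here is a small, self-contained sketch using Kahn's algorithm over a DSL shaped like the examples in this article (a `components` map whose entries carry `downstream` lists). The helper name `topological_order` is hypothetical and this is not the code in `agent/canvas.py`; it also assumes an acyclic graph and rejects cycles, which real agent graphs with loop components may handle differently.

```python
import json
from collections import deque


def topological_order(dsl_json: str) -> list:
    """Hypothetical helper: return component ids in dependency order (Kahn's algorithm)."""
    components = json.loads(dsl_json)["components"]
    # Count incoming edges for every component.
    indegree = {cid: 0 for cid in components}
    for spec in components.values():
        for nxt in spec.get("downstream", []):
            indegree[nxt] += 1

    # Start from components with no upstream dependencies (e.g. "begin").
    ready = deque(sorted(cid for cid, d in indegree.items() if d == 0))
    order = []
    while ready:
        cid = ready.popleft()
        order.append(cid)
        for nxt in components[cid].get("downstream", []):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Any component never released sits on a cycle.
    if len(order) != len(components):
        raise ValueError("DSL graph contains a cycle")
    return order


dsl = {"components": {"begin": {"downstream": ["retrieval"]},
                      "retrieval": {"downstream": ["msg"]},
                      "msg": {"downstream": []}}}
print(topological_order(json.dumps(dsl)))  # ['begin', 'retrieval', 'msg']
```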

### Canvas State Injection

`Canvas` extends `Graph` by injecting **runtime globals**, **variables**, **history**, **retrieval**, and **memory** structures that each step can read or write during execution. These structures let components share state across the workflow:

- `self.globals` – Holds system values such as `sys.query`
- `self.variables` – Stores intermediate outputs accessible via `${cpn_id@output_key}` syntax
- `self.history` – Tracks conversation turns for multi-turn agents
- `self.retrieval` – Holds retrieved chunks and document aggregates for RAG pipelines
- `self.memory` – Persists long-term context across sessions

## Loading and Initializing DSL Workflows

Before execution, the canvas must hydrate its state from persisted data.

### Canvas.load() DSL Parsing

The `Canvas.load()` method first calls `super().load()` to initialize the graph structure, then enriches the canvas with persisted `history`, `globals`, `variables`, `retrieval`, and optional `memory` from previous runs. This enables workflow resumption and multi-turn conversations in which context must survive between invocations.
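
The load-then-hydrate pattern can be sketched with two toy classes. `MiniGraph`, `MiniCanvas`, and the `dumps()` method are hypothetical stand-ins, not RAGFlow's classes; the sketch assumes the persisted DSL stores state under the keys listed above, and the default values for missing keys are assumptions.

```python
import json


class MiniGraph:
    """Hypothetical stand-in for Graph: keeps only the component map and path."""

    def __init__(self, dsl: str):
        self.dsl = json.loads(dsl)
        self.load()  # dispatches to the subclass override when one exists

    def load(self):
        self.components = self.dsl["components"]
        self.path = self.dsl.get("path", [])


class MiniCanvas(MiniGraph):
    """Hypothetical stand-in for Canvas: adds runtime state on top of the graph."""

    def load(self):
        super().load()  # build the graph structure first
        # Then hydrate persisted runtime state, with defaults for absent keys.
        self.history = self.dsl.get("history", [])
        self.globals = self.dsl.get("globals", {"sys.query": ""})
        self.variables = self.dsl.get("variables", {})
        self.retrieval = self.dsl.get("retrieval", [])
        self.memory = self.dsl.get("memory", [])

    def dumps(self) -> str:
        # Write state back so the next invocation can resume from it.
        self.dsl.update(history=self.history, globals=self.globals,
                        variables=self.variables, retrieval=self.retrieval,
                        memory=self.memory, path=self.path)
        return json.dumps(self.dsl)


saved = MiniCanvas(json.dumps({"components": {}, "history": [["user", "hi"]]}))
saved.history.append(["assistant", "hello"])
restored = MiniCanvas(saved.dumps())
print(restored.history)  # [['user', 'hi'], ['assistant', 'hello']]
```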

## The Async Execution Engine

The core of RAGFlow's agentic execution lives in the asynchronous `run()` method, an async generator that drives an event-driven execution loop.

### Workflow Preparation and Cancellation Checks

Before processing begins, `run()` performs several setup steps:

1. Records the start time and generates a unique `message_id`
2. Injects the user query into `self.globals["sys.query"]` and starts the path with `self.path.append("begin")`
3. Loads webhook payloads if present in the DSL
4. Checks `self.is_canceled()`, which queries Redis for a cancellation flag; if the user requested termination, `TaskCanceledException` is raised
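
The setup sequence can be sketched as a plain function. Everything here is a simplified assumption: `prepare_run` is hypothetical, state is a plain dict shaped like the DSL example later in this article, the cancellation check is passed in as a callable instead of querying Redis, and `TaskCanceledException` is defined locally rather than imported from RAGFlow.

```python
import time
import uuid


class TaskCanceledException(Exception):
    """Local stand-in for RAGFlow's exception of the same name."""


def prepare_run(state: dict, query: str, is_canceled) -> dict:
    """Hypothetical version of the setup steps performed at the start of run()."""
    # 1. Record the start time and a unique id shared by all events of this run.
    run_ctx = {"started_at": time.perf_counter(), "message_id": str(uuid.uuid4())}
    # 2. Expose the query to templates and start walking from the Begin node.
    state["globals"]["sys.query"] = query
    state["path"].append("begin")
    # 3. Webhook payload loading is omitted from this sketch.
    # 4. Abort before doing any work if the user already asked to stop.
    if is_canceled():
        raise TaskCanceledException("task canceled before execution")
    return run_ctx


state = {"globals": {}, "path": []}
ctx = prepare_run(state, "Hello RAGFlow!", is_canceled=lambda: False)
print(state["path"], state["globals"]["sys.query"])  # ['begin'] Hello RAGFlow!
```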

### Event Emission Architecture

The workflow yields structured events consumed by the frontend UI. Using a `decorate` helper, `run()` emits:

- `workflow_started` – Signals execution begin
- `node_started` – Before a component executes
- `message` – Streaming output chunks
- `node_finished` – After component completion
- `workflow_finished` – Final completion with outputs and elapsed time
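
A simplified sketch of how a `decorate`-style helper can wrap every payload in one envelope while an async generator emits the sequence above. The envelope fields (`event`, `message_id`, `created_at`, `data`) and the `run_workflow` generator are assumptions for illustration; components are modeled as plain functions, and only the last node streams a `message` event.

```python
import asyncio
import time
import uuid


async def run_workflow(components: dict, path: list, query: str):
    """Hypothetical async generator emitting workflow/node/message events."""
    message_id = str(uuid.uuid4())
    created_at = int(time.time())
    started = time.perf_counter()

    def decorate(event: str, data: dict) -> dict:
        # One envelope for every event so the frontend can dispatch on "event".
        return {"event": event, "message_id": message_id,
                "created_at": created_at, "data": data}

    yield decorate("workflow_started", {"inputs": {"sys.query": query}})
    output = query
    for cpn_id in path:
        yield decorate("node_started", {"component_id": cpn_id})
        output = components[cpn_id](output)
        if cpn_id == path[-1]:
            # In this sketch only the final node streams text to the user.
            yield decorate("message", {"content": output})
        yield decorate("node_finished", {"component_id": cpn_id, "outputs": output})
    yield decorate("workflow_finished", {"outputs": output,
                                         "elapsed_time": time.perf_counter() - started})


async def collect(query: str) -> list:
    components = {"begin": lambda q: q, "msg": lambda q: q.upper()}
    return [e async for e in run_workflow(components, ["begin", "msg"], query)]


for e in asyncio.run(collect("hi")):
    print(e["event"])
```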

### Batch Component Execution

The `_run_batch` method walks the current slice of the execution path and invokes each component. It supports both synchronous and asynchronous component methods: coroutine methods are awaited, while synchronous ones are offloaded to a thread pool, and the number of components running at once is capped by `max_concurrency` to prevent resource exhaustion. This batching allows independent branches of the graph to run in parallel.
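
The batching behavior can be approximated with a semaphore-bounded `asyncio` sketch: coroutine components are awaited directly, while synchronous ones go to a thread pool via `loop.run_in_executor`. The function `run_batch` and the default `max_concurrency=2` are illustrative assumptions, not RAGFlow's actual code or defaults.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor


async def run_batch(components: dict, batch: list, max_concurrency: int = 2) -> dict:
    """Hypothetical: run one slice of the path, returning {component_id: result}."""
    loop = asyncio.get_running_loop()
    sem = asyncio.Semaphore(max_concurrency)  # caps concurrently running components

    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:

        async def run_one(cpn_id):
            fn = components[cpn_id]
            async with sem:
                if inspect.iscoroutinefunction(fn):
                    # Async component: await it directly on the event loop.
                    return cpn_id, await fn()
                # Sync component: run it in a worker thread so the loop stays free.
                return cpn_id, await loop.run_in_executor(pool, fn)

        results = await asyncio.gather(*(run_one(c) for c in batch))
    return dict(results)


async def demo():
    async def retrieve():
        await asyncio.sleep(0.01)
        return "chunks"

    def generate():
        return "answer"

    return await run_batch({"retrieval": retrieve, "generate": generate},
                           ["retrieval", "generate"])


print(asyncio.run(demo()))  # {'retrieval': 'chunks', 'generate': 'answer'}
```

A semaphore keeps the cap uniform across both kinds of component, whereas a thread pool alone would only bound the synchronous ones.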

### Node Post-Processing and Path Navigation

After each batch completes, `run()` examines the outputs to determine the next execution phase:

- **Message components** – Handles streaming text, TTS (text-to-speech) conversion, and citation extraction
- **Error propagation** – Routes exceptions through `exception_handler()` for graceful degradation
- **Path decisions** – Evaluates component types (iteration, switch, loops) to determine the next nodes to append to `self.path`
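
A hypothetical sketch of the path-decision step: branching components contribute only the branch they selected, while ordinary components contribute all of their `downstream` ids. The `type` field, the `Switch` check, and the `_next` output key are assumptions made for this illustration.

```python
def next_nodes(components: dict, finished: list, outputs: dict) -> list:
    """Hypothetical: compute the next path slice from a finished batch."""
    nxt = []
    for cpn_id in finished:
        spec = components[cpn_id]
        if spec["type"] == "Switch":
            # A branching node follows only the branch it selected.
            candidates = outputs[cpn_id]["_next"]
        else:
            candidates = spec["downstream"]
        for c in candidates:
            if c not in nxt:  # de-duplicate when branches converge
                nxt.append(c)
    return nxt


components = {
    "begin": {"type": "Begin", "downstream": ["switch"]},
    "switch": {"type": "Switch", "downstream": ["faq", "search"]},
}
print(next_nodes(components, ["switch"], {"switch": {"_next": ["faq"]}}))  # ['faq']
```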

### Handling User Input Pauses

When the path contains a `UserFillup` component, the engine detects that user input is required before execution can continue. It yields a `user_inputs` event containing the required field specifications and returns early, allowing the frontend to prompt the user. The same `Canvas` instance can later resume execution via `run(query="...", inputs={...})` with the missing data supplied.
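
The pause-and-resume contract can be modeled with a small async generator. This is a simplified model, not RAGFlow's implementation: `PausableRun` is hypothetical, a node counts as a `UserFillup` step when its id starts with `UserFillup`, and supplied `inputs` are merged into a plain dict.

```python
import asyncio


class PausableRun:
    """Hypothetical model of a run that pauses at a UserFillup node."""

    def __init__(self, path: list, required: dict):
        self.path = path          # remaining node ids, in order
        self.required = required  # UserFillup id -> list of required field names
        self.filled = {}

    async def run(self, query: str, inputs=None):
        if inputs:
            self.filled.update(inputs)
        while self.path:
            cpn_id = self.path[0]
            if cpn_id.startswith("UserFillup"):
                missing = [f for f in self.required[cpn_id] if f not in self.filled]
                if missing:
                    # Tell the frontend what to ask for, then end this invocation.
                    yield {"event": "user_inputs",
                           "data": {"component_id": cpn_id, "fields": missing}}
                    return
            self.path.pop(0)  # the node can proceed, so consume it
            yield {"event": "node_finished", "data": {"component_id": cpn_id}}
        yield {"event": "workflow_finished", "data": {"inputs": dict(self.filled)}}


async def demo():
    run = PausableRun(["begin", "UserFillup:0", "book"], {"UserFillup:0": ["date"]})
    first = [e["event"] async for e in run.run("Book a flight")]
    second = [e["event"] async for e in run.run("Book a flight",
                                                inputs={"date": "2024-12-25"})]
    return first, second


print(asyncio.run(demo()))
```

Because the remaining path lives on the object, the second call picks up exactly where the first one stopped.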

### Workflow Completion

When the path is exhausted or the run is cancelled, `run()` emits a `workflow_finished` event containing the final outputs and elapsed time, and persists the updated history. This ensures conversation context survives for subsequent turns.

## State Management Helpers

[`agent/canvas.py`](https://github.com/infiniflow/ragflow/blob/main/agent/canvas.py) provides utilities that let components interact with shared state without manipulating dictionaries directly.

### Variable Resolution

Methods like `get_variable_value` and `set_variable_value` handle the `${cpn_id@output_key}` and `${sys.xxx}` template syntax used in prompts. Components reference outputs from previous steps, and the canvas resolves these references to actual values at runtime.
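
A minimal resolver for the template syntax described above. The regex, the `resolve` helper, and the choice to leave unknown references untouched are assumptions for illustration; RAGFlow's own pattern and fallback behavior may differ.

```python
import re

# Matches ${cpn_id@key} or ${sys.name}; group 1 is the reference inside the braces.
VAR_REF = re.compile(r"\$\{([A-Za-z0-9_:]+@[A-Za-z0-9_.]+|sys\.[A-Za-z0-9_.]+)\}")


def resolve(template: str, globals_: dict, outputs: dict) -> str:
    """Hypothetical: substitute variable references with runtime values."""

    def lookup(match):
        ref = match.group(1)
        if ref.startswith("sys."):
            value = globals_.get(ref)
        else:
            cpn_id, key = ref.split("@", 1)
            value = outputs.get(cpn_id, {}).get(key)
        # Leave unresolved references as-is so missing data stays visible.
        return match.group(0) if value is None else str(value)

    return VAR_REF.sub(lookup, template)


print(resolve("Q: ${sys.query} A: ${gen_0@content}",
              {"sys.query": "hi"}, {"gen_0": {"content": "hello"}}))  # Q: hi A: hello
```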

### Reference and Citation Tracking

The `add_reference` and `get_reference` methods manage retrieved chunks and document aggregates. When the **Retrieval** component fetches documents, it stores them via `add_reference()`. Later, **Message** components automatically attach reference data to `message_end` events when they detect citation patterns like `[ID: ...]` in the output.
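
The reference bookkeeping can be sketched as follows, assuming chunks and document aggregates are keyed by id (consistent with the `{"chunks": {...}, "doc_aggs": {...}}` shape in the retrieval example later in this article) and that citations look like `[ID: <chunk id>]`. `ReferenceStore` and `message_end` are hypothetical names, and the `id`/`doc_id` fields are assumptions.

```python
import re

# Matches citation markers such as "[ID: c1]"; group 1 is the cited id.
CITATION = re.compile(r"\[ID:\s*([^\]\s]+)\]")


class ReferenceStore:
    """Hypothetical stand-in for the canvas reference bookkeeping."""

    def __init__(self):
        # One entry per run; the latest run is always retrieval[-1].
        self.retrieval = [{"chunks": {}, "doc_aggs": {}}]

    def add_reference(self, chunks: list, doc_aggs: list):
        ref = self.retrieval[-1]
        for chunk in chunks:
            ref["chunks"][chunk["id"]] = chunk    # de-duplicate by chunk id
        for doc in doc_aggs:
            ref["doc_aggs"][doc["doc_id"]] = doc  # one entry per source document

    def get_reference(self) -> dict:
        return self.retrieval[-1]


def message_end(text: str, store: ReferenceStore) -> dict:
    """Attach the reference data only when the answer contains a citation."""
    data = {}
    if CITATION.search(text):
        data["reference"] = store.get_reference()
    return {"event": "message_end", "data": data}
```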

### Memory and Conversation History

Multi-turn conversations rely on `add_memory`, `get_memory`, and `add_user_input`. These methods let components read previous user queries and assistant responses, maintaining context across workflow invocations.
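
A hypothetical model of the history and memory helpers. `add_user_input`, `add_memory`, and `get_memory` echo the names above, but `add_assistant_output`, `get_history`, the `(role, content)` turn format, the `window` parameter, and the memory record shape are all assumptions chosen for clarity.

```python
class ConversationState:
    """Hypothetical model of canvas history/memory bookkeeping."""

    def __init__(self):
        self.history = []  # (role, content) turns, oldest first
        self.memory = []   # longer-lived (user, assistant, summary) records

    def add_user_input(self, text: str):
        self.history.append(("user", text))

    def add_assistant_output(self, text: str):
        self.history.append(("assistant", text))

    def get_history(self, window: int) -> list:
        # Only the last `window` turns are fed back into prompts.
        # (A slice of [-0:] would return everything, hence the guard.)
        return self.history[-window:] if window > 0 else []

    def add_memory(self, user: str, assistant: str, summary: str):
        self.memory.append((user, assistant, summary))

    def get_memory(self) -> list:
        return list(self.memory)
```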

## Integration with External Systems

The canvas module interfaces with several external services to provide a complete workflow platform.

### Component Definitions and Factory

Component classes defined in `agent/component/*.py` must implement `invoke`, `invoke_async`, `get_input`, `output`, and `thoughts` methods. The factory in [`agent/component/__init__.py`](https://github.com/infiniflow/ragflow/blob/main/agent/component/__init__.py) maps DSL component names (like "Retrieval" or "Generate") to these concrete classes during `Graph.load()`.
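
A name-to-class factory of this kind can be illustrated with a decorator-based registry. The `register`, `component_class`, and `instantiate` functions below are hypothetical and do not reproduce the contents of `agent/component/__init__.py`; the toy `Begin` class implements only `invoke` and `output`.

```python
_REGISTRY = {}


def register(cls):
    """Class decorator: record a component class under its class name."""
    _REGISTRY[cls.__name__] = cls
    return cls


def component_class(name: str):
    """Hypothetical factory: resolve a DSL component_name to a class."""
    try:
        return _REGISTRY[name]
    except KeyError:
        raise ValueError(f"unknown component: {name}") from None


@register
class Begin:
    """Toy component that exposes the incoming query as its output."""

    def __init__(self, params: dict):
        self.params = params
        self._output = {}

    def invoke(self, **kwargs):
        self._output = {"query": kwargs.get("query", "")}

    def output(self) -> dict:
        return self._output


def instantiate(dsl_components: dict) -> dict:
    # Turn each {"obj": {"component_name", "params"}} entry into an object.
    return {cid: component_class(spec["obj"]["component_name"])(spec["obj"].get("params", {}))
            for cid, spec in dsl_components.items()}
```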

### Redis-Based Task Cancellation

Task cancellation uses Redis via the `REDIS_CONN` client imported in `agent/canvas.py`. The `has_canceled` service checks for a Redis key formatted as `{task_id}-cancel`. When the key is present, `Canvas.run()` raises `TaskCanceledException` at the next cancellation checkpoint, so workflows terminate cleanly even during long-running retrieval or generation operations.
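
The protocol reduces to setting and polling a single key. To keep the sketch runnable without a Redis server, a dict-backed `FakeRedis` stands in for `REDIS_CONN`; `has_canceled` and `checkpoint` here are simplified, hypothetical versions of the behavior described above, and `TaskCanceledException` is defined locally.

```python
class FakeRedis:
    """In-memory stand-in exposing just get/set, like a Redis client."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)


class TaskCanceledException(Exception):
    """Local stand-in for the exception raised on cancellation."""


def has_canceled(conn, task_id: str) -> bool:
    # The flag is simply the presence of "<task_id>-cancel".
    return bool(conn.get(f"{task_id}-cancel"))


def checkpoint(conn, task_id: str):
    """Call between steps; aborts the run once the flag is set."""
    if has_canceled(conn, task_id):
        raise TaskCanceledException(f"task {task_id} canceled")
```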

### Async File Handling

The `get_files_async` and `get_files` methods fetch uploaded files from storage, base64-encode them, and pass them to components that require document processing. These methods integrate with [`file_service.py`](https://github.com/infiniflow/ragflow/blob/main/api/db/services/file_service.py) to handle blob storage and parsing.
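
A rough sketch of the concurrent fetch-and-encode step. The `encode_files` helper, the async `fetch_blob` callable returning `(filename, bytes)`, and the `data:` URL output format are all hypothetical stand-ins for the real storage layer and return shape.

```python
import asyncio
import base64
import mimetypes


async def encode_files(file_ids: list, fetch_blob) -> list:
    """Hypothetical: fetch blobs concurrently and return base64 data URLs."""
    blobs = await asyncio.gather(*(fetch_blob(fid) for fid in file_ids))
    encoded = []
    for name, data in blobs:
        # Guess a MIME type from the filename, falling back to a generic one.
        mime = mimetypes.guess_type(name)[0] or "application/octet-stream"
        b64 = base64.b64encode(data).decode("ascii")
        encoded.append(f"data:{mime};base64,{b64}")
    return encoded


async def fake_fetch(file_id):
    # Stand-in for a blob-storage read.
    await asyncio.sleep(0)
    return f"{file_id}.txt", b"hi"


print(asyncio.run(encode_files(["a"], fake_fetch)))  # ['data:text/plain;base64,aGk=']
```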

## Practical Implementation Examples

### Basic Async Canvas Execution

```python
import asyncio
import json

from agent.canvas import Canvas

# Minimal DSL: a Begin component forwarding to a Message component.
# Assumes this runs inside a configured RAGFlow environment.
dsl = {
    "components": {
        "begin": {
            "obj": {"component_name": "Begin", "params": {}},
            "downstream": ["msg"],
            "upstream": []
        },
        "msg": {
            "obj": {"component_name": "Message", "params": {"content": "${sys.query}"}},
            "downstream": [],
            "upstream": ["begin"]
        }
    },
    "history": [],
    "path": [],  # run() appends "begin" itself
    "globals": {"sys.query": ""},
    "retrieval": []  # list of {"chunks": ..., "doc_aggs": ...} entries
}

async def run_demo():
    canvas = Canvas(json.dumps(dsl), tenant_id="demo_tenant")
    async for event in canvas.run(query="Hello RAGFlow!"):
        print(event)  # workflow_started, node_started, message, workflow_finished, etc.

asyncio.run(run_demo())
```

### Accessing Retrieval Results

```python
# Inside a component method (components keep a reference to their canvas):
chunks = self._canvas.retrieval[-1]["chunks"]
docs = self._canvas.retrieval[-1]["doc_aggs"]

# After workflow completion:
reference = canvas.get_reference()  # {"chunks": {...}, "doc_aggs": {...}}
```

### Cancelling a Running Workflow

```python
from rag.utils.redis_conn import REDIS_CONN

task_id = "..."  # placeholder: the task_id the running Canvas was created with

# Set the cancellation flag from an external process
REDIS_CONN.set(f"{task_id}-cancel", "x")

# Canvas.run() will raise TaskCanceledException at the next checkpoint
```

### Resuming After User Input

```python
async def book_flight(canvas):
    # First run yields a user_inputs event and pauses
    async for event in canvas.run(query="Book a flight"):
        if event["event"] == "user_inputs":
            print("Need more info:", event["data"])

    # Resume with the missing values filled in
    async for event in canvas.run(query="Book a flight", inputs={"date": "2024-12-25"}):
        print(event)
```

## Summary

- **[`agent/canvas.py`](https://github.com/infiniflow/ragflow/blob/main/agent/canvas.py)** implements the core execution engine that transforms JSON DSL into running agentic workflows through the `Canvas` and `Graph` classes.
- **State management** combines static graph structure with dynamic runtime variables, history, retrieval results, and memory to enable complex multi-turn interactions.
- **Async execution** uses `_run_batch` with concurrency limits and supports both sync and async component methods, yielding events for real-time frontend updates.
- **Cancellation support** integrates with Redis to enable graceful termination of long-running workflows via `TaskCanceledException`.
- **Citation tracking** automatically aggregates retrieval results and attaches them to message events when patterns like `[ID: ...]` appear in generated text.

## Frequently Asked Questions

### How does Canvas handle async component execution?

The `_run_batch` method checks whether each component exposes an `invoke_async` coroutine. Async components are awaited on the event loop, while synchronous components run in executor threads so they do not block it; in both cases the number of components running at once is capped by `max_concurrency`. The main `run()` coroutine manages the execution path and event emission.

### What happens when a workflow needs to pause for user input?

When the execution path contains a `UserFillup` component downstream, `Canvas.run()` detects this requirement, yields a `user_inputs` event specifying the required fields, and returns early. The frontend collects the missing data, and the workflow resumes when `run()` is called again with the `inputs` parameter populated.

### How does RAGFlow enable workflow cancellation?

Cancellation checks occur at the start of `run()` and within `_run_batch` via `self.is_canceled()`, which queries Redis for a `{task_id}-cancel` key. If the key is present, the method raises `TaskCanceledException`, causing the workflow to emit a cancellation event and terminate cleanly without corrupting state.

### Where does Canvas store retrieval results for citations?

Retrieval components store chunks and document aggregates in the canvas's `retrieval` state via `add_reference()`. The `Message` component later detects citation markers in the generated text and automatically attaches the reference data to `message_end` events, enabling the frontend to display source documents alongside generated answers.