# How the RAG Pipeline Is Orchestrated in RAGFlow's `rag/flow/` Directory

> Discover how the RAG pipeline is orchestrated in RAGFlow's rag/flow/ directory. Explore the Pipeline class executing a DAG asynchronously from retrieval to merging.

- Repository: [InfiniFlow/ragflow](https://github.com/infiniflow/ragflow)
- Tags: internals
- Published: 2026-02-23

---

**The RAG pipeline in RAGFlow is orchestrated by the `Pipeline` class in [`rag/flow/pipeline.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/pipeline.py), which executes a directed acyclic graph of components defined by a JSON DSL, processing documents asynchronously from file retrieval through parsing, extraction, and hierarchical merging.**

The `infiniflow/ragflow` repository implements a modular Retrieval-Augmented Generation (RAG) engine where the `rag/flow/` directory houses the core orchestration logic. This article examines how **RAG pipeline orchestration** works, tracing the execution flow from DSL definition to final output through the graph-based engine.

## The `Pipeline` Class: Central Orchestrator of the RAG Flow

### Inheriting from the Graph Engine

The orchestration begins with the **`Pipeline`** class defined in [`rag/flow/pipeline.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/pipeline.py). This class inherits from `agent.canvas.Graph`, which provides the underlying graph traversal capabilities. When instantiated, the `Pipeline` receives a **DSL** (domain-specific language) structure, either as a JSON string or as a Python dict, that describes the directed acyclic graph of processing components.

### DSL Resolution and Context Binding

The constructor resolves critical context parameters before execution begins:

- **`doc_id`** and **`kb_id`** identify the target document and knowledge base
- **`tenant_id`**, **`task_id`**, and **`flow_id`** provide multi-tenancy and task tracking capabilities

These identifiers enable downstream components to fetch raw file bytes from the storage layer and report progress correctly.

```python
from rag.flow.pipeline import Pipeline

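# Initialize with the DSL and execution context.
# `dsl`, `tid`, `did`, `task_id`, and `fid` are placeholders supplied by the caller.
pipeline = Pipeline(
    dsl,
    tenant_id=tid,
    doc_id=did,
    task_id=task_id,
    flow_id=fid,
)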
```

## Step-by-Step RAG Pipeline Execution Flow

### 1. Graph Initialization and Component Mapping

When `super().__init__(dsl, tenant_id, task_id)` executes, the graph engine parses the DSL and builds `self.components`, a mapping of component IDs to instantiated component objects. It also records downstream connections via `get_downstream` to determine execution order.

### 2. Implicit File Component Injection

If the DSL does not explicitly define a starting node, the pipeline automatically prepends a **`File`** component. This ensures every pipeline begins with document retrieval:

```python
# From pipeline.py lines 24-27
if not self.path:
    self.path.append("File")
```

### 3. Asynchronous Component Invocation Loop

The `run` coroutine in [`pipeline.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/pipeline.py) implements the core execution loop:

1. Iterates through `self.path` in topological order
2. Invokes each component asynchronously: `await cpn_obj.invoke(**last_cpn.output())`
3. Passes the output dictionary from the previous component as input to the next

This design allows components to communicate via standardized dictionaries containing keys like `text`, `json`, `markdown`, or `html`.
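The loop described above can be sketched in a few lines. This is a minimal illustration, not RAGFlow's actual code: `EchoComponent` and `run_path` are hypothetical names, and the `trace` key is added only to make the hand-off between components visible.

```python
import asyncio


class EchoComponent:
    """Hypothetical stand-in for a pipeline component (not RAGFlow's class)."""

    def __init__(self, name):
        self.name = name
        self._output = {}

    async def invoke(self, **kwargs):
        # Carry the upstream payload forward and record that this step ran.
        self._output = dict(kwargs)
        self._output["trace"] = kwargs.get("trace", []) + [self.name]

    def output(self):
        return self._output


async def run_path(path, components):
    """Invoke components in path order, feeding each one the previous output."""
    last_output = {}
    for cpn_id in path:
        cpn_obj = components[cpn_id]
        await cpn_obj.invoke(**last_output)
        last_output = cpn_obj.output()
    return last_output


path = ["File", "Parser", "Extractor"]
components = {cpn_id: EchoComponent(cpn_id) for cpn_id in path}
result = asyncio.run(run_path(path, components))
print(result["trace"])  # ['File', 'Parser', 'Extractor']
```

Because each component receives the previous output as keyword arguments, a component only needs to agree with its neighbors on key names, not on classes.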

### 4. Progress Callbacks and Redis Logging

Every component reports progress through the **`callback`** method (lines 43-99). This mechanism:

- Stores per-component logs in **Redis** using the key format `<flow_id>-<task_id>-logs`
- Updates the task progress in the database via `TaskService.update_progress`
- Enables real-time UI updates showing which component is currently processing

```python
# Callback structure stored in Redis
{
    "progress": 0.5,
    "message": "Parsing PDF pages...",
    "datetime": "2024-01-15T10:30:00",
    "timestamp": 1705315800
}
```
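To make the logging step concrete, here is a rough sketch of how such entries could be accumulated under the `<flow_id>-<task_id>-logs` key. It uses a plain dict as a stand-in for Redis, and the helper names (`log_key`, `record_progress`) and the per-component grouping layout are assumptions made for illustration, not RAGFlow's implementation.

```python
import json
import time
from datetime import datetime

# In-memory stand-in for Redis: key -> JSON string (an assumption for this sketch).
fake_redis = {}


def log_key(flow_id, task_id):
    """Build the log key in the `<flow_id>-<task_id>-logs` format."""
    return f"{flow_id}-{task_id}-logs"


def record_progress(flow_id, task_id, component_id, progress, message):
    """Append one progress entry to the log list stored under the task's key."""
    key = log_key(flow_id, task_id)
    logs = json.loads(fake_redis.get(key, "[]"))
    now = time.time()
    entry = {
        "progress": progress,
        "message": message,
        "datetime": datetime.fromtimestamp(now).strftime("%Y-%m-%dT%H:%M:%S"),
        "timestamp": now,
    }
    # Group entries by component so an observer can see which step is active.
    for group in logs:
        if group["component_id"] == component_id:
            group["trace"].append(entry)
            break
    else:
        logs.append({"component_id": component_id, "trace": [entry]})
    fake_redis[key] = json.dumps(logs)
    return entry


record_progress("flow1", "task1", "Parser", 0.5, "Parsing PDF pages...")
record_progress("flow1", "task1", "Parser", 1.0, "Done")
stored = json.loads(fake_redis["flow1-task1-logs"])
```

Swapping the dict for a real Redis client would only change the two lines that read and write `fake_redis`.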

### 5. Error Handling and Pipeline Abort

If any component sets an error flag (`_ERROR`), the pipeline immediately aborts:

- Progress is set to `-1` to indicate failure
- A `TaskCanceledException` is raised
- Partial logs remain available in Redis for debugging

Successful completion logs a final `"END"` entry and returns the terminal component's output dictionary.
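The abort behavior can be illustrated with a small sketch. The `TaskCanceledException` class here is a local stand-in (the article names the exception but not its module), and `Step`, `run_with_abort`, and the tuple-based `progress_log` are hypothetical simplifications.

```python
import asyncio


class TaskCanceledException(Exception):
    """Local stand-in for the exception named in the article."""


class Step:
    """Hypothetical component that can be told to fail."""

    def __init__(self, name, fail=False):
        self.name = name
        self.fail = fail
        self._output = {}

    async def invoke(self, **kwargs):
        self._output = dict(kwargs)
        if self.fail:
            self._output["_ERROR"] = f"{self.name} failed"

    def output(self):
        return self._output

    def error(self):
        return self._output.get("_ERROR")


async def run_with_abort(steps, progress_log):
    """Run steps in order; stop at the first one that reports `_ERROR`."""
    last = {}
    for step in steps:
        await step.invoke(**last)
        if step.error():
            # Progress -1 marks the task as failed before raising.
            progress_log.append((step.name, -1, step.error()))
            raise TaskCanceledException(step.error())
        progress_log.append((step.name, 1.0, "done"))
        last = step.output()
    progress_log.append(("END", 1.0, "done"))
    return last


progress_log = []
try:
    asyncio.run(
        run_with_abort(
            [Step("File"), Step("Parser", fail=True), Step("Extractor")],
            progress_log,
        )
    )
    aborted = False
except TaskCanceledException:
    aborted = True
```

In this run the `Extractor` step never executes, and the entries logged before the failure remain in `progress_log` for inspection.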

## The `ProcessBase` Abstraction: Standardizing RAG Components

All processing blocks inherit from **`ProcessBase`** defined in [`rag/flow/base.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/base.py). This abstraction provides:

**Callback Wiring**: The constructor attaches the canvas callback using `partial(self._canvas.callback, id)`, ensuring every component can report progress without knowing the underlying logging infrastructure.

**Standardized Invocation**: The public `invoke` method (not to be confused with `_invoke`) handles cross-cutting concerns:
- Records start timestamps
- Copies inbound kwargs to the component's output dictionary
- Executes the component-specific `_invoke` method with timeout protection
- Calculates elapsed time and returns the populated output dictionary

Component developers only need to override `_invoke` and call `self.set_output()` to emit results downstream.
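The split between `invoke` and `_invoke` might look roughly like the sketch below. `MiniProcessBase` is a simplified illustration rather than RAGFlow's `ProcessBase`: the constructor signature, the use of `asyncio.wait_for` for the timeout, the `_elapsed_time` key, and the choice to record exceptions under `_ERROR` are all assumptions.

```python
import asyncio
import time
from functools import partial


class MiniProcessBase:
    """Simplified illustration of the invoke/_invoke split; not RAGFlow's class."""

    def __init__(self, canvas_callback, component_id, timeout=5.0):
        # Bind the component id so subclasses call self.callback(progress, message).
        self.callback = partial(canvas_callback, component_id)
        self._timeout = timeout
        self._output = {}

    def set_output(self, key, value):
        self._output[key] = value

    def output(self):
        return self._output

    async def invoke(self, **kwargs):
        started = time.perf_counter()
        self._output = dict(kwargs)  # copy inbound kwargs into the output
        try:
            await asyncio.wait_for(self._invoke(**kwargs), timeout=self._timeout)
        except Exception as exc:
            # Assumption: failures (including timeouts) are surfaced via `_ERROR`.
            self.set_output("_ERROR", str(exc) or type(exc).__name__)
        self.set_output("_elapsed_time", time.perf_counter() - started)
        return self._output

    async def _invoke(self, **kwargs):
        raise NotImplementedError


class UpperCase(MiniProcessBase):
    """Example subclass: only `_invoke` is overridden."""

    async def _invoke(self, **kwargs):
        self.callback(0.5, "Uppercasing text")
        self.set_output("text", kwargs.get("text", "").upper())


events = []
component = UpperCase(
    lambda cid, progress, message: events.append((cid, progress, message)),
    "UpperCase:0",
)
result = asyncio.run(component.invoke(text="hello"))
```

The subclass never touches timing, timeouts, or the logging backend, which is the point of keeping those concerns in the base class.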

## Core RAG Components in `rag/flow/`

The directory contains specialized components that transform raw files into structured knowledge:

### File Retrieval ([`rag/flow/file.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/file.py))

The **`File`** component serves as the entry point. It retrieves the original file either from the database via `DocumentService` or from an uploaded blob. It emits a dictionary containing `name`, `file` or `blob`, and the metadata required by downstream parsers.

### Document Parsing ([`rag/flow/parser/parser.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/parser/parser.py))

The **`Parser`** component dispatches files to format-specific parsers based on the file suffix and the **`ParserParam.setups`** configuration. It handles PDFs, spreadsheets, Word documents, images, and plain text, producing structured output in `json`, `markdown`, or `html` formats.
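Suffix-based dispatch of this kind can be sketched as a lookup over a setups mapping. The `SETUPS` dict and the `resolve_setup` helper are hypothetical; only the `pdf` entry mirrors the DSL example in this article, and the other `parse_method` values are placeholders.

```python
from pathlib import Path

# Hypothetical setups: parser family -> accepted suffixes and a parse method.
SETUPS = {
    "pdf": {"suffix": ["pdf"], "parse_method": "deepdoc"},
    "spreadsheet": {"suffix": ["xls", "xlsx", "csv"], "parse_method": "table"},
    "text": {"suffix": ["txt", "md"], "parse_method": "plain"},
}


def resolve_setup(filename, setups):
    """Return (family, config) for the first setup that accepts the file suffix."""
    suffix = Path(filename).suffix.lstrip(".").lower()
    for family, conf in setups.items():
        if suffix in conf["suffix"]:
            return family, conf
    raise ValueError(f"No parser configured for suffix '{suffix}'")


family, conf = resolve_setup("Report.PDF", SETUPS)
print(family, conf["parse_method"])  # pdf deepdoc
```

Keeping the suffix lists in configuration rather than code is what lets a DSL author reroute a file type to a different parser without touching the component.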

### LLM-Based Extraction ([`rag/flow/extractor/extractor.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/extractor/extractor.py))

The **`Extractor`** component invokes Large Language Models to extract specific fields, generate summaries, or create tables of contents from the parsed chunks. This enables semantic enrichment of documents before storage.

### Hierarchical Merging ([`rag/flow/hierarchical_merger/hierarchical_merger.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/hierarchical_merger/hierarchical_merger.py))

The **`HierarchicalMerger`** component transforms flat text chunks into hierarchical document structures. It uses user-defined regex patterns (e.g., `^#` for level 1, `^##` for level 2) to reconstruct document outlines, preserving semantic relationships between sections.
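As a rough illustration of regex-driven outline reconstruction, the sketch below attaches each body line to the heading path above it. It is not the component's actual algorithm. The patterns use a trailing space (`^# `, `^## `) because a bare `^#` would also match lines starting with `##`; the function names and the chunk layout are assumptions.

```python
import re

# One pattern per level; the trailing space keeps "## " from matching level 1.
LEVEL_PATTERNS = [r"^# ", r"^## "]


def heading_level(line, patterns=LEVEL_PATTERNS):
    """Return the 1-based heading level of a line, or None for body text."""
    for depth, pattern in enumerate(patterns, start=1):
        if re.match(pattern, line):
            return depth
    return None


def merge_hierarchically(lines):
    """Attach each non-empty body line to its enclosing heading path."""
    path = []  # current headings, index = level - 1
    chunks = []
    for line in lines:
        level = heading_level(line)
        if level is not None:
            # Drop headings at this depth or deeper, then push the new one.
            path = path[: level - 1] + [line.strip()]
        elif line.strip():
            chunks.append({"path": list(path), "text": line.strip()})
    return chunks


doc = ["# Intro", "RAG basics.", "## Scope", "Only ingestion.", "# Design", "DAG engine."]
chunks = merge_hierarchically(doc)
for chunk in chunks:
    print(" > ".join(chunk["path"]), "|", chunk["text"])
```

Each chunk keeps its ancestors, so a retrieved passage such as "Only ingestion." still carries the context `# Intro > ## Scope`.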

### Text Splitting and Tokenization

Optional preprocessing components include:
- **`Tokenizer`** ([`rag/flow/tokenizer/tokenizer.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/tokenizer/tokenizer.py)): Counts tokens to enforce limits
- **`Splitter`** ([`rag/flow/splitter/splitter.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/splitter/splitter.py)): Implements sliding-window or semantic splitting strategies for long texts
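A sliding-window split can be sketched as follows. This is a generic illustration of the technique, not the `Splitter` component's code; the `sliding_window_split` name and its `window` and `overlap` parameters are assumptions, and whitespace-separated words stand in for real tokens.

```python
def sliding_window_split(tokens, window, overlap):
    """Split tokens into windows of `window` items sharing `overlap` items."""
    if window <= 0:
        raise ValueError("window must be positive")
    if not 0 <= overlap < window:
        raise ValueError("overlap must satisfy 0 <= overlap < window")
    step = window - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + window])
        if start + window >= len(tokens):
            break  # the last window already reaches the end of the input
    return chunks


tokens = "a b c d e f g".split()
windows = sliding_window_split(tokens, window=4, overlap=2)
print(windows)
# [['a', 'b', 'c', 'd'], ['c', 'd', 'e', 'f'], ['e', 'f', 'g']]
```

The overlap trades index size for recall: sentences that straddle a boundary appear in both neighboring chunks.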

## Defining the RAG Pipeline with JSON DSL

The **Domain Specific Language (DSL)** declaratively defines the component graph. The JSON structure contains two top-level keys:

- **`components`**: Array of component definitions with `id`, `type`, and `param` objects
- **`edges`**: Array of connections mapping `source` to `target` component IDs

Example DSL configuration:

```json
{
  "components": [
    {"id": "File", "type": "File", "param": {}},
    {"id": "Parser", "type": "Parser", "param": {"setups": {"pdf": {"suffix": ["pdf"], "parse_method": "deepdoc"}}}},
    {"id": "Extractor", "type": "Extractor", "param": {"field_name": "summary"}},
    {"id": "HierarchicalMerger", "type": "HierarchicalMerger",
     "param": {"levels": [["^#"], ["^##"]], "hierarchy": 2}}
  ],
  "edges": [
    {"source": "File", "target": "Parser"},
    {"source": "Parser", "target": "Extractor"},
    {"source": "Extractor", "target": "HierarchicalMerger"}
  ]
}

```

The `Graph` constructor parses this DSL to build `self.components` (mapping IDs to instances) and calculates downstream dependencies for topological execution.
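A hedged sketch of that parsing step, assuming the `components`/`edges` shape described in this section: it builds a downstream map and derives a topological order with the standard library's `graphlib`. The `build_graph` helper is hypothetical, and the `param` objects are left empty for brevity.

```python
from graphlib import TopologicalSorter

dsl = {
    "components": [
        {"id": "File", "type": "File", "param": {}},
        {"id": "Parser", "type": "Parser", "param": {}},
        {"id": "Extractor", "type": "Extractor", "param": {}},
        {"id": "HierarchicalMerger", "type": "HierarchicalMerger", "param": {}},
    ],
    "edges": [
        {"source": "File", "target": "Parser"},
        {"source": "Parser", "target": "Extractor"},
        {"source": "Extractor", "target": "HierarchicalMerger"},
    ],
}


def build_graph(dsl):
    """Return (downstream map, execution order) for a components/edges DSL."""
    ids = [component["id"] for component in dsl["components"]]
    downstream = {cid: [] for cid in ids}
    upstream = {cid: set() for cid in ids}
    for edge in dsl["edges"]:
        source, target = edge["source"], edge["target"]
        if source not in downstream or target not in downstream:
            raise KeyError(f"Edge references unknown component: {edge}")
        downstream[source].append(target)
        upstream[target].add(source)
    # TopologicalSorter takes node -> predecessors and raises CycleError on cycles.
    order = list(TopologicalSorter(upstream).static_order())
    return downstream, order


downstream, order = build_graph(dsl)
print(order)  # ['File', 'Parser', 'Extractor', 'HierarchicalMerger']
```

Validating edges against the component IDs up front turns a typo in the DSL into an immediate error instead of a component that silently never runs.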

## Monitoring and Logging Architecture

Real-time visibility into **RAG pipeline orchestration** is achieved through a dual-layer logging system:

**Redis Stream Logging**: Each pipeline execution creates a Redis key using the format `<flow_id>-<task_id>-logs`. The `callback` method appends structured entries containing `progress`, `message`, `datetime`, and `timestamp` fields, enabling live log tailing by external observers.

**Database Persistence**: The `TaskService.update_progress` method writes human-readable progress strings to the database, allowing the web UI to display component-level status without polling Redis directly.

This architecture separates high-frequency log streaming from persistent state management, optimizing for both real-time monitoring and historical audit trails.

## Asynchronous Design for Scalable RAG Processing

The entire **RAG pipeline orchestration** layer is built on Python's `asyncio` to maximize throughput:

**Non-blocking Component Execution**: All `_invoke` methods are defined as `async def`, allowing the pipeline to handle I/O-bound operations (file retrieval, LLM API calls, database writes) without blocking the event loop.

**Concurrent Task Management**: The `run` coroutine uses `asyncio.create_task` to spawn component invocations and `asyncio.gather` to synchronize parallel branches when the DSL permits concurrent execution. Post-processing steps like `image2id` (converting images to storage IDs) execute concurrently with text processing.

This asynchronous architecture ensures that a slow operation in one branch (e.g., parsing a large PDF) does not stall independent branches (e.g., tokenizing a small text file), which helps keep throughput high during high-volume document processing.
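The effect of running independent branches concurrently can be demonstrated with `asyncio.gather`. The branch names and delays below are made up for illustration, and `asyncio.sleep` stands in for real I/O such as file reads or LLM calls.

```python
import asyncio


async def run_branches():
    """Run a slow and a fast branch together and record completion order."""
    finished = []

    async def branch(name, delay):
        await asyncio.sleep(delay)  # stand-in for I/O-bound work
        finished.append(name)
        return name

    # gather returns results in argument order, regardless of finish order.
    results = await asyncio.gather(
        branch("parse_large_pdf", 0.2),
        branch("tokenize_small_text", 0.01),
    )
    return results, finished


results, finished = asyncio.run(run_branches())
print(results)   # ['parse_large_pdf', 'tokenize_small_text']
print(finished)  # ['tokenize_small_text', 'parse_large_pdf']
```

The fast branch finishes first even though it was listed second, while `gather` still hands results back in a stable order for the merge step.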

## Summary

- The **RAG pipeline orchestration** in RAGFlow centers on the `Pipeline` class in [`rag/flow/pipeline.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/pipeline.py), which inherits from `agent.canvas.Graph` to execute component DAGs.
- A **JSON DSL** declaratively defines components (File, Parser, Extractor, HierarchicalMerger) and their connections, enabling flexible pipeline configuration without code changes.
- **ProcessBase** in [`rag/flow/base.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/base.py) standardizes component behavior, providing callback integration, timeout handling, and output management for all processing blocks.
- The execution engine is fully **asynchronous**, using `asyncio` to run components concurrently while streaming progress logs to **Redis** and persisting status to the database via `TaskService`.
- Error handling is strict: any component setting `_ERROR` immediately aborts the pipeline with progress `-1` and raises `TaskCanceledException`.

## Frequently Asked Questions

### What is the role of the `Pipeline` class in RAGFlow?

The `Pipeline` class serves as the central orchestrator for the RAG pipeline. Defined in [`rag/flow/pipeline.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/pipeline.py), it inherits from `agent.canvas.Graph` to parse a JSON DSL describing the component graph, then executes each component asynchronously while managing callbacks, progress tracking, and error propagation. It binds the execution context (tenant, document, and task IDs) to ensure components can access storage and report status correctly.

### How does the RAG pipeline handle errors during component execution?

If any component sets an internal `_ERROR` flag during its `_invoke` method, the pipeline immediately enters an abort sequence. The `run` coroutine in [`pipeline.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/pipeline.py) catches this state, sets the task progress to `-1` in both Redis and the database via `TaskService`, raises a `TaskCanceledException`, and stops further component execution. This ensures that partial document processing does not contaminate downstream results or the knowledge base.

### Can I add custom components to the RAGFlow pipeline?

Yes, custom components can be added by inheriting from `ProcessBase` in [`rag/flow/base.py`](https://github.com/infiniflow/ragflow/blob/main/rag/flow/base.py) and implementing the `_invoke` method. Create a parameter class inheriting from `ProcessParamBase` for configuration, then place both classes in a new file under `rag/flow/`. The `Graph` factory automatically discovers components by type name, so adding your component to the DSL with `"type": "YourComponentName"` makes it available for pipeline execution without modifying core orchestration code.

### How does the DSL define connections between RAG components?

The DSL uses a JSON structure with two top-level arrays: `components` and `edges`. The `components` array defines each processing block with a unique `id`, `type` (class name), and `param` object for configuration. The `edges` array contains objects with `source` and `target` keys referencing component IDs, creating a directed acyclic graph. The `Pipeline` constructor parses this structure to build `self.components` and calculate downstream dependencies, ensuring components execute in topological order while respecting the defined data flow.