How the RAG Pipeline Is Orchestrated in RAGFlow's `rag/flow/` Directory
The RAG pipeline in RAGFlow is orchestrated by the Pipeline class in rag/flow/pipeline.py, which executes a directed acyclic graph of components defined by a JSON DSL, processing documents asynchronously from file retrieval through parsing, extraction, and hierarchical merging.
The infiniflow/ragflow repository implements a modular Retrieval-Augmented Generation (RAG) engine where the rag/flow/ directory houses the core orchestration logic. This article examines how RAG pipeline orchestration works, tracing the execution flow from DSL definition to final output through the graph-based engine.
The Pipeline Class: Central Orchestrator of the RAG Flow
Inheriting from the Graph Engine
The orchestration begins with the Pipeline class defined in rag/flow/pipeline.py. This class inherits from agent.canvas.Graph, which provides the underlying graph traversal capabilities. When instantiated, the Pipeline receives a DSL (Domain Specific Language) structure—either as JSON or a Python dict—that describes the directed acyclic graph of processing components.
DSL Resolution and Context Binding
The constructor resolves critical context parameters before execution begins:
- doc_id and kb_id identify the target document and knowledge base
- tenant_id, task_id, and flow_id provide multi-tenancy and task tracking capabilities
These identifiers enable downstream components to fetch raw file bytes from the storage layer and report progress correctly.
```python
from rag.flow.pipeline import Pipeline

# Initialize with DSL and context
# (dsl, tid, did, task_id, and fid are placeholders supplied by the caller)
pipeline = Pipeline(
    dsl,
    tenant_id=tid,
    doc_id=did,
    task_id=task_id,
    flow_id=fid,
)
```
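To make the context binding concrete, here is a minimal sketch of what such a constructor has to do: accept the DSL as either a JSON string or a dict, and derive kb_id from doc_id. PipelineContext and lookup_kb_id are hypothetical names. The lookup is injected because the real service call RAGFlow uses is not shown here, so this is an illustration under those assumptions rather than the actual Pipeline.__init__.

```python
import json
from typing import Any, Callable, Dict, Optional, Union


class PipelineContext:
    """Hypothetical stand-in for the context binding described for Pipeline.__init__."""

    def __init__(
        self,
        dsl: Union[str, Dict[str, Any]],
        tenant_id: Optional[str] = None,
        doc_id: Optional[str] = None,
        task_id: Optional[str] = None,
        flow_id: Optional[str] = None,
        lookup_kb_id: Callable[[str], Optional[str]] = lambda _doc_id: None,
    ) -> None:
        # Accept either a JSON string or a dict, and normalize to a dict.
        self.dsl: Dict[str, Any] = json.loads(dsl) if isinstance(dsl, str) else dict(dsl)
        self.tenant_id = tenant_id
        self.doc_id = doc_id
        self.task_id = task_id
        self.flow_id = flow_id
        # Derive the knowledge base id from the document id (hypothetical lookup;
        # a real implementation would query its document store here).
        self.kb_id = lookup_kb_id(doc_id) if doc_id else None


# Usage: a JSON string DSL plus a stub lookup function.
ctx = PipelineContext('{"components": [], "edges": []}', doc_id="d1",
                      lookup_kb_id=lambda d: "kb-for-" + d)
```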
Step-by-Step RAG Pipeline Execution Flow
1. Graph Initialization and Component Mapping
When super().__init__(dsl, tenant_id, task_id) executes, the graph engine parses the DSL and builds self.components—a mapping of component IDs to instantiated component objects. It also records downstream connections via get_downstream to determine execution order.
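The mapping step can be sketched as follows, assuming the components/edges DSL shape shown later in this article. REGISTRY, register, Component, and build_graph are hypothetical names. This is not agent.canvas.Graph's code, only a model of "instantiate each component by its type name, then record downstream ids from the edges".

```python
from collections import defaultdict
from typing import Any


class Component:
    """Hypothetical minimal component that stores its id and params."""

    def __init__(self, cpn_id: str, param: dict[str, Any]) -> None:
        self.id = cpn_id
        self.param = param


# Hypothetical registry mapping DSL "type" names to component classes.
REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    REGISTRY[cls.__name__] = cls
    return cls


@register
class File(Component):
    """Placeholder File component."""


@register
class Parser(Component):
    """Placeholder Parser component."""


def build_graph(dsl: dict) -> tuple[dict[str, Component], dict[str, list[str]]]:
    """Instantiate components by type name and collect downstream ids from edges."""
    components = {
        c["id"]: REGISTRY[c["type"]](c["id"], c.get("param", {}))
        for c in dsl["components"]
    }
    downstream: dict[str, list[str]] = defaultdict(list)
    for edge in dsl["edges"]:
        downstream[edge["source"]].append(edge["target"])
    return components, dict(downstream)
```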
2. Implicit File Component Injection
If the execution path (self.path) is still empty when the pipeline starts, it is seeded with a File component. This ensures every pipeline begins with document retrieval:
```python
# From pipeline.py: seed an empty execution path with the File component
if not self.path:
    self.path.append("File")
```
3. Asynchronous Component Invocation Loop
The run coroutine in pipeline.py implements the core execution loop:
- Iterates through self.path in topological order
- Invokes each component asynchronously: await cpn_obj.invoke(**last_cpn.output())
- Passes the output dictionary from the previous component as input to the next
This design allows components to communicate via standardized dictionaries containing keys like text, json, markdown, or html.
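The loop described above can be modeled in a few lines of asyncio. This is a simplified sketch, not RAGFlow's run coroutine: Step and run are hypothetical names, and the sketch assumes a linear chain, because handing last_cpn.output() to the next component only identifies the true predecessor when each node has a single upstream.

```python
import asyncio
from typing import Any


class Step:
    """Hypothetical component that appends its name to a 'trail' list in its output."""

    def __init__(self, name: str, downstream: list[str]) -> None:
        self.name = name
        self.downstream = downstream
        self._output: dict[str, Any] = {}

    async def invoke(self, **kwargs: Any) -> dict[str, Any]:
        await asyncio.sleep(0)  # stand-in for I/O such as storage reads or LLM calls
        trail = list(kwargs.get("trail", [])) + [self.name]
        self._output = {**kwargs, "trail": trail}
        return self._output

    def output(self) -> dict[str, Any]:
        return self._output

    def get_downstream(self) -> list[str]:
        return self.downstream


async def run(components: dict[str, Step], start: str = "File", **kwargs: Any) -> dict[str, Any]:
    path = [start]
    await components[start].invoke(**kwargs)
    path.extend(components[start].get_downstream())
    idx = 1
    while idx < len(path):
        last_cpn = components[path[idx - 1]]
        cpn_obj = components[path[idx]]
        # Each component consumes the previous component's output dict as kwargs.
        await cpn_obj.invoke(**last_cpn.output())
        path.extend(cpn_obj.get_downstream())
        idx += 1
    return components[path[-1]].output()
```

Because the path grows as each component reports its downstream ids, the loop never needs the whole order up front.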
4. Progress Callbacks and Redis Logging
Every component reports progress through the callback method. This mechanism:
- Stores per-component logs in Redis using the key format <flow_id>-<task_id>-logs
- Updates the task progress in the database via TaskService.update_progress
- Enables real-time UI updates showing which component is currently processing
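```python
# Callback structure stored in Redis (illustrative values;
# the timestamp is the Unix time for the datetime, read as UTC)
{
    "progress": 0.5,
    "message": "Parsing PDF pages...",
    "datetime": "2024-01-15T10:30:00",
    "timestamp": 1705314600
}
```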
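Here is a minimal sketch of a callback that appends entries of that shape under the <flow_id>-<task_id>-logs key. The key format and the progress/message/datetime/timestamp fields come from this article. FakeKV, make_callback, and the component_id field are hypothetical, and an in-memory dict stands in for Redis so the example runs without a server.

```python
import json
import time
from datetime import datetime
from functools import partial
from typing import Callable, Dict, Optional


class FakeKV:
    """In-memory stand-in for Redis: stores string values by key."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self.data.get(key)

    def set(self, key: str, value: str) -> None:
        self.data[key] = value


def make_callback(store: FakeKV, flow_id: str, task_id: str) -> Callable[..., None]:
    """Return a callback that appends one log entry per call under a shared key."""
    log_key = f"{flow_id}-{task_id}-logs"  # key format described in the article

    def callback(component_id: str, progress: float, message: str = "") -> None:
        raw = store.get(log_key)
        logs = json.loads(raw) if raw else []
        now = time.time()
        logs.append({
            "component_id": component_id,  # hypothetical field for per-component grouping
            "progress": progress,
            "message": message,
            "datetime": datetime.fromtimestamp(now).isoformat(timespec="seconds"),
            "timestamp": now,
        })
        store.set(log_key, json.dumps(logs))

    return callback


# Usage: bind the component id once, as partial() is used for ProcessBase callbacks.
store = FakeKV()
parser_callback = partial(make_callback(store, "flow1", "task1"), "Parser")
parser_callback(0.5, "Parsing PDF pages...")
```

The read-modify-write here is not atomic. A Redis-backed version would more likely use an atomic list operation such as RPUSH.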
5. Error Handling and Pipeline Abort
If any component sets an error flag (_ERROR), the pipeline immediately aborts:
- Progress is set to -1 to indicate failure
- A TaskCanceledException is raised
- Partial logs remain available in Redis for debugging
Successful completion logs a final "END" entry and returns the terminal component's output dictionary.
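The abort path can be sketched as a loop that checks each component's error after it runs. TaskCanceledException is defined locally as a stand-in with the name the article mentions. FailingStep, run_chain, and the progress_log list are hypothetical, and progress_log takes the place of the Redis and database updates.

```python
import asyncio
from typing import Any, Optional


class TaskCanceledException(Exception):
    """Local stand-in for the exception named in the article."""


class FailingStep:
    """Hypothetical component that optionally flags an error via an _ERROR output."""

    def __init__(self, error: Optional[str] = None) -> None:
        self._error = error
        self._output: dict[str, Any] = {}

    async def invoke(self, **kwargs: Any) -> None:
        self._output = dict(kwargs)
        if self._error:
            self._output["_ERROR"] = self._error

    def error(self) -> Optional[str]:
        return self._output.get("_ERROR")

    def output(self) -> dict[str, Any]:
        return self._output


async def run_chain(steps: list, progress_log: list) -> dict[str, Any]:
    last_output: dict[str, Any] = {}
    for step in steps:
        await step.invoke(**last_output)
        if step.error():
            # Record failure as progress -1, then stop before any later step runs.
            progress_log.append({"progress": -1, "message": f"[ERROR] {step.error()}"})
            raise TaskCanceledException(step.error())
        last_output = step.output()
    progress_log.append({"progress": 1, "message": "END"})
    return last_output
```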
The ProcessBase Abstraction: Standardizing RAG Components
All processing blocks inherit from ProcessBase defined in rag/flow/base.py. This abstraction provides:
Callback Wiring: The constructor attaches the canvas callback using partial(self._canvas.callback, id), ensuring every component can report progress without knowing the underlying logging infrastructure.
Standardized Invocation: The public invoke method (not to be confused with _invoke) handles cross-cutting concerns:
- Records start timestamps
- Copies inbound kwargs to the component's output dictionary
- Executes the component-specific _invoke method with timeout protection
- Calculates elapsed time and returns the populated output dictionary
Component developers only need to override _invoke and call self.set_output() to emit results downstream.
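The contract above (a public invoke wrapping a subclass-specific _invoke) can be sketched as follows. ProcessBaseSketch is a hypothetical simplification, not RAGFlow's ProcessBase. The timeout value, the _elapsed_time key, and recording a timeout as _ERROR are all assumptions made for illustration.

```python
import asyncio
import time
from typing import Any


class ProcessBaseSketch:
    """Hypothetical simplification of the ProcessBase invoke/_invoke contract."""

    timeout_seconds: float = 10.0  # assumed value; the real timeout is not stated here

    def __init__(self) -> None:
        self._output: dict[str, Any] = {}

    def set_output(self, key: str, value: Any) -> None:
        self._output[key] = value

    def output(self) -> dict[str, Any]:
        return self._output

    async def _invoke(self, **kwargs: Any) -> None:
        raise NotImplementedError

    async def invoke(self, **kwargs: Any) -> dict[str, Any]:
        start = time.perf_counter()
        # Carry inbound values forward so downstream components still see them.
        self._output = dict(kwargs)
        try:
            await asyncio.wait_for(self._invoke(**kwargs), timeout=self.timeout_seconds)
        except asyncio.TimeoutError:
            self.set_output("_ERROR", f"timed out after {self.timeout_seconds}s")
        self.set_output("_elapsed_time", time.perf_counter() - start)
        return self._output


class UpperCase(ProcessBaseSketch):
    """Example subclass: only _invoke is overridden."""

    async def _invoke(self, **kwargs: Any) -> None:
        self.set_output("text", kwargs.get("text", "").upper())
```

Keeping timing and timeout handling in the base class means a subclass like UpperCase contains only its transformation logic.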
Core RAG Components in rag/flow/
The directory contains specialized components that transform raw files into structured knowledge:
File Retrieval (rag/flow/file.py)
The File component serves as the entry point. It retrieves the original file either from the database via DocumentService or from uploaded blob storage, and emits a dictionary containing name, file or blob, and the metadata required by downstream parsers.
Document Parsing (rag/flow/parser/parser.py)
The Parser component dispatches files to format-specific parsers based on the file suffix and the ParserParam.setups configuration. It handles PDFs, spreadsheets, Word documents, images, and plain text, producing structured output in json, markdown, or html formats.
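Suffix-based dispatch can be sketched as a lookup over a setups dict shaped like the Parser param in this article's example DSL. pick_setup and the "spreadsheet" entry used in the usage line are hypothetical. The sketch only selects a setup; it does not call any real parser.

```python
from pathlib import Path
from typing import Any, Optional


def pick_setup(
    filename: str, setups: dict[str, dict[str, Any]]
) -> Optional[tuple[str, dict[str, Any]]]:
    """Return (setup_name, setup) for the first setup whose suffix list matches."""
    suffix = Path(filename).suffix.lower().lstrip(".")
    for name, setup in setups.items():
        if suffix in [s.lower() for s in setup.get("suffix", [])]:
            return name, setup
    return None  # no configured parser for this file type


# Usage with the pdf setup from the example DSL plus a hypothetical spreadsheet entry.
setups = {
    "pdf": {"suffix": ["pdf"], "parse_method": "deepdoc"},
    "spreadsheet": {"suffix": ["xlsx", "csv"]},
}
choice = pick_setup("Report.PDF", setups)
```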
LLM-Based Extraction (rag/flow/extractor/extractor.py)
The Extractor component invokes Large Language Models to extract specific fields, generate summaries, or create tables of contents from the parsed chunks. This enables semantic enrichment of documents before storage.
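A field-extraction step of this kind can be sketched as one LLM call per chunk, run concurrently. The llm callable and the prompt wording are hypothetical placeholders, and field_name mirrors the "summary" value in the example DSL. This does not reproduce the Extractor's actual prompts or model interface.

```python
import asyncio
from typing import Awaitable, Callable


async def extract_field(
    chunks: list[str],
    field_name: str,
    llm: Callable[[str], Awaitable[str]],
) -> list[dict[str, str]]:
    """Ask a (hypothetical) async LLM callable to produce field_name for each chunk."""

    async def one(chunk: str) -> dict[str, str]:
        prompt = f"Write a {field_name} for the following text:\n\n{chunk}"
        return {"text": chunk, field_name: await llm(prompt)}

    # gather preserves input order, so results line up with chunks.
    return list(await asyncio.gather(*(one(c) for c in chunks)))


# Usage with a stub in place of a real model client.
async def fake_llm(prompt: str) -> str:
    return "summary of " + prompt.rsplit("\n", 1)[-1]
```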
Hierarchical Merging (rag/flow/hierarchical_merger/hierarchical_merger.py)
The HierarchicalMerger component transforms flat text chunks into hierarchical document structures. It uses user-defined regex patterns (e.g., ^# for level 1, ^## for level 2) to reconstruct document outlines, preserving semantic relationships between sections.
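Rebuilding an outline from level patterns can be sketched with a stack of open sections. Section and build_outline are hypothetical, and the hierarchy parameter is not modeled. In a first-match scheme like this one, a bare ^# pattern would also match "## ..." lines and capture level-2 headings as level 1. The usage below therefore requires whitespace after the hashes (^#\s, ^##\s).

```python
import re
from dataclasses import dataclass, field


@dataclass
class Section:
    title: str
    level: int
    body: list[str] = field(default_factory=list)
    children: list["Section"] = field(default_factory=list)


def build_outline(lines: list[str], levels: list[list[str]]) -> Section:
    """Nest lines under headings; levels[i] holds the regexes for heading level i + 1."""
    root = Section(title="<root>", level=0)
    stack = [root]
    for line in lines:
        level = next(
            (i + 1 for i, patterns in enumerate(levels)
             if any(re.match(p, line) for p in patterns)),
            None,
        )
        if level is None:
            stack[-1].body.append(line)  # plain text belongs to the innermost open section
            continue
        while stack[-1].level >= level:
            stack.pop()  # close sections at the same or a deeper level
        node = Section(title=line, level=level)
        stack[-1].children.append(node)
        stack.append(node)
    return root


doc = ["# Intro", "hello", "## Setup", "install", "# Usage", "run it"]
root = build_outline(doc, [[r"^#\s"], [r"^##\s"]])
```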
Text Splitting and Tokenization
Optional preprocessing components include:
- Tokenizer (rag/flow/tokenizer/tokenizer.py): Counts tokens to enforce limits
- Splitter (rag/flow/splitter/splitter.py): Implements sliding-window or semantic splitting strategies for long texts
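The article doesn't describe the Splitter's parameters, so the following is a generic sliding-window splitter under stated assumptions. Whitespace-separated words stand in for real tokens, and window and overlap are hypothetical parameter names measured in those words.

```python
def sliding_window_split(text: str, window: int, overlap: int) -> list[str]:
    """Split text into chunks of `window` words, each sharing `overlap` words with the previous one."""
    if window <= 0 or not 0 <= overlap < window:
        raise ValueError("require window > 0 and 0 <= overlap < window")
    tokens = text.split()  # whitespace tokens stand in for a real tokenizer
    step = window - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + window]))
        if start + window >= len(tokens):
            break  # the last window already reached the end of the text
    return chunks
```

The overlap keeps sentences that straddle a boundary visible in both neighboring chunks, at the cost of some duplicated text.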
Defining the RAG Pipeline with JSON DSL
The Domain Specific Language (DSL) declaratively defines the component graph. The JSON structure contains two top-level keys:
- components: Array of component definitions with id, type, and param objects
- edges: Array of connections mapping source to target component IDs
Example DSL configuration:
```json
{
  "components": [
    {"id": "File", "type": "File", "param": {}},
    {"id": "Parser", "type": "Parser", "param": {"setups": {"pdf": {"suffix": ["pdf"], "parse_method": "deepdoc"}}}},
    {"id": "Extractor", "type": "Extractor", "param": {"field_name": "summary"}},
    {"id": "HierarchicalMerger", "type": "HierarchicalMerger",
     "param": {"levels": [["^#"], ["^##"]], "hierarchy": 2}}
  ],
  "edges": [
    {"source": "File", "target": "Parser"},
    {"source": "Parser", "target": "Extractor"},
    {"source": "Extractor", "target": "HierarchicalMerger"}
  ]
}
```
The Graph constructor parses this DSL to build self.components (mapping IDs to instances) and calculates downstream dependencies for topological execution.
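The article doesn't spell out how an execution order is computed from the edges. One standard technique for a DAG described this way is Kahn's algorithm, sketched here against the components/edges shape above. topological_order is a hypothetical helper, and it also rejects cyclic edge sets, which a DAG-based engine has to do somewhere.

```python
from collections import deque


def topological_order(dsl: dict) -> list[str]:
    """Order component ids so every edge's source comes before its target (Kahn's algorithm)."""
    ids = [c["id"] for c in dsl["components"]]
    indegree = {i: 0 for i in ids}
    downstream: dict[str, list[str]] = {i: [] for i in ids}
    for e in dsl["edges"]:
        downstream[e["source"]].append(e["target"])
        indegree[e["target"]] += 1
    queue = deque(i for i in ids if indegree[i] == 0)  # components with no upstream
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for nxt in downstream[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if len(order) != len(ids):
        raise ValueError("DSL edges contain a cycle")
    return order
```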
Monitoring and Logging Architecture
Real-time visibility into RAG pipeline orchestration is achieved through a dual-layer logging system:
Redis Stream Logging: Each pipeline execution creates a Redis key using the format <flow_id>-<task_id>-logs. The callback method appends structured entries containing progress, message, datetime, and timestamp fields, enabling live log tailing by external observers.
Database Persistence: The TaskService.update_progress method writes human-readable progress strings to the database, allowing the web UI to display component-level status without polling Redis directly.
This architecture separates high-frequency log streaming from persistent state management, optimizing for both real-time monitoring and historical audit trails.
Asynchronous Design for Scalable RAG Processing
The entire RAG pipeline orchestration layer is built on Python's asyncio to maximize throughput:
Non-blocking Component Execution: All _invoke methods are defined as async def, allowing the pipeline to handle I/O-bound operations (file retrieval, LLM API calls, database writes) without blocking the event loop.
Concurrent Task Management: The run coroutine uses asyncio.create_task to spawn component invocations and asyncio.gather to synchronize parallel branches when the DSL permits concurrent execution. Post-processing steps like image2id (converting images to storage IDs) execute concurrently with text processing.
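This asynchronous architecture means that a slow I/O-bound operation in one branch (e.g., waiting on an LLM API call) does not stall independent branches (e.g., tokenizing a small text file). CPU-heavy work such as parsing a large PDF only gets the same benefit if it runs off the event loop, for example in a worker thread or process. Together, these properties make the system well suited to high-volume document processing.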
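The non-blocking claim can be illustrated with asyncio.gather. In this sketch the two branches are hypothetical coroutines that only await asyncio.sleep to simulate I/O. The faster branch finishes first even though it is listed second, while gather still returns results in argument order.

```python
import asyncio


async def branch(name: str, delay: float, finished: list[str]) -> str:
    await asyncio.sleep(delay)  # simulated I/O wait, e.g. an LLM API call
    finished.append(name)       # record completion order
    return name


async def run_branches() -> tuple[list[str], list[str]]:
    finished: list[str] = []
    results = await asyncio.gather(
        branch("slow_io_branch", 0.2, finished),
        branch("fast_io_branch", 0.05, finished),
    )
    return list(results), finished
```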
Summary
- The RAG pipeline orchestration in RAGFlow centers on the Pipeline class in rag/flow/pipeline.py, which inherits from agent.canvas.Graph to execute component DAGs.
- A JSON DSL declaratively defines components (File, Parser, Extractor, HierarchicalMerger) and their connections, enabling flexible pipeline configuration without code changes.
- ProcessBase in rag/flow/base.py standardizes component behavior, providing callback integration, timeout handling, and output management for all processing blocks.
- The execution engine is built on asyncio, so components run without blocking the event loop while progress logs stream to Redis and status is persisted to the database via TaskService.
- Error handling is strict: any component setting _ERROR immediately aborts the pipeline with progress -1 and raises TaskCanceledException.
Frequently Asked Questions
What is the role of the Pipeline class in RAGFlow?
The Pipeline class serves as the central orchestrator for the RAG pipeline. Defined in rag/flow/pipeline.py, it inherits from agent.canvas.Graph to parse a JSON DSL describing the component graph, then executes each component asynchronously while managing callbacks, progress tracking, and error propagation. It binds the execution context (tenant, document, and task IDs) to ensure components can access storage and report status correctly.
How does the RAG pipeline handle errors during component execution?
If any component sets an internal _ERROR flag during its _invoke method, the pipeline immediately enters an abort sequence. The run coroutine in pipeline.py catches this state, sets the task progress to -1 in both Redis and the database via TaskService, raises a TaskCanceledException, and stops further component execution. This ensures that partial document processing does not contaminate downstream results or the knowledge base.
Can I add custom components to the RAGFlow pipeline?
Yes, custom components can be added by inheriting from ProcessBase in rag/flow/base.py and implementing the _invoke method. Create a parameter class inheriting from ProcessParamBase for configuration, then place both classes in a new file under rag/flow/. The Graph factory automatically discovers components by type name, so adding your component to the DSL with "type": "YourComponentName" makes it available for pipeline execution without modifying core orchestration code.
How does the DSL define connections between RAG components?
The DSL uses a JSON structure with two top-level arrays: components and edges. The components array defines each processing block with a unique id, type (class name), and param object for configuration. The edges array contains objects with source and target keys referencing component IDs, creating a directed acyclic graph. The Pipeline constructor parses this structure to build self.components and calculate downstream dependencies, ensuring components execute in topological order while respecting the defined data flow.