Core Components of the RAGFlow Backend API: Architecture and Interaction Patterns
The RAGFlow backend API is built on a Quart web server that uses Blueprint modules for routing, delegates business logic to service classes, and persists data through Peewee ORM models, with background tasks managed via Redis queues and distributed locks.
RAGFlow is an open-source RAG (Retrieval-Augmented Generation) engine that provides a modular backend API for document processing and knowledge base management. The architecture follows a clean separation of concerns, with distinct layers handling HTTP routing, business logic, and data persistence. This design enables high concurrency, extensibility through plugins, and independent scaling of background workers.
Entry Point and Server Bootstrap
The backend lifecycle begins in api/ragflow_server.py, which serves as the application entry point. This file initializes the Quart application, sets up logging and signal handling, and launches the HTTP server.
# From api/ragflow_server.py
def main():
    # Initialize database connections
    init_db()
    # Load plugins via GlobalPluginManager
    plugin_manager.load_plugins()
    # Start background progress updater thread
    start_progress_updater()
    # Launch Quart server
    app.run(host=HOST, port=PORT)
The server also spawns a background thread that periodically calls DocumentService.update_progress(), protected by a Redis distributed lock to prevent concurrent updates across multiple server instances.
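The periodic, lock-guarded update described above can be sketched as follows. This is a minimal illustration rather than RAGFlow's actual code: InMemoryLock is a hypothetical, single-process stand-in for the Redis distributed lock, and run_progress_updater, start_progress_updater, and their parameters are names invented for this example.

```python
import threading


class InMemoryLock:
    """Hypothetical stand-in for a Redis distributed lock (single process only)."""

    def __init__(self):
        self._lock = threading.Lock()

    def acquire(self) -> bool:
        # Non-blocking: if another holder has the lock, skip this round.
        return self._lock.acquire(blocking=False)

    def release(self) -> None:
        self._lock.release()


def run_progress_updater(update_fn, lock, stop_event, interval=0.01):
    """Call update_fn every `interval` seconds until stop_event is set."""
    while not stop_event.is_set():
        if lock.acquire():
            try:
                update_fn()
            finally:
                lock.release()
        # wait() doubles as the sleep and returns early on shutdown.
        stop_event.wait(interval)


def start_progress_updater(update_fn, lock, stop_event, interval=0.01):
    """Start the updater loop in a daemon thread and return the thread."""
    thread = threading.Thread(
        target=run_progress_updater,
        args=(update_fn, lock, stop_event, interval),
        daemon=True,
    )
    thread.start()
    return thread
```

When the lock is already held (for example by another server instance), the loop skips that round instead of blocking, so at most one holder performs the update at a time.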
Application Factory and Blueprint Registration
The global Quart instance is created in api/apps/__init__.py, which acts as an application factory. This module registers CORS handlers, custom JSON encoders, and dynamically loads all API modules.
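# From api/apps/__init__.py (simplified)
from glob import glob
from importlib import import_module
from pathlib import Path

from quart import Quart

def create_app():
    app = Quart(__name__)
    # Register CORS and error handlers
    register_extensions(app)
    # Dynamically load all *_app.py modules as Blueprints
    for path in glob('api/apps/*_app.py'):
        # import_module expects a dotted module name, not a file path
        import_module(f"api.apps.{Path(path).stem}")
    return app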
The register_page helper function converts each module into a Blueprint with an appropriate URL prefix, enabling modular routing where each functional area (users, knowledge bases, files) maintains its own route definitions.
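To make the naming convention concrete, here is a small sketch of how a module file name could be mapped to a Blueprint name and URL prefix. The function blueprint_info, the API_PREFIX constant, and the exact prefix scheme are assumptions made for illustration; the real register_page helper may derive its prefix differently.

```python
from pathlib import Path

API_PREFIX = "/api/v1"  # assumed; matches the URLs used in this article


def blueprint_info(module_path: str) -> tuple[str, str]:
    """Return (page_name, url_prefix) for a file such as api/apps/kb_app.py."""
    stem = Path(module_path).stem          # "kb_app"
    page_name = stem.removesuffix("_app")  # "kb"
    return page_name, f"{API_PREFIX}/{page_name}"
```

Under this assumed scheme, each functional area gets its own URL namespace purely from its file name, so adding a new *_app.py file is enough to expose a new group of routes.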
HTTP Layer: Blueprint Modules
Individual Blueprint modules such as kb_app.py, file_app.py, and user_app.py define the RESTful endpoints for each domain. These modules handle HTTP request parsing, parameter validation, and response serialization.
For example, the knowledge base endpoint in api/apps/kb_app.py handles listing knowledge bases:
# From api/apps/kb_app.py
@app.route('/api/v1/kb', methods=['GET'])
async def list_knowledgebases():
    # Extract query parameters
    tenant_id = request.args.get('tenant_id')
    # Delegate to service layer
    result = KnowledgebaseService.get_list(tenant_id=tenant_id)
    # Return JSON response
    return jsonify(result)
Business Logic Layer: Service Classes
The service layer in api/db/services/*.py encapsulates all business rules and data access logic. Classes like DocumentService, KnowledgebaseService, and UserService provide high-level methods that operate on Peewee ORM models.
For instance, DocumentService manages document lifecycle operations and task queuing:
# From api/db/services/document_service.py
import json

class DocumentService:
    @staticmethod
    def update_progress(doc_id):
        # Protected by RedisDistributedLock
        with RedisDistributedLock(f"doc_{doc_id}"):
            # Aggregate task progress from Redis queue
            progress = calculate_progress(doc_id)
            # Update database via Peewee model
            Document.update(progress=progress).where(Document.id == doc_id).execute()

    @staticmethod
    def queue_raptor_o_graphrag_tasks(doc_id):
        # Enqueue long-running GraphRAG task to Redis
        task = {"doc_id": doc_id, "type": "graphrag"}
        RedisConn.lpush("task_queue", json.dumps(task))
Data Access Layer: Peewee ORM Models
Database entities are defined in api/db/db_models.py using the Peewee ORM. The BaseDataBase class establishes a pooled, retry-aware connection to MySQL, PostgreSQL, or OceanBase, while individual models like User, Knowledgebase, and Document map to relational tables.
# From api/db/db_models.py
class BaseDataBase:
    def __init__(self):
        # Pooled connection with retry logic
        self.database = PooledMySQLDatabase(
            database=settings.DB_NAME,
            host=settings.DB_HOST,
            max_connections=20,
            stale_timeout=300
        )

class Document(BaseModel):
    id = CharField(primary_key=True)
    kb_id = CharField(index=True)
    name = CharField()
    progress = FloatField(default=0.0)
    status = CharField(default="pending")
    # Additional fields...
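The "retry-aware" part of the connection can be illustrated with a generic helper. This is a sketch under stated assumptions, not the code in db_models.py: connect_with_retry and its parameters are hypothetical, and it catches the built-in ConnectionError, whereas a Peewee-based implementation would more likely catch the driver's operational error.

```python
import time


def connect_with_retry(connect, retries=3, base_delay=0.01, sleep=time.sleep):
    """Call connect() until it succeeds, backing off exponentially between tries."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    last_error = None
    for attempt in range(retries):
        try:
            return connect()
        except ConnectionError as exc:
            last_error = exc
            # Back off before the next attempt, but not after the final one.
            if attempt < retries - 1:
                sleep(base_delay * 2 ** attempt)
    raise last_error
```

Passing sleep as a parameter keeps the helper easy to test and lets callers substitute a different waiting strategy.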
Background Processing: Redis Task Queues and Distributed Locks
Long-running operations such as document parsing, embedding generation, and GraphRAG processing are decoupled from the HTTP request cycle through a Redis-backed task queue. The common/utils/redis_conn.py module provides the Redis client, while DocumentService uses RedisDistributedLock to prevent race conditions during progress updates.
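The producer and consumer sides of such a queue can be sketched without a running Redis server. FakeRedis below is a hypothetical in-memory stand-in that mimics only the LPUSH and RPOP list commands; enqueue_task and drain_tasks are illustrative names, and a real worker would more likely block on the queue than drain it in a loop.

```python
import json
from collections import deque


class FakeRedis:
    """Hypothetical in-memory stand-in exposing the two list commands used here."""

    def __init__(self):
        self._lists = {}

    def lpush(self, key, value):
        # Like Redis LPUSH: insert at the head, return the new length.
        self._lists.setdefault(key, deque()).appendleft(value)
        return len(self._lists[key])

    def rpop(self, key):
        # Like Redis RPOP: remove from the tail, or None when empty.
        items = self._lists.get(key)
        return items.pop() if items else None


def enqueue_task(conn, doc_id, task_type):
    """Producer side: serialize the task and push it onto the queue."""
    conn.lpush("task_queue", json.dumps({"doc_id": doc_id, "type": task_type}))


def drain_tasks(conn, handler):
    """Worker side: pop tasks in FIFO order until the queue is empty."""
    handled = 0
    while (raw := conn.rpop("task_queue")) is not None:
        handler(json.loads(raw))
        handled += 1
    return handled
```

Pushing at the head and popping from the tail gives first-in, first-out ordering, so tasks are handled in the order they were queued.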
The background thread spawned in ragflow_server.py periodically aggregates task status from the queue and updates the database, enabling clients to poll for progress via the document endpoints.
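As a rough illustration of the aggregation step, the function below folds per-task progress values into a single document-level figure. The averaging rule and the name aggregate_progress are assumptions for this sketch; the article does not specify how RAGFlow combines task progress.

```python
def aggregate_progress(task_progress):
    """Average per-task progress values, each clamped to the range [0, 1]."""
    if not task_progress:
        # No tasks reported yet: treat the document as not started.
        return 0.0
    clamped = [min(max(p, 0.0), 1.0) for p in task_progress]
    return sum(clamped) / len(clamped)
```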
Configuration and Utilities
Central configuration resides in common/settings.py, which loads environment variables and defines database connections, storage backends (MinIO), and security parameters such as JWT secrets. Utility modules in api/utils/* provide JSON encoding, command registration, and logging helpers that are imported across the application.
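A minimal sketch of environment-driven settings is shown below. It is not the contents of common/settings.py: the DatabaseSettings class, the load_database_settings function, the DB_TYPE and DB_PORT variable names, and all default values are assumptions chosen for illustration (DB_HOST and DB_NAME mirror the setting names used earlier in this article).

```python
import os
from dataclasses import dataclass

SUPPORTED_DB_TYPES = {"mysql", "postgres", "oceanbase"}


@dataclass(frozen=True)
class DatabaseSettings:
    db_type: str
    host: str
    port: int
    name: str


def load_database_settings(env=None):
    """Build database settings from environment variables (or a given mapping)."""
    env = os.environ if env is None else env
    db_type = env.get("DB_TYPE", "mysql").lower()
    if db_type not in SUPPORTED_DB_TYPES:
        raise ValueError(f"unsupported DB_TYPE: {db_type!r}")
    return DatabaseSettings(
        db_type=db_type,
        host=env.get("DB_HOST", "localhost"),
        port=int(env.get("DB_PORT", "3306")),
        name=env.get("DB_NAME", "ragflow"),
    )
```

Validating the backend name at load time surfaces a misconfigured environment at startup rather than on the first query.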
Practical Code Examples
List Knowledge Bases (Python Client)
import requests
API_URL = "http://localhost:8080/api/v1/kb"
TOKEN = "Bearer <your-access-token>" # obtained from /api/v1/login
resp = requests.get(API_URL, headers={"Authorization": TOKEN})
print(resp.json())
The endpoint is the list_knowledgebases handler in api/apps/kb_app.py, which ultimately calls KnowledgebaseService.get_list(...).
Upload a Document (cURL)
curl -X POST http://localhost:8080/api/v1/file \
  -H "Authorization: Bearer $TOKEN" \
  -F "file=@/path/to/report.pdf" \
  -F "kb_id=kb_12345"
file_app.py receives the multipart request, stores the file via settings.STORAGE_IMPL, creates a File record, then creates a Document record through DocumentService.insert().
Trigger a GraphRAG Task (Python)
from ragflow import RAGFlowClient
TOKEN = "<your-access-token>"  # obtained from /api/v1/login
client = RAGFlowClient(base_url="http://localhost:8080", token=TOKEN)
doc_id = "doc_abc123"
task_id = client.graph_rag.create_task(doc_id)  # internally calls queue_raptor_o_graphrag_tasks()
print("Task queued:", task_id)
The helper queue_raptor_o_graphrag_tasks lives in api/db/services/document_service.py and pushes a task description onto the Redis queue.
Summary
- Entry Point: ragflow_server.py bootstraps the Quart application, initializes the database, loads plugins, and spawns a background progress updater thread protected by Redis distributed locks.
- HTTP Routing: api/apps/__init__.py creates the global Quart instance and dynamically registers Blueprint modules (*_app.py) that define RESTful endpoints.
- Business Logic: Service classes in api/db/services/*.py encapsulate domain logic, validation, and database operations, using Peewee ORM models defined in api/db/db_models.py.
- Data Persistence: Peewee models with pooled, retry-aware database connections to MySQL, PostgreSQL, or OceanBase.
- Background Processing: Redis-backed task queues and distributed locks enable asynchronous document processing (parsing, embedding, GraphRAG) decoupled from HTTP requests.
- Configuration: Centralized settings in common/settings.py manage database, storage, and security parameters.
Frequently Asked Questions
How does RAGFlow handle concurrent document processing without data races?
RAGFlow uses RedisDistributedLock to protect critical sections during progress updates. The background thread in ragflow_server.py acquires a lock specific to each document before calling DocumentService.update_progress(), ensuring that only one instance updates the database row at a time, even when multiple workers process chunks of the same document concurrently.
What database backends does RAGFlow support?
According to the BaseDataBase implementation in api/db/db_models.py, RAGFlow supports MySQL, PostgreSQL, and OceanBase through Peewee's pooled connection interface. The specific driver is selected via environment variables defined in common/settings.py, with connection pooling configured to handle up to 20 concurrent connections by default.
How are long-running tasks like GraphRAG decoupled from HTTP requests?
When a client triggers a GraphRAG operation, the Blueprint handler calls DocumentService.queue_raptor_o_graphrag_tasks(), which serializes the task description and pushes it onto a Redis list acting as a task queue. The HTTP request returns immediately with a task ID, while separate worker processes (outside the API server) consume the queue and execute the heavy processing. The API server's background thread periodically aggregates worker progress and updates the Document table for client polling.
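On the client side, polling for completion might look like the sketch below. The wait_for_completion helper is hypothetical, and fetch_progress stands for whatever call retrieves a document's current progress (for example, a request to one of the document endpoints); completion is assumed to be signalled by a progress value of 1.0.

```python
import time


def wait_for_completion(fetch_progress, interval=0.01, timeout=1.0):
    """Poll fetch_progress() until it reports 1.0 or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        progress = fetch_progress()
        if progress >= 1.0:
            return progress
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still at {progress:.0%} after {timeout}s")
        time.sleep(interval)
```

In practice the interval and timeout would be far longer than these defaults, which are kept small only so the example runs quickly.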
Where does RAGFlow load plugins, and how do they integrate with the API?
RAGFlow initializes the GlobalPluginManager during server startup in ragflow_server.py (lines 44-45). Plugins are dynamically loaded from the agent/plugin.py module and can register additional routes or background tasks with the global Quart app instance. This allows custom retrievers, connectors, or API endpoints to integrate seamlessly without modifying core Blueprint files.