How OpenRAG Handles Document Ingestion and Processing Configuration: TaskService Architecture Explained
OpenRAG orchestrates document ingestion through the TaskService class, which manages background processing, concurrency limits, and timeout protection while pulling chunking and embedding defaults from a configurable Langflow flow.
OpenRAG's ingestion system combines async Python coroutines with Langflow's visual pipeline to process documents from upload to vector store. This article examines how the TaskService class in langflow-ai/openrag coordinates file processing, resource limits, and user-configurable settings like chunk size and embedding models.
TaskService: The Core Ingestion Orchestrator
The ingestion pipeline centers on src/services/task_service.py, where the TaskService class manages the complete lifecycle of document uploads. When users upload files—whether local, cloud, or URL—the API invokes either TaskService.create_upload_task or TaskService.create_langflow_upload_task depending on the source.
Both methods instantiate concrete processors and delegate to TaskService.create_custom_task:
# src/services/task_service.py – TaskService class methods
async def create_upload_task(self, user_id: str, file_paths: List[str], ...):
    processor = DocumentFileProcessor()
    return await self.create_custom_task(
        user_id=user_id,
        processor=processor,
        items=file_paths,
        ...
    )

async def create_langflow_upload_task(self, user_id: str, flow_id: str, ...):
    processor = LangflowFileProcessor(flow_id=flow_id)
    return await self.create_custom_task(...)
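The delegation pattern can be tried in isolation. The following is a minimal sketch, not the project's code: `Processor`, `EchoProcessor`, and `MiniTaskService` are hypothetical names introduced for illustration, and the loop is awaited inline here, whereas the article describes the real service handing the work to a background coroutine.

```python
import asyncio
import uuid
from typing import Any, Dict, List, Protocol


class Processor(Protocol):
    """Hypothetical interface: anything with an async process(item) method."""

    async def process(self, item: str) -> Any: ...


class EchoProcessor:
    """Stand-in processor that simply upper-cases the item it receives."""

    async def process(self, item: str) -> str:
        return item.upper()


class MiniTaskService:
    """Simplified stand-in for TaskService (an assumption, not the real class)."""

    def __init__(self) -> None:
        self.task_store: Dict[str, Dict[str, Any]] = {}

    async def create_custom_task(
        self, user_id: str, processor: Processor, items: List[str]
    ) -> str:
        task_id = str(uuid.uuid4())
        self.task_store[task_id] = {"user_id": user_id, "results": []}
        # Processed inline to keep the sketch short.
        for item in items:
            result = await processor.process(item)
            self.task_store[task_id]["results"].append(result)
        return task_id

    async def create_upload_task(self, user_id: str, file_paths: List[str]) -> str:
        # The entry point only chooses a processor, then delegates.
        return await self.create_custom_task(user_id, EchoProcessor(), file_paths)


def run_demo() -> List[str]:
    async def main() -> List[str]:
        service = MiniTaskService()
        task_id = await service.create_upload_task("user-1", ["a.pdf", "b.txt"])
        return service.task_store[task_id]["results"]

    return asyncio.run(main())
```

Keeping the entry points thin means a new source type would only need a new processor class, while task bookkeeping stays in one place.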
Background Processing and Concurrency Control
Async Task Execution
Each task spawns a background coroutine (background_custom_processor) that runs asynchronously while updating task status and timestamps. The coroutine iterates over uploaded items and applies the attached processor:
# src/services/task_service.py – Background processing loop
async def background_custom_processor(self, task_id: str):
    task = self.task_store[task_id]
    processor = task.processor
    for item in task.items:
        async with self._processing_semaphore:
            result = await self._process_with_timeout(processor, item)
            # Update counters atomically...
Global Resource Limits
To prevent GPU/CPU exhaustion, TaskService initializes a global semaphore (self._processing_semaphore) based on the number of workers detected at startup via utils.gpu_detection.get_worker_count. This semaphore limits concurrent file processing across all active tasks.
Per-task locks (self._task_locks) ensure atomic updates to counters like processed_files and failed_files, preventing race conditions during high-volume ingestion.
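How the two mechanisms fit together can be sketched with plain asyncio primitives. This is an illustration under stated assumptions rather than OpenRAG's implementation: `ConcurrencyDemo` is a hypothetical class, the worker count is passed in directly instead of coming from `get_worker_count`, and `asyncio.sleep` stands in for real file processing.

```python
import asyncio
from collections import defaultdict
from typing import Dict, List, Tuple


class ConcurrencyDemo:
    """Hypothetical stand-in showing a global semaphore plus per-task locks."""

    def __init__(self, worker_count: int) -> None:
        # One semaphore shared by every task caps total concurrent work.
        self._processing_semaphore = asyncio.Semaphore(worker_count)
        # One lock per task id guards that task's counters.
        self._task_locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self.processed: Dict[str, int] = defaultdict(int)
        self.active = 0
        self.peak_active = 0

    async def _process_one(self, task_id: str, item: str) -> None:
        async with self._processing_semaphore:
            self.active += 1
            self.peak_active = max(self.peak_active, self.active)
            await asyncio.sleep(0.01)  # stand-in for processing one file
            self.active -= 1
        async with self._task_locks[task_id]:
            self.processed[task_id] += 1

    async def run(self, tasks: Dict[str, List[str]]) -> None:
        # Launch every item of every task at once; the semaphore throttles them.
        await asyncio.gather(
            *(
                self._process_one(task_id, item)
                for task_id, items in tasks.items()
                for item in items
            )
        )


def run_demo() -> Tuple[int, Dict[str, int]]:
    async def main() -> Tuple[int, Dict[str, int]]:
        demo = ConcurrencyDemo(worker_count=2)
        await demo.run({"task-a": ["1", "2", "3"], "task-b": ["4", "5"]})
        return demo.peak_active, dict(demo.processed)

    return asyncio.run(main())
```

Even though five items across two tasks are scheduled together, `peak_active` never exceeds the worker count, and each task's counter ends up matching its item count.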
Timeout Protection and Error Handling
Every file is processed inside _process_with_timeout, which wraps the processor call in asyncio.wait_for. The default timeout is 3600 seconds, set through the TaskService constructor parameter ingestion_timeout; the method's timeout argument overrides it for a single call:
# src/services/task_service.py – Timeout handling
async def _process_with_timeout(self, processor, item, timeout=None):
    timeout = timeout or self.ingestion_timeout
    try:
        return await asyncio.wait_for(
            processor.process(item),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        raise IngestionTimeoutError(f"Processing exceeded {timeout}s")
When timeouts fire, the system raises IngestionTimeoutError, logs the failure, and marks the file as failed in the task store while allowing the remaining queue to continue processing.
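The "fail one file, keep going" behavior can be sketched as follows. This is a self-contained illustration, not the project's code: `IngestionTimeoutError` is redefined locally, `fake_process` and `run_queue` are hypothetical helpers, and the timeout is shortened to a fraction of a second so the example finishes quickly.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class IngestionTimeoutError(Exception):
    """Local stand-in for the error type named in the article."""


async def process_with_timeout(
    process: Callable[[str], Awaitable[str]], item: str, timeout: float
) -> str:
    try:
        return await asyncio.wait_for(process(item), timeout=timeout)
    except asyncio.TimeoutError:
        raise IngestionTimeoutError(f"Processing exceeded {timeout}s") from None


async def fake_process(item: str) -> str:
    # Hypothetical processor: the item named "slow" outlasts the timeout.
    await asyncio.sleep(0.5 if item == "slow" else 0)
    return f"ok:{item}"


async def run_queue(items: List[str], timeout: float = 0.05) -> Tuple[List[str], List[str]]:
    processed: List[str] = []
    failed: List[str] = []
    for item in items:
        try:
            processed.append(await process_with_timeout(fake_process, item, timeout))
        except IngestionTimeoutError:
            # Record the failure and continue with the rest of the queue.
            failed.append(item)
    return processed, failed


def run_demo() -> Tuple[List[str], List[str]]:
    return asyncio.run(run_queue(["a", "slow", "b"]))
```

The key design point is that the `try`/`except` sits inside the loop: a timeout on one item is caught per iteration, so later items are still attempted.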
Document Processing Implementations
OpenRAG provides two distinct processor implementations in src/models/processors.py:
- DocumentFileProcessor: Handles plain file uploads by extracting text, running OCR and table-structure detection via Docling, splitting content into chunks, generating embeddings using the selected model, and writing vectors to OpenSearch.
- LangflowFileProcessor: Invokes the Langflow ingestion flow, offering a UI-driven pipeline with custom node configuration for complex transformation logic.
Configuration Management and Defaults
Fetching Ingestion Defaults
The system stores default chunking parameters (size, overlap, separator) and the embedding model within the Langflow ingestion flow (JSON definition in flows/ingestion_flow.json). The settings endpoint in src/api/settings.py fetches these values on demand:
# src/api/settings.py – Retrieving flow-based defaults
async def get_settings():
    response = await clients.langflow_request(
        "GET", f"/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}"
    )
    flow_data = response.json()
    # Extract from SplitText and OpenAIEmbeddings nodes
    ingestion_defaults = {
        "chunkSize": knowledge_config.chunk_size,
        "chunkOverlap": knowledge_config.chunk_overlap,
        "separator": "\\n",
        "embeddingModel": knowledge_config.embedding_model,
    }
    return SettingsResponse(ingestion_defaults=ingestion_defaults)
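The excerpt does not show how values are read out of the flow JSON, so the following is only a sketch of one way it could work. It assumes each node sits under `data.nodes` and keeps its fields at `data.node.template.<field>.value`; that layout, the helper names, the embeddings node id, the `model` field name, and all sample values are assumptions made for illustration.

```python
from typing import Any, Dict, Optional


def find_node(flow_data: Dict[str, Any], node_id_prefix: str) -> Optional[Dict[str, Any]]:
    """Return the first node whose id starts with the given prefix, if any."""
    for node in flow_data.get("data", {}).get("nodes", []):
        if str(node.get("id", "")).startswith(node_id_prefix):
            return node
    return None


def template_value(node: Optional[Dict[str, Any]], field: str, default: Any = None) -> Any:
    """Read template[field]["value"] from a node, falling back to a default."""
    if node is None:
        return default
    template = node.get("data", {}).get("node", {}).get("template", {})
    return template.get(field, {}).get("value", default)


def extract_ingestion_defaults(flow_data: Dict[str, Any]) -> Dict[str, Any]:
    split = find_node(flow_data, "SplitText")
    embed = find_node(flow_data, "OpenAIEmbeddings")
    return {
        "chunkSize": template_value(split, "chunk_size"),
        "chunkOverlap": template_value(split, "chunk_overlap"),
        "separator": template_value(split, "separator"),
        "embeddingModel": template_value(embed, "model"),  # field name assumed
    }


# Made-up sample shaped like the assumed layout; values are illustrative only.
SAMPLE_FLOW = {
    "data": {
        "nodes": [
            {"id": "SplitText-QIKhg", "data": {"node": {"template": {
                "chunk_size": {"value": 1000},
                "chunk_overlap": {"value": 200},
                "separator": {"value": "\n"},
            }}}},
            {"id": "OpenAIEmbeddings-demo", "data": {"node": {"template": {
                "model": {"value": "text-embedding-3-small"},
            }}}},
        ]
    }
}
```

Calling `extract_ingestion_defaults(SAMPLE_FLOW)` returns the four defaults as a dictionary, and a missing node or field yields `None` instead of raising, which keeps the settings endpoint usable if the flow has been edited by hand.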
Runtime Configuration Updates
Users can override defaults via the PATCH endpoint, which updates the Langflow flow nodes directly:
# src/api/settings.py – Updating configuration
@app.patch("/api/v1/settings/ingestion")
async def update_ingestion_defaults(defaults: IngestionDefaultsConfig):
    await clients.langflow_patch_flow_node(
        flow_id=LANGFLOW_INGEST_FLOW_ID,
        node_id="SplitText-QIKhg",
        updates={
            "chunk_size": {"value": defaults.chunkSize},
            "chunk_overlap": {"value": defaults.chunkOverlap}
        }
    )
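Before overrides are written to the flow, it may be worth checking them and sending only the fields the caller actually supplied. The sketch below is not taken from the repository: `IngestionOverrides` and `build_node_updates` are hypothetical names, and the validation rules (positive chunk size, overlap smaller than chunk size) are assumptions about what a sensible configuration looks like.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class IngestionOverrides:
    """Hypothetical request model; None means 'leave this setting unchanged'."""

    chunkSize: Optional[int] = None
    chunkOverlap: Optional[int] = None


def build_node_updates(overrides: IngestionOverrides) -> Dict[str, Dict[str, Any]]:
    """Validate overrides and build an updates payload in the {"field": {"value": ...}} shape."""
    size, overlap = overrides.chunkSize, overrides.chunkOverlap
    if size is not None and size <= 0:
        raise ValueError("chunkSize must be positive")
    if overlap is not None and overlap < 0:
        raise ValueError("chunkOverlap must not be negative")
    if size is not None and overlap is not None and overlap >= size:
        raise ValueError("chunkOverlap must be smaller than chunkSize")

    updates: Dict[str, Dict[str, Any]] = {}
    if size is not None:
        updates["chunk_size"] = {"value": size}
    if overlap is not None:
        updates["chunk_overlap"] = {"value": overlap}
    return updates
```

For example, `build_node_updates(IngestionOverrides(chunkSize=500))` yields a payload containing only `chunk_size`, so an unspecified overlap is not overwritten with an empty value.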
Monitoring and Observability
Every task creation and completion emits telemetry events (Category.TASK_OPERATIONS), enabling the frontend to display active, failed, and completed tasks in the Tasks badge. The UI polls the task store endpoint to render progress bars and supports cancellation, which aborts the background coroutine immediately.
Frontend monitoring implementation:
// frontend/components/TaskStatus.tsx
useEffect(() => {
  const interval = setInterval(async () => {
    const { data } = await axios.get(`/api/v1/tasks/${taskId}`);
    setTaskInfo(data);
  }, 2000);
  return () => clearInterval(interval);
}, [taskId]);
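On the backend side, the cancellation mentioned at the start of this section maps naturally onto asyncio task cancellation. The article does not show how OpenRAG implements it, so the following is a minimal sketch that assumes the background coroutine runs as an `asyncio.Task`; `background_worker` and the `state` dictionary are hypothetical.

```python
import asyncio
from typing import Any, Dict, Tuple


async def background_worker(item_count: int, state: Dict[str, Any]) -> None:
    try:
        for _ in range(item_count):
            await asyncio.sleep(0.05)  # stand-in for processing one file
            state["processed"] += 1
        state["status"] = "completed"
    except asyncio.CancelledError:
        # Record the cancellation, then re-raise so the task ends as cancelled.
        state["status"] = "cancelled"
        raise


async def cancel_after_delay() -> Tuple[Dict[str, Any], bool]:
    state: Dict[str, Any] = {"status": "running", "processed": 0}
    task = asyncio.create_task(background_worker(100, state))
    await asyncio.sleep(0.12)  # let a few items finish first
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return state, task.cancelled()


def run_demo() -> Tuple[Dict[str, Any], bool]:
    return asyncio.run(cancel_after_delay())
```

Note that `task.cancel()` takes effect at the coroutine's next `await`, so a processor doing long synchronous work would not stop until it yields control back to the event loop.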
Summary
- TaskService in src/services/task_service.py orchestrates the entire ingestion lifecycle, from task creation to background processing completion.
- Concurrency control combines a global semaphore (limiting total workers) with per-task locks (ensuring atomic counter updates).
- Timeout protection via _process_with_timeout defaults to 3600 seconds (the ingestion_timeout constructor parameter) and supports per-call overrides through the timeout argument.
- Processor selection determines pipeline behavior: DocumentFileProcessor for direct document handling with Docling OCR, or LangflowFileProcessor for UI-configurable flows.
- Configuration defaults reside in the Langflow ingestion flow JSON and are exposed through src/api/settings.py, allowing runtime updates to chunk size, overlap, and embedding models.
Frequently Asked Questions
How does OpenRAG prevent resource exhaustion during large batch uploads?
OpenRAG implements a global semaphore (self._processing_semaphore) in TaskService that limits concurrent file processing based on detected GPU/CPU worker counts from utils.gpu_detection.get_worker_count. This ensures the system never processes more files simultaneously than the underlying hardware can handle, regardless of how many tasks users create.
Where are the default chunking and embedding settings stored in OpenRAG?
Default ingestion parameters (chunk size, chunk overlap, separator, and embedding model) live within the Langflow ingestion flow definition (flows/ingestion_flow.json). The get_settings() function in src/api/settings.py extracts these values from specific flow nodes (such as SplitText and OpenAIEmbeddings) and returns them in the ingestion_defaults field of a SettingsResponse. The IngestionDefaultsConfig model is used on the update path, as the request body of the PATCH endpoint.
What happens when a document ingestion task exceeds the time limit?
When a file exceeds the configured ingestion_timeout (default 3600 seconds), the _process_with_timeout method raises IngestionTimeoutError, logs the failure event, and marks that specific file as failed in the task counters. The background coroutine continues processing the remaining queue items rather than terminating the entire task, ensuring partial completion of large batches.
How can I monitor the progress of an active ingestion task?
The frontend monitors tasks by polling the /api/v1/tasks/{taskId} endpoint on a fixed interval (every 2 seconds in the TaskStatus component shown above). The endpoint reads from TaskService.task_store and returns the current status, processed file count, failed count, and duration. Additionally, the system emits telemetry events under Category.TASK_OPERATIONS that populate the Tasks badge in the UI with real-time completion percentages.