# How OpenRAG Handles Document Ingestion and Processing Configuration: TaskService Architecture Explained

> Discover how OpenRAG's TaskService manages document ingestion and processing configuration. Learn about background tasks, concurrency, and Langflow flow defaults for efficient data handling.

- Repository: [langflow-ai/openrag](https://github.com/langflow-ai/openrag)
- Tags: architecture
- Published: 2026-03-13

---

**OpenRAG orchestrates document ingestion through the `TaskService` class, which manages background processing, concurrency limits, and timeout protection while pulling chunking and embedding defaults from a configurable Langflow flow.**

OpenRAG's ingestion system combines async Python coroutines with Langflow's visual pipeline to process documents from upload to vector store. This article examines how the `TaskService` class in `langflow-ai/openrag` coordinates file processing, resource limits, and user-configurable settings like chunk size and embedding models.

## TaskService: The Core Ingestion Orchestrator

The ingestion pipeline centers on [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py), where the **`TaskService`** class manages the complete lifecycle of document uploads. When users upload files—whether local, cloud, or URL—the API invokes either `TaskService.create_upload_task` or `TaskService.create_langflow_upload_task` depending on the source.

Both methods instantiate concrete processors and delegate to `TaskService.create_custom_task`:

```python

# src/services/task_service.py – TaskService class methods

async def create_upload_task(self, user_id: str, file_paths: List[str], ...):
    processor = DocumentFileProcessor()
    return await self.create_custom_task(
        user_id=user_id,
        processor=processor,
        items=file_paths,
        ...
    )

async def create_langflow_upload_task(self, user_id: str, flow_id: str, ...):
    processor = LangflowFileProcessor(flow_id=flow_id)
    return await self.create_custom_task(...)

```

## Background Processing and Concurrency Control

### Async Task Execution

Each task spawns a **background coroutine** (`background_custom_processor`) that runs asynchronously while updating task status and timestamps. The coroutine iterates over uploaded items and applies the attached processor:

```python

# src/services/task_service.py – Background processing loop

async def background_custom_processor(self, task_id: str):
    task = self.task_store[task_id]
    processor = task.processor
    
    for item in task.items:
        async with self._processing_semaphore:
            result = await self._process_with_timeout(processor, item)
            # Update counters atomically...

```
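
To make the spawn-and-track pattern concrete, here is a minimal, self-contained sketch of a background coroutine that updates status and timestamps as it works. The names `UploadTask`, `TaskStatus`, `process_item`, `run_in_background`, and `create_task` are hypothetical and are not taken from the OpenRAG codebase; only `asyncio.create_task` is a real standard-library call.

```python
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum


class TaskStatus(Enum):  # hypothetical status values
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class UploadTask:  # hypothetical stand-in for a task store entry
    task_id: str
    items: list
    status: TaskStatus = TaskStatus.PENDING
    processed: int = 0
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)


async def process_item(item: str) -> None:
    """Placeholder for a real processor call."""
    await asyncio.sleep(0)


async def run_in_background(task: UploadTask) -> None:
    """Process every item, refreshing status and timestamp along the way."""
    task.status = TaskStatus.RUNNING
    task.updated_at = time.time()
    try:
        for item in task.items:
            await process_item(item)
            task.processed += 1
            task.updated_at = time.time()
        task.status = TaskStatus.COMPLETED
    except Exception:
        task.status = TaskStatus.FAILED
        raise
    finally:
        task.updated_at = time.time()


async def create_task(task_id: str, items: list):
    """Schedule the background coroutine and return without waiting for it."""
    task = UploadTask(task_id=task_id, items=items)
    handle = asyncio.create_task(run_in_background(task))
    return task, handle
```

Because `create_task` returns before any item has been processed, a caller can hand the task identifier back to the client right away and let later status reads observe the progress.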

### Global Resource Limits

To prevent GPU/CPU exhaustion, `TaskService` initializes a **global semaphore** (`self._processing_semaphore`) based on the number of workers detected at startup via `utils.gpu_detection.get_worker_count`. This semaphore limits concurrent file processing across *all* active tasks.

Per-task locks (`self._task_locks`) ensure atomic updates to counters like `processed_files` and `failed_files`, preventing race conditions during high-volume ingestion.
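
The interplay between the global semaphore and the per-task locks can be sketched as follows. This is an illustration under stated assumptions, not the repository's implementation: `BoundedIngestor`, `demo`, and the `active`/`peak_active` counters are hypothetical, and `max_workers` stands in for whatever `get_worker_count` reports.

```python
import asyncio
from collections import defaultdict


class BoundedIngestor:
    """Hypothetical stand-in for the concurrency-related parts of TaskService."""

    def __init__(self, max_workers: int) -> None:
        # One semaphore shared by every task caps total concurrent work.
        self._processing_semaphore = asyncio.Semaphore(max_workers)
        # One lock per task guards that task's counters.
        self._task_locks = defaultdict(asyncio.Lock)
        self.processed_files = defaultdict(int)
        self.active = 0        # items being processed right now
        self.peak_active = 0   # highest value `active` ever reached

    async def _process_one(self, task_id: str, item: str) -> None:
        async with self._processing_semaphore:
            self.active += 1
            self.peak_active = max(self.peak_active, self.active)
            try:
                await asyncio.sleep(0.01)  # simulated processing work
            finally:
                self.active -= 1
        async with self._task_locks[task_id]:
            self.processed_files[task_id] += 1

    async def run_task(self, task_id: str, items: list) -> None:
        # Items within one task are handled one after another.
        for item in items:
            await self._process_one(task_id, item)


async def demo(max_workers: int = 2) -> BoundedIngestor:
    # Built inside the running event loop so the primitives bind to it.
    ingestor = BoundedIngestor(max_workers)
    await asyncio.gather(
        *(ingestor.run_task(f"task-{n}", ["a", "b", "c"]) for n in range(4))
    )
    return ingestor
```

Four tasks run concurrently here, yet `peak_active` never exceeds `max_workers`. In single-threaded asyncio a bare `+= 1` is not interrupted, so the per-task lock matters mainly when a counter update spans an `await`.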

## Timeout Protection and Error Handling

Every file is processed inside `_process_with_timeout`, which wraps the processor call in `asyncio.wait_for`. The default timeout is **3600 seconds**, set through the `TaskService` constructor parameter `ingestion_timeout`; an individual call can override it with the `timeout` argument:

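```python
# src/services/task_service.py – Timeout handling
import asyncio

async def _process_with_timeout(self, processor, item, timeout=None):
    # Fall back to the service-wide default only when no override is given;
    # `timeout or ...` would also discard an explicit 0.
    if timeout is None:
        timeout = self.ingestion_timeout
    try:
        return await asyncio.wait_for(processor.process(item), timeout=timeout)
    except asyncio.TimeoutError as exc:
        # Chain the original error so the traceback keeps its cause.
        raise IngestionTimeoutError(f"Processing exceeded {timeout}s") from exc
```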

When timeouts fire, the system raises `IngestionTimeoutError`, logs the failure, and marks the file as failed in the task store while allowing the remaining queue to continue processing.
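
The "fail one file, keep the queue going" behavior described above can be sketched like this. The helper names (`process_with_timeout`, `slow_or_fast`, `process_queue`) and the `succeeded`/`failed` counters are hypothetical; the sketch only assumes that a timed-out item is counted as failed and the loop moves on.

```python
import asyncio


class IngestionTimeoutError(Exception):
    """Raised when a single item exceeds its time budget."""


async def process_with_timeout(process, item: str, timeout: float):
    """Run `process(item)` and convert asyncio's timeout into a domain error."""
    try:
        return await asyncio.wait_for(process(item), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise IngestionTimeoutError(f"{item} exceeded {timeout}s") from exc


async def slow_or_fast(item: str) -> str:
    """Simulated processor: items whose name starts with 'slow' take 0.5s."""
    await asyncio.sleep(0.5 if item.startswith("slow") else 0)
    return f"indexed:{item}"


async def process_queue(items: list, timeout: float) -> dict:
    """Process items in order; a timeout fails that item only."""
    counters = {"succeeded": 0, "failed": 0}
    for item in items:
        try:
            await process_with_timeout(slow_or_fast, item, timeout)
            counters["succeeded"] += 1
        except IngestionTimeoutError:
            counters["failed"] += 1  # record the failure and keep going
    return counters
```

Running `asyncio.run(process_queue(["a.pdf", "slow.pdf", "b.pdf"], timeout=0.05))` in this sketch fails only the slow item, and `b.pdf` is still processed afterwards.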

## Document Processing Implementations

OpenRAG provides two distinct processor implementations in [`src/models/processors.py`](https://github.com/langflow-ai/openrag/blob/main/src/models/processors.py):

- **`DocumentFileProcessor`**: Handles plain file uploads by extracting text, running OCR and table-structure detection via **Docling**, splitting content into chunks, generating embeddings using the selected model, and writing vectors to OpenSearch.

- **`LangflowFileProcessor`**: Invokes the Langflow ingestion flow, offering a UI-driven pipeline with custom node configuration for complex transformation logic.
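
Since both processors are handed to the same task runner, they presumably share a common `process` interface. The sketch below illustrates that strategy-style arrangement with hypothetical classes (`Processor`, `DirectProcessor`, `FlowProcessor`, `run_items`); the real classes in `src/models/processors.py` may be structured differently.

```python
import asyncio
from typing import Protocol


class Processor(Protocol):
    """Hypothetical shared interface: anything with an async `process`."""

    async def process(self, item: str) -> str: ...


class DirectProcessor:
    """Stands in for a processor that handles the file itself."""

    async def process(self, item: str) -> str:
        return f"direct:{item}"


class FlowProcessor:
    """Stands in for a processor that delegates to a configured flow."""

    def __init__(self, flow_id: str) -> None:
        self.flow_id = flow_id

    async def process(self, item: str) -> str:
        return f"flow[{self.flow_id}]:{item}"


async def run_items(processor: Processor, items: list) -> list:
    """The runner depends only on the interface, not on the concrete class."""
    return [await processor.process(item) for item in items]
```

With this shape, choosing a pipeline is a matter of which object is passed in, and the task runner itself does not need to change.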

## Configuration Management and Defaults

### Fetching Ingestion Defaults

The system stores default chunking parameters (size, overlap, separator) and embedding models within the **Langflow ingestion flow** (JSON definition in [`flows/ingestion_flow.json`](https://github.com/langflow-ai/openrag/blob/main/flows/ingestion_flow.json)). The settings endpoint in [`src/api/settings.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/settings.py) fetches these values on demand:

```python

# src/api/settings.py – Retrieving flow-based defaults

async def get_settings():
    response = await clients.langflow_request(
        "GET", f"/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}"
    )
    flow_data = response.json()
    
    # Extract from SplitText and OpenAIEmbeddings nodes

    ingestion_defaults = {
        "chunkSize": knowledge_config.chunk_size,
        "chunkOverlap": knowledge_config.chunk_overlap,
        "separator": "\\n",
        "embeddingModel": knowledge_config.embedding_model,
    }
    return SettingsResponse(ingestion_defaults=ingestion_defaults)

```
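
As an illustration of what "extracting from flow nodes" could look like, the sketch below walks a flow document and reads template values. It assumes a flow layout of `data.nodes[].data.node.template.<field>.value`, and the helpers `find_node`, `template_value`, and `extract_ingestion_defaults`, the `model` field name, and the sample values are all hypothetical rather than taken from the repository.

```python
from typing import Any, Optional

# Hypothetical, heavily trimmed flow document used only for this sketch.
SAMPLE_FLOW = {
    "data": {
        "nodes": [
            {
                "id": "SplitText-QIKhg",
                "data": {
                    "type": "SplitText",
                    "node": {"template": {
                        "chunk_size": {"value": 1000},
                        "chunk_overlap": {"value": 200},
                        "separator": {"value": "\n"},
                    }},
                },
            },
            {
                "id": "OpenAIEmbeddings-x",
                "data": {
                    "type": "OpenAIEmbeddings",
                    "node": {"template": {"model": {"value": "example-model"}}},
                },
            },
        ]
    }
}


def find_node(flow: dict, node_type: str) -> Optional[dict]:
    """Return the first node whose component type matches, if any."""
    for node in flow.get("data", {}).get("nodes", []):
        if node.get("data", {}).get("type") == node_type:
            return node
    return None


def template_value(node: Optional[dict], field: str, default: Any = None) -> Any:
    """Read `template[field]["value"]` from a node, tolerating missing keys."""
    if node is None:
        return default
    template = node.get("data", {}).get("node", {}).get("template", {})
    return template.get(field, {}).get("value", default)


def extract_ingestion_defaults(flow: dict) -> dict:
    """Collect chunking and embedding defaults from the two relevant nodes."""
    split = find_node(flow, "SplitText")
    embed = find_node(flow, "OpenAIEmbeddings")
    return {
        "chunkSize": template_value(split, "chunk_size"),
        "chunkOverlap": template_value(split, "chunk_overlap"),
        "separator": template_value(split, "separator"),
        "embeddingModel": template_value(embed, "model"),
    }
```

Missing nodes or fields yield `None` here instead of raising, which is one possible design choice for a settings endpoint that should still respond when a flow has been edited.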

### Runtime Configuration Updates

Users can override defaults via the PATCH endpoint, which updates the Langflow flow nodes directly:

```python

# src/api/settings.py – Updating configuration

@app.patch("/api/v1/settings/ingestion")
async def update_ingestion_defaults(defaults: IngestionDefaultsConfig):
    await clients.langflow_patch_flow_node(
        flow_id=LANGFLOW_INGEST_FLOW_ID,
        node_id="SplitText-QIKhg",
        updates={
            "chunk_size": {"value": defaults.chunkSize},
            "chunk_overlap": {"value": defaults.chunkOverlap}
        }
    )

```
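
To show what a node-level update amounts to, the following sketch applies an `updates` mapping of the same shape to an in-memory flow document. It is not the behavior of `langflow_patch_flow_node`, whose internals are not shown in this article; `apply_node_updates`, the assumed flow layout, and the sample values are hypothetical.

```python
import copy

# Hypothetical, trimmed flow document used only for this sketch.
EXAMPLE_FLOW = {
    "data": {
        "nodes": [
            {
                "id": "SplitText-QIKhg",
                "data": {"node": {"template": {
                    "chunk_size": {"value": 1000, "type": "int"},
                    "chunk_overlap": {"value": 200, "type": "int"},
                }}},
            }
        ]
    }
}


def apply_node_updates(flow: dict, node_id: str, updates: dict) -> dict:
    """Return a copy of `flow` with `updates` merged into one node's template.

    `updates` maps field names to partial field definitions, for example
    {"chunk_size": {"value": 512}}. Other keys of each field are preserved.
    """
    patched = copy.deepcopy(flow)  # leave the caller's document untouched
    for node in patched.get("data", {}).get("nodes", []):
        if node.get("id") == node_id:
            template = node["data"]["node"]["template"]
            for field_name, changes in updates.items():
                template.setdefault(field_name, {}).update(changes)
            return patched
    raise KeyError(f"node {node_id!r} not found in flow")


patched_flow = apply_node_updates(
    EXAMPLE_FLOW,
    "SplitText-QIKhg",
    {"chunk_size": {"value": 512}, "chunk_overlap": {"value": 64}},
)
```

Merging into the existing field definition, instead of replacing it, keeps metadata such as `type` intact, and an unknown node identifier fails loudly with `KeyError`.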

## Monitoring and Observability

Every task creation and completion emits telemetry events (`Category.TASK_OPERATIONS`), enabling the frontend to display active, failed, and completed tasks in the Tasks badge. The UI polls the task store endpoint to render progress bars and supports cancellation, which aborts the background coroutine immediately.

Frontend monitoring implementation:

```tsx
// frontend/components/TaskStatus.tsx
useEffect(() => {
  const interval = setInterval(async () => {
    const { data } = await axios.get(`/api/v1/tasks/${taskId}`);
    setTaskInfo(data);
  }, 2000);
  return () => clearInterval(interval);
}, [taskId]);

```
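
The same polling idea can be sketched outside the browser, for example in a script that waits for a task to finish. Everything here is an assumption for illustration: `poll_task`, the injected `fetch_status` callable, the `status` field, and the set of terminal status strings are hypothetical and not confirmed by the OpenRAG API.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical terminal states; the real API may use different values.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


async def poll_task(
    fetch_status: Callable[[str], Awaitable[dict]],
    task_id: str,
    interval: float = 2.0,
    max_polls: Optional[int] = None,
) -> dict:
    """Call `fetch_status` repeatedly until the task reaches a terminal state.

    `fetch_status` is injected so the loop can wrap any HTTP client; it
    should return the decoded JSON body of the task status response.
    """
    polls = 0
    while True:
        info = await fetch_status(task_id)
        polls += 1
        if info.get("status") in TERMINAL_STATES:
            return info
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError(f"task {task_id} still running after {polls} polls")
        await asyncio.sleep(interval)
```

Stopping once a terminal state is seen avoids polling a finished task indefinitely, and the optional `max_polls` bound keeps a stuck task from blocking the caller forever.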

## Summary

- **`TaskService`** in [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py) orchestrates the entire ingestion lifecycle, from task creation to background processing completion.
- **Concurrency control** combines a global semaphore (limiting total workers) with per-task locks (ensuring atomic counter updates).
- **Timeout protection** via `_process_with_timeout` defaults to 3600 seconds (the `ingestion_timeout` constructor parameter) and accepts a per-call override through the `timeout` argument.
- **Processor selection** determines pipeline behavior: `DocumentFileProcessor` for direct document handling with Docling OCR, or `LangflowFileProcessor` for UI-configurable flows.
- **Configuration defaults** reside in the Langflow ingestion flow JSON and are exposed through [`src/api/settings.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/settings.py), allowing runtime updates to chunk size, overlap, and embedding models.

## Frequently Asked Questions

### How does OpenRAG prevent resource exhaustion during large batch uploads?

OpenRAG implements a **global semaphore** (`self._processing_semaphore`) in `TaskService` that limits concurrent file processing based on the GPU/CPU worker count reported by `utils.gpu_detection.get_worker_count`. The number of files processed at the same time therefore never exceeds that worker count, regardless of how many tasks users create.

### Where are the default chunking and embedding settings stored in OpenRAG?

Default ingestion parameters, including **chunk size**, **chunk overlap**, **separator**, and **embedding model**, live within the Langflow ingestion flow definition ([`flows/ingestion_flow.json`](https://github.com/langflow-ai/openrag/blob/main/flows/ingestion_flow.json)). The `get_settings()` function in [`src/api/settings.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/settings.py) extracts these values from specific flow nodes (such as `SplitText` and `OpenAIEmbeddings`) and returns them in the `ingestion_defaults` field of a `SettingsResponse`. The `IngestionDefaultsConfig` model is the request body of the PATCH endpoint that updates them.

### What happens when a document ingestion task exceeds the time limit?

When a file exceeds the configured `ingestion_timeout` (default 3600 seconds), the `_process_with_timeout` method raises `IngestionTimeoutError`, logs the failure event, and marks that specific file as failed in the task counters. The background coroutine continues processing the remaining queue items rather than terminating the entire task, ensuring partial completion of large batches.

### How can I monitor the progress of an active ingestion task?

The frontend monitors tasks by polling the `/api/v1/tasks/{taskId}` endpoint every 2 seconds, which reads from the `TaskService.task_store` to return current status, processed file counts, failed counts, and duration. Additionally, the system emits telemetry events under `Category.TASK_OPERATIONS` that populate the Tasks badge in the UI with real-time completion percentages.