# How to Manage Background Tasks with TaskService in OpenRAG

> Master background task management in OpenRAG with TaskService. Learn how to efficiently orchestrate long-running file ingestion jobs using asyncio and concurrent processing.

- Repository: [langflow-ai/openrag](https://github.com/langflow-ai/openrag)
- Tags: how-to-guide
- Published: 2026-03-13

---

**The TaskService class in OpenRAG provides a singleton-based asyncio orchestration layer that manages long-running file ingestion jobs through semaphore-controlled concurrent processing, lock-protected counter updates, and scheduled automatic cleanup.**

OpenRAG (langflow-ai/openrag) processes large-scale document ingestion through a dedicated TaskService that abstracts away the complexity of async job management. This article explains how to leverage the TaskService implementation in [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py) to create, monitor, and gracefully terminate background file processing tasks while maintaining system responsiveness.

## Core Architecture of the TaskService

The TaskService operates as a singleton orchestrator that coordinates the entire lifecycle of background ingestion jobs. According to the OpenRAG source code, the service manages logical task containers (`UploadTask`) that group individual file-level jobs (`FileTask`), each tracked with atomic counters protected by asyncio locks.

### Global Concurrency Controls

In [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py), the service initializes a global `asyncio.Semaphore` (lines 39-43) whose size is dynamically calculated based on the host's GPU/CPU worker count via `utils.gpu_detection.get_worker_count`. This semaphore ensures that at most *N* files are processed concurrently across all active tasks, preventing resource exhaustion during bulk ingestion.
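The effect of a shared semaphore can be illustrated with a minimal, self-contained sketch. It is not the OpenRAG implementation: `run_with_limit`, `process_file`, and the fixed `max_workers` value are hypothetical stand-ins (the real service derives the size from `get_worker_count`), and `asyncio.sleep` substitutes for actual ingestion work.

```python
import asyncio


async def run_with_limit(num_files: int, max_workers: int) -> int:
    """Process fake files and return the peak number running at once."""
    # Stand-in for the service's global semaphore (size is assumed here).
    semaphore = asyncio.Semaphore(max_workers)
    active = 0
    peak = 0

    async def process_file(index: int) -> None:
        nonlocal active, peak
        async with semaphore:  # waits here while max_workers files are busy
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # placeholder for real ingestion work
            active -= 1

    await asyncio.gather(*(process_file(i) for i in range(num_files)))
    return peak


peak = asyncio.run(run_with_limit(num_files=10, max_workers=3))
print(f"peak concurrency: {peak}")
```

Because every file coroutine passes through the same semaphore, the limit applies across all tasks rather than per task, which matches the behavior described above.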

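### Lock-Protected State Management

Several file coroutines can finish at nearly the same time and interleave at `await` points. To keep their counter updates from racing, the service maintains per-task locks in `self._task_locks` (lines 36-38). These `asyncio.Lock` instances guard updates to `processed_files`, `successful_files`, and `failed_files` during the execution of `background_custom_processor`. Note that `asyncio.Lock` coordinates coroutines on a single event loop; it is not thread-safe in the `threading` sense.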
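A small sketch of the per-task lock pattern follows. The `Counters` dataclass, `record_results`, and `finish_file` are hypothetical names chosen for illustration; only the counter names and the idea of a lock dictionary keyed by task ID come from the description above. The `await asyncio.sleep(0)` inside the critical section is deliberate: it creates a yield point that would lose updates if the lock were removed.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Counters:
    """Hypothetical stand-in for the aggregate counters on an upload task."""
    processed_files: int = 0
    successful_files: int = 0
    failed_files: int = 0


async def record_results(task_id: str, outcomes: List[bool]) -> Counters:
    task_locks: Dict[str, asyncio.Lock] = {}  # mirrors the idea of _task_locks
    counters = Counters()

    async def finish_file(succeeded: bool) -> None:
        lock = task_locks.setdefault(task_id, asyncio.Lock())
        async with lock:
            processed = counters.processed_files
            await asyncio.sleep(0)  # yield point inside the critical section
            counters.processed_files = processed + 1
            if succeeded:
                counters.successful_files += 1
            else:
                counters.failed_files += 1

    await asyncio.gather(*(finish_file(ok) for ok in outcomes))
    return counters


result = asyncio.run(record_results("task-1", [True, False, True, True]))
print(result)
```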

## Creating and Scheduling Background Tasks

To initiate a bulk file ingestion job, invoke `TaskService.create_upload_task`, which generates a UUID, wraps file paths into `FileTask` instances (defined in [`src/models/tasks.py`](https://github.com/langflow-ai/openrag/blob/main/src/models/tasks.py)), and spawns a background coroutine via `asyncio.create_task`.

### Instantiating the Service

```python
from services.task_service import TaskService

# Create service instance (typically injected by FastAPI)
task_service = TaskService(
    document_service=my_doc_service,
    ingestion_timeout=3600,  # seconds
)

# Start the periodic cleanup scheduler
task_service.start_cleanup_scheduler()
```

### Creating an Upload Task

```python
# files: list of absolute file paths
task_id = await task_service.create_upload_task(
    user_id=user_id,
    file_paths=files,
    jwt_token="eyJhbGci...",  # Optional auth token
    owner_name="John Doe",
    owner_email="john@example.com",
)
```

The method internally constructs a `DocumentFileProcessor` and delegates to `create_custom_task`, which schedules the `background_custom_processor` coroutine. This coroutine iterates through each file, acquiring the global semaphore before calling `_process_with_timeout` to execute the processor with timeout protection.
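The combination of semaphore acquisition and timeout protection can be sketched as below. This is an assumption-laden illustration, not the body of `_process_with_timeout`: the helper names, the string statuses, and the use of `asyncio.wait_for` are choices made for the example, and `asyncio.sleep` stands in for the document processor.

```python
import asyncio
from typing import Awaitable, Callable, List


async def process_with_timeout(
    work: Callable[[], Awaitable[None]], timeout: float
) -> str:
    """Run one unit of work; report FAILED if it exceeds the timeout."""
    try:
        await asyncio.wait_for(work(), timeout=timeout)
        return "COMPLETED"
    except asyncio.TimeoutError:
        return "FAILED"


async def ingest(durations: List[float], timeout: float, max_workers: int = 2) -> List[str]:
    semaphore = asyncio.Semaphore(max_workers)

    async def handle(duration: float) -> str:
        # Acquire the shared semaphore first, then start the timeout clock,
        # so time spent waiting for a slot does not count against the file.
        async with semaphore:
            return await process_with_timeout(
                lambda: asyncio.sleep(duration), timeout
            )

    return await asyncio.gather(*(handle(d) for d in durations))


statuses = asyncio.run(ingest([0.01, 0.5, 0.01], timeout=0.1))
print(statuses)
```

Starting the timeout only after the semaphore is held is one possible design; whether OpenRAG measures the timeout the same way should be checked against the source.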

## Monitoring Task Progress

The service exposes task state through `get_task_status`, which aggregates per-file statuses from the `task_store` dictionary and calculates running and pending counts. The FastAPI layer in [`src/api/tasks.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/tasks.py) surfaces this data via `GET /tasks/{task_id}`.

### Querying Task Status

```python
status = task_service.get_task_status(user_id, task_id)
print(status)

# Output includes:
# - task_id, status, total_files
# - processed_files, successful_files, failed_files
# - running_files, pending_files
# - duration_seconds and per-file metadata
```
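To make the aggregation step concrete, here is a rough sketch of how running and pending counts could be derived from per-file statuses. The `summarize` function and the flat mapping of file path to status string are hypothetical; the actual `get_task_status` return shape may differ.

```python
from collections import Counter
from typing import Dict


def summarize(file_statuses: Dict[str, str]) -> Dict[str, int]:
    """Aggregate per-file statuses into task-level counts."""
    counts = Counter(file_statuses.values())
    return {
        "total_files": len(file_statuses),
        "pending_files": counts["PENDING"],
        "running_files": counts["RUNNING"],
        "successful_files": counts["COMPLETED"],
        "failed_files": counts["FAILED"],
        # A file counts as processed once it reaches a terminal state.
        "processed_files": counts["COMPLETED"] + counts["FAILED"],
    }


summary = summarize({
    "/data/a.pdf": "COMPLETED",
    "/data/b.pdf": "RUNNING",
    "/data/c.pdf": "PENDING",
    "/data/d.pdf": "FAILED",
})
print(summary)
```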

## Cancelling and Cleaning Up Tasks

OpenRAG provides explicit lifecycle management through cancellation and automatic cleanup mechanisms to prevent memory leaks from stale task objects.

### Cancelling Running Tasks

When invoked, `cancel_task` (lines 52-70) locates the background asyncio task, cancels the coroutine, and marks the `UploadTask` and any pending or running `FileTask` instances as `FAILED`. The method increments the `failed_files` counter under the per-task lock to ensure atomic state updates.

```python
cancelled = await task_service.cancel_task(user_id, task_id)
if cancelled:
    print(f"Task {task_id} cancelled successfully")
```
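The cancellation flow described above can be approximated with plain asyncio primitives. In this sketch, `cancel_demo`, the `statuses` dictionary, and the string status values are illustrative assumptions; awaiting the cancelled task before updating state is also a choice made for the example rather than something confirmed by the source.

```python
import asyncio
from typing import Dict, Tuple


async def cancel_demo() -> Tuple[bool, Dict[str, str], int]:
    statuses = {"a.pdf": "COMPLETED", "b.pdf": "RUNNING", "c.pdf": "PENDING"}
    lock = asyncio.Lock()  # stand-in for the per-task lock

    async def worker() -> None:
        await asyncio.sleep(60)  # placeholder for long-running ingestion

    background = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start before cancelling it

    background.cancel()
    try:
        await background  # wait until the cancellation has been delivered
    except asyncio.CancelledError:
        pass

    failed = 0
    async with lock:  # update file states and the counter together
        for name, status in statuses.items():
            if status in ("PENDING", "RUNNING"):
                statuses[name] = "FAILED"
                failed += 1
    return background.cancelled(), statuses, failed


was_cancelled, final_statuses, failed_files = asyncio.run(cancel_demo())
print(was_cancelled, final_statuses, failed_files)
```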

### Automatic Cleanup Scheduling

The `start_cleanup_scheduler` method initiates a periodic task that runs every `CLEANUP_INTERVAL_SECONDS` (default 2 hours). This scheduler invokes `cleanup_old_tasks`, which purges `COMPLETED` or `FAILED` tasks older than `max_age_seconds` (default 1 hour) from `self.task_store` and removes associated lock entries from `self._task_locks`.
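The purge logic can be sketched as a pure function. This is a simplified illustration: it assumes a flat `task_store` keyed by task ID whose entries carry a `status` string and an `updated_at` timestamp, which may not match the real data layout, and the `now` parameter exists only to make the example deterministic.

```python
import time
from typing import Any, Dict, Optional

TERMINAL_STATUSES = {"COMPLETED", "FAILED"}


def cleanup_old_tasks(
    task_store: Dict[str, Dict[str, Any]],
    task_locks: Dict[str, Any],
    max_age_seconds: float = 3600.0,
    now: Optional[float] = None,
) -> int:
    """Remove terminal tasks older than max_age_seconds; return the count."""
    now = time.time() if now is None else now
    # Collect IDs first so the dictionary is not mutated while iterating.
    expired = [
        task_id
        for task_id, task in task_store.items()
        if task["status"] in TERMINAL_STATUSES
        and now - task["updated_at"] > max_age_seconds
    ]
    for task_id in expired:
        del task_store[task_id]
        task_locks.pop(task_id, None)  # drop the matching lock entry, if any
    return len(expired)


store = {
    "old_done": {"status": "COMPLETED", "updated_at": 0.0},
    "old_running": {"status": "RUNNING", "updated_at": 0.0},
    "fresh_failed": {"status": "FAILED", "updated_at": 9000.0},
}
locks = {task_id: object() for task_id in store}
removed = cleanup_old_tasks(store, locks, max_age_seconds=3600, now=10000.0)
print(removed, sorted(store))
```

Note that with a 2-hour interval and a 1-hour age threshold, a finished task may linger for up to roughly three hours before a scheduler pass removes it.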

### Graceful Shutdown

The `shutdown` method (lines 110-138) ensures clean termination by cancelling the periodic cleanup task and all running background jobs, then awaiting their completion. This prevents dangling coroutines during application restart or deployment.

```python
# Call during FastAPI app shutdown event
await task_service.shutdown()
```
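A generic version of the cancel-then-await pattern is sketched below. The `shutdown` and `demo` functions here are hypothetical helpers, not the OpenRAG method; the example only shows how `asyncio.gather(..., return_exceptions=True)` lets a caller wait for cancelled tasks without the cancellation propagating.

```python
import asyncio
from typing import List


async def shutdown(tasks: List["asyncio.Task[None]"]) -> None:
    """Cancel every task, then wait until each one has actually stopped."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError as a result
    # instead of raising it into the caller.
    await asyncio.gather(*tasks, return_exceptions=True)


async def demo() -> bool:
    async def forever() -> None:
        while True:
            await asyncio.sleep(3600)  # placeholder for periodic or job work

    cleanup = asyncio.create_task(forever())   # stand-in for the scheduler
    jobs = [asyncio.create_task(forever()) for _ in range(3)]
    await asyncio.sleep(0)  # give the tasks a chance to start

    all_tasks = [cleanup, *jobs]
    await shutdown(all_tasks)
    return all(task.cancelled() for task in all_tasks)


all_stopped = asyncio.run(demo())
print(f"all tasks stopped: {all_stopped}")
```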

## Summary

- **TaskService** in [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py) acts as a singleton orchestrator for OpenRAG's file ingestion pipeline, managing the lifecycle of `UploadTask` and `FileTask` objects defined in [`src/models/tasks.py`](https://github.com/langflow-ai/openrag/blob/main/src/models/tasks.py).
- **Global concurrency** is enforced via an `asyncio.Semaphore` sized to the host's GPU/CPU worker count, while per-task `asyncio.Lock` instances keep counter updates consistent across coroutines.
- **Background processing** occurs through `asyncio.create_task` spawning `background_custom_processor`, which handles each file with timeout protection and semaphore-guarded execution.
- **Status monitoring** is available through `get_task_status` and exposed via FastAPI endpoints in [`src/api/tasks.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/tasks.py) for frontend polling.
- **Cleanup and cancellation** include a periodic scheduler (2-hour default) that removes stale tasks, plus a `shutdown` method for graceful termination of all background work.

## Frequently Asked Questions

### How does TaskService limit concurrent file processing?

TaskService uses a global `asyncio.Semaphore` initialized in [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py) (lines 39-43) with a size determined by `utils.gpu_detection.get_worker_count`. This semaphore is acquired within `background_custom_processor` before each file processing operation, ensuring that the total number of concurrent file operations across all tasks never exceeds the host's worker capacity.

### What happens when I cancel a task in OpenRAG?

When you call `cancel_task`, the service cancels the underlying asyncio task running `background_custom_processor`, then atomically updates the task status under a per-task lock. It marks the `UploadTask` as `FAILED`, sets any `RUNNING` or `PENDING` `FileTask` instances to `FAILED`, increments the `failed_files` counter, and records a cancellation error message. The cleanup scheduler will later remove the task if it remains in a terminal state beyond the configured age threshold.

### How does the cleanup scheduler work?

The `start_cleanup_scheduler` method creates a background task that executes `cleanup_old_tasks` every 2 hours (configurable via `CLEANUP_INTERVAL_SECONDS`). It scans `self.task_store` for tasks with `COMPLETED` or `FAILED` status older than `max_age_seconds` (default 1 hour) and removes them along with their associated locks from `self._task_locks`, preventing memory accumulation from completed ingestion jobs.

### What's the difference between UploadTask and FileTask?

`UploadTask` (defined in [`src/models/tasks.py`](https://github.com/langflow-ai/openrag/blob/main/src/models/tasks.py)) represents the logical container for an entire ingestion job, containing metadata like `user_id`, timestamps, and aggregate counters. `FileTask` represents individual file-level work units within that upload, tracking per-file status (`PENDING`, `RUNNING`, `COMPLETED`, `FAILED`), error messages, and processing duration. One `UploadTask` contains multiple `FileTask` instances, allowing granular progress tracking and partial failure handling.
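The container relationship can be pictured with a pair of dataclasses. This is a hedged sketch only: the field names below (`file_path`, `error`, `file_tasks`, and so on) are illustrative guesses based on the description, and the real definitions in `src/models/tasks.py` may use different names and types.

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class FileTask:
    """One file-level unit of work (field names are assumptions)."""
    file_path: str
    status: TaskStatus = TaskStatus.PENDING
    error: Optional[str] = None
    created_at: float = field(default_factory=time.time)


@dataclass
class UploadTask:
    """Logical container grouping the FileTask entries of one ingestion job."""
    task_id: str
    user_id: str
    file_tasks: Dict[str, FileTask] = field(default_factory=dict)
    successful_files: int = 0
    failed_files: int = 0

    @property
    def total_files(self) -> int:
        return len(self.file_tasks)


upload = UploadTask(task_id="task-1", user_id="user-1")
for path in ("/data/a.pdf", "/data/b.pdf"):
    upload.file_tasks[path] = FileTask(file_path=path)

# Partial failure: one file fails while the other completes.
upload.file_tasks["/data/a.pdf"].status = TaskStatus.COMPLETED
upload.successful_files += 1
upload.file_tasks["/data/b.pdf"].status = TaskStatus.FAILED
upload.file_tasks["/data/b.pdf"].error = "timed out"
upload.failed_files += 1
print(upload.total_files, upload.successful_files, upload.failed_files)
```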