How to Manage Background Tasks with TaskService in OpenRAG

The TaskService class in OpenRAG is a singleton asyncio orchestration layer for long-running file ingestion jobs: it bounds concurrent file processing with a semaphore, guards counter updates with per-task locks, and schedules automatic cleanup of finished tasks.

OpenRAG (langflow-ai/openrag) processes large-scale document ingestion through a dedicated TaskService that abstracts away the complexity of async job management. This article explains how to leverage the TaskService implementation in src/services/task_service.py to create, monitor, and gracefully terminate background file processing tasks while maintaining system responsiveness.

Core Architecture of the TaskService

The TaskService operates as a singleton orchestrator that coordinates the entire lifecycle of background ingestion jobs. According to the OpenRAG source code, the service manages logical task containers (UploadTask) that group individual file-level jobs (FileTask), each tracked with atomic counters protected by asyncio locks.

Global Concurrency Controls

In src/services/task_service.py, the service initializes a global asyncio.Semaphore (lines 39-43) whose size is dynamically calculated based on the host's GPU/CPU worker count via utils.gpu_detection.get_worker_count. This semaphore ensures that at most N files are processed concurrently across all active tasks, preventing resource exhaustion during bulk ingestion.
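
The effect of a shared semaphore can be illustrated with a minimal, self-contained sketch. It is not the OpenRAG implementation: process_file, ingest_all, and the stats dictionary are hypothetical names, and max_workers stands in for whatever get_worker_count returns on a given host.

```python
import asyncio


async def process_file(path: str, semaphore: asyncio.Semaphore, stats: dict) -> str:
    """Hypothetical file job: only runs while holding a semaphore slot."""
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for real ingestion work
        stats["active"] -= 1
    return path


async def ingest_all(paths: list, max_workers: int) -> dict:
    """Start every file job at once; the semaphore caps how many run together."""
    semaphore = asyncio.Semaphore(max_workers)  # assumed to mirror the worker count
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(process_file(p, semaphore, stats) for p in paths))
    return stats


if __name__ == "__main__":
    result = asyncio.run(ingest_all([f"doc_{i}.pdf" for i in range(10)], max_workers=3))
    print(result)
```

All ten coroutines are scheduled immediately, but the recorded peak never exceeds the semaphore size, which is the property the paragraph above describes.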

Thread-Safe State Management

To prevent race conditions when multiple file coroutines finish at nearly the same time, the service maintains per-task locks in self._task_locks (lines 36-38). These are asyncio locks, so they serialize coroutines on the event loop rather than operating-system threads. They protect counter updates for processed_files, successful_files, and failed_files during the execution of background_custom_processor.
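
As a rough illustration of the per-task lock pattern, the sketch below keeps one asyncio.Lock per task ID and updates the three counters only while holding it. The Counters and CounterRegistry classes are assumptions made for this example; only the counter names and the _task_locks attribute come from the description above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Counters:
    processed_files: int = 0
    successful_files: int = 0
    failed_files: int = 0


class CounterRegistry:
    """Hypothetical holder for per-task locks and counters."""

    def __init__(self) -> None:
        self._task_locks = {}  # task_id -> asyncio.Lock
        self.counters = {}     # task_id -> Counters

    def register(self, task_id: str) -> None:
        # Called from inside the running event loop so the lock belongs to it.
        self._task_locks[task_id] = asyncio.Lock()
        self.counters[task_id] = Counters()

    async def record_result(self, task_id: str, ok: bool) -> None:
        # All three counters change together under one lock.
        async with self._task_locks[task_id]:
            c = self.counters[task_id]
            c.processed_files += 1
            if ok:
                c.successful_files += 1
            else:
                c.failed_files += 1


async def demo() -> Counters:
    registry = CounterRegistry()
    registry.register("task-1")
    outcomes = [True, True, False, True, False]
    await asyncio.gather(*(registry.record_result("task-1", ok) for ok in outcomes))
    return registry.counters["task-1"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the lock keyed by task ID means unrelated uploads never wait on each other's counter updates.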

Creating and Scheduling Background Tasks

To initiate a bulk file ingestion job, invoke TaskService.create_upload_task, which generates a UUID, wraps file paths into FileTask instances (defined in src/models/tasks.py), and spawns a background coroutine via asyncio.create_task.

Instantiating the Service

from services.task_service import TaskService

# Create service instance (typically injected by FastAPI)
task_service = TaskService(
    document_service=my_doc_service,
    ingestion_timeout=3600,  # seconds
)

# Start the periodic cleanup scheduler
task_service.start_cleanup_scheduler()

Creating an Upload Task


# files: list of absolute file paths
task_id = await task_service.create_upload_task(
    user_id=user_id,
    file_paths=files,
    jwt_token="eyJhbGci...",  # Optional auth token
    owner_name="John Doe",
    owner_email="john.doe@example.com",  # placeholder address
)

The method internally constructs a DocumentFileProcessor and delegates to create_custom_task, which schedules the background_custom_processor coroutine. This coroutine iterates through each file, acquiring the global semaphore before calling _process_with_timeout to execute the processor with timeout protection.
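
The flow described above (generate an ID, schedule a background coroutine, then guard each file with the semaphore and a timeout) might look roughly like the following sketch. It uses only the standard library; schedule_upload, background_processor, process_with_timeout, and the string statuses are illustrative names, not the signatures found in task_service.py.

```python
import asyncio
import uuid


async def process_with_timeout(processor, path: str, timeout: float) -> str:
    """Run one file through the processor; a timeout is reported as 'failed'."""
    try:
        await asyncio.wait_for(processor(path), timeout=timeout)
        return "completed"
    except asyncio.TimeoutError:
        return "failed"


async def background_processor(paths, processor, semaphore, timeout, results) -> None:
    async def run_one(path: str) -> None:
        async with semaphore:  # wait for a free worker slot first
            results[path] = "running"
            results[path] = await process_with_timeout(processor, path, timeout)

    await asyncio.gather(*(run_one(p) for p in paths))


def schedule_upload(paths, processor, timeout: float, max_workers: int = 2):
    """Must be called from inside a running event loop."""
    task_id = str(uuid.uuid4())
    results = {p: "pending" for p in paths}
    semaphore = asyncio.Semaphore(max_workers)
    background = asyncio.create_task(
        background_processor(paths, processor, semaphore, timeout, results)
    )
    return task_id, background, results


async def demo():
    async def fake_processor(path: str) -> None:
        # Hypothetical processor: "slow" files exceed the timeout.
        await asyncio.sleep(1.0 if "slow" in path else 0.01)

    task_id, background, results = schedule_upload(
        ["fast.pdf", "slow.pdf"], fake_processor, timeout=0.1
    )
    await background  # a real caller would return task_id immediately instead
    return task_id, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the caller gets the task ID back before any file has finished, which is what lets an API handler respond right away while ingestion continues in the background.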

Monitoring Task Progress

The service exposes task state through get_task_status, which aggregates per-file statuses from the task_store dictionary and calculates running and pending counts. The FastAPI layer in src/api/tasks.py surfaces this data via GET /tasks/{task_id}.
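
The aggregation step can be pictured with a small sketch that derives the running and pending counts from per-file statuses. The summarize_status function, the flat dictionary input, and the lowercase status strings are assumptions for illustration; the real task_store layout may differ.

```python
from collections import Counter


def summarize_status(file_statuses: dict) -> dict:
    """Collapse per-file statuses into the aggregate counts a client would poll."""
    counts = Counter(file_statuses.values())
    return {
        "total_files": len(file_statuses),
        "processed_files": counts["completed"] + counts["failed"],
        "successful_files": counts["completed"],
        "failed_files": counts["failed"],
        "running_files": counts["running"],
        "pending_files": counts["pending"],
    }


if __name__ == "__main__":
    print(summarize_status({
        "a.pdf": "completed",
        "b.pdf": "failed",
        "c.pdf": "running",
        "d.pdf": "pending",
        "e.pdf": "pending",
    }))
```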

Querying Task Status

status = task_service.get_task_status(user_id, task_id)
print(status)

# Output includes:
# - task_id, status, total_files
# - processed_files, successful_files, failed_files
# - running_files, pending_files
# - duration_seconds and per-file metadata

Cancelling and Cleaning Up Tasks

OpenRAG provides explicit lifecycle management through cancellation and automatic cleanup mechanisms to prevent memory leaks from stale task objects.

Cancelling Running Tasks

When invoked, cancel_task (lines 52-70) locates the background asyncio task, cancels the coroutine, and marks the UploadTask and any pending or running FileTask instances as FAILED. The method increments the failed_files counter under the per-task lock to ensure atomic state updates.

cancelled = await task_service.cancel_task(user_id, task_id)
if cancelled:
    print(f"Task {task_id} cancelled successfully")
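
For readers who want to see the cancellation mechanics in isolation, here is a simplified sketch of the same idea: cancel the asyncio task, wait for it to unwind, then mark unfinished files as failed while holding the lock. The function signature, the plain dictionaries, and the string statuses are assumptions for this example and do not mirror the actual cancel_task code.

```python
import asyncio


async def cancel_task(background: asyncio.Task, file_statuses: dict,
                      lock: asyncio.Lock, counters: dict) -> bool:
    """Cancel a background job and mark its unfinished files as failed."""
    if background.done():
        return False  # nothing left to cancel
    background.cancel()
    try:
        await background  # let the coroutine unwind
    except asyncio.CancelledError:
        pass
    async with lock:  # update statuses and the counter together
        for path, status in file_statuses.items():
            if status in ("pending", "running"):
                file_statuses[path] = "failed"
                counters["failed_files"] += 1
    return True


async def demo():
    file_statuses = {"a.pdf": "pending", "b.pdf": "pending"}
    counters = {"failed_files": 0}
    lock = asyncio.Lock()

    async def worker() -> None:
        # Hypothetical long-running job that never finishes its first file.
        for path in file_statuses:
            file_statuses[path] = "running"
            await asyncio.sleep(10)
            file_statuses[path] = "completed"

    background = asyncio.create_task(worker())
    await asyncio.sleep(0.01)  # give the worker time to start
    cancelled = await cancel_task(background, file_statuses, lock, counters)
    return cancelled, file_statuses, counters


if __name__ == "__main__":
    print(asyncio.run(demo()))
```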

Automatic Cleanup Scheduling

The start_cleanup_scheduler method initiates a periodic task that runs every CLEANUP_INTERVAL_SECONDS (default 2 hours). This scheduler invokes cleanup_old_tasks, which purges COMPLETED or FAILED tasks older than max_age_seconds (default 1 hour) from self.task_store and removes associated lock entries from self._task_locks.
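
A stripped-down version of this purge logic is sketched below. It assumes a flat task_store dictionary whose values carry a status string and an updated_at timestamp; those field names, the TERMINAL set, and the cleanup_loop helper are hypothetical and chosen only to make the example self-contained.

```python
import asyncio
import time

TERMINAL = {"completed", "failed"}


def cleanup_old_tasks(task_store: dict, task_locks: dict,
                      max_age_seconds: float = 3600, now: float = None) -> int:
    """Drop terminal tasks older than max_age_seconds, plus their locks."""
    now = time.time() if now is None else now
    stale = [
        task_id
        for task_id, task in task_store.items()
        if task["status"] in TERMINAL and now - task["updated_at"] > max_age_seconds
    ]
    for task_id in stale:
        del task_store[task_id]
        task_locks.pop(task_id, None)  # the lock may already be gone
    return len(stale)


async def cleanup_loop(task_store: dict, task_locks: dict,
                       interval_seconds: float = 2 * 3600) -> None:
    """Periodic driver; runs until the surrounding asyncio task is cancelled."""
    while True:
        await asyncio.sleep(interval_seconds)
        cleanup_old_tasks(task_store, task_locks)


if __name__ == "__main__":
    store = {
        "old-done": {"status": "completed", "updated_at": 0},
        "old-running": {"status": "running", "updated_at": 0},
        "fresh-failed": {"status": "failed", "updated_at": 9000},
    }
    locks = {task_id: object() for task_id in store}
    print(cleanup_old_tasks(store, locks, max_age_seconds=3600, now=10000), sorted(store))
```

Collecting the stale IDs first and deleting afterwards avoids mutating the dictionary while iterating over it.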

Graceful Shutdown

The shutdown method (lines 110-138) ensures clean termination by cancelling the periodic cleanup task and all running background jobs, then awaiting their completion. This prevents dangling coroutines during application restart or deployment.
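
The cancel-then-await pattern behind a graceful shutdown can be sketched as follows. This is a generic asyncio idiom under assumed names (a standalone shutdown function taking the cleanup task and a list of background tasks), not a copy of the method in task_service.py.

```python
import asyncio


async def shutdown(cleanup_task, background_tasks) -> int:
    """Cancel the cleanup loop and every unfinished job, then wait for them."""
    pending = [
        t for t in [cleanup_task, *background_tasks]
        if t is not None and not t.done()
    ]
    for t in pending:
        t.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)


async def demo():
    async def long_job() -> None:
        await asyncio.sleep(60)  # stand-in for ingestion work or the cleanup loop

    cleanup_task = asyncio.create_task(long_job())
    jobs = [asyncio.create_task(long_job()) for _ in range(3)]
    await asyncio.sleep(0)  # let the tasks start
    stopped = await shutdown(cleanup_task, jobs)
    return stopped, all(t.cancelled() for t in [cleanup_task, *jobs])


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Awaiting the cancelled tasks matters: it gives each coroutine a chance to run its cleanup code before the event loop closes.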


# Call during FastAPI app shutdown event
await task_service.shutdown()

Summary

  • TaskService in src/services/task_service.py acts as a singleton orchestrator for OpenRAG's file ingestion pipeline, managing the lifecycle of UploadTask and FileTask objects defined in src/models/tasks.py.
  • Global concurrency is enforced via an asyncio.Semaphore sized to the host's GPU/CPU worker count, while per-task asyncio.Lock instances keep counter updates consistent across concurrent coroutines.
  • Background processing occurs through asyncio.create_task spawning background_custom_processor, which handles each file with timeout protection and semaphore-guarded execution.
  • Status monitoring is available through get_task_status and exposed via FastAPI endpoints in src/api/tasks.py for frontend polling.
  • Lifecycle management combines cancel_task for stopping a running job, a periodic scheduler (2-hour default) that removes stale tasks, and a shutdown method for graceful termination of all background work.

Frequently Asked Questions

How does TaskService limit concurrent file processing?

TaskService uses a global asyncio.Semaphore initialized in src/services/task_service.py (lines 39-43) with a size determined by utils.gpu_detection.get_worker_count. This semaphore is acquired within background_custom_processor before each file processing operation, ensuring that the total number of concurrent file operations across all tasks never exceeds the host's worker capacity.

What happens when I cancel a task in OpenRAG?

When you call cancel_task, the service cancels the underlying asyncio task running background_custom_processor, then atomically updates the task status under a per-task lock. It marks the UploadTask as FAILED, sets any RUNNING or PENDING FileTask instances to FAILED, increments the failed_files counter, and records a cancellation error message. The cleanup scheduler will later remove the task if it remains in a terminal state beyond the configured age threshold.

How does the cleanup scheduler work?

The start_cleanup_scheduler method creates a background task that executes cleanup_old_tasks every 2 hours (configurable via CLEANUP_INTERVAL_SECONDS). It scans self.task_store for tasks with COMPLETED or FAILED status older than max_age_seconds (default 1 hour) and removes them along with their associated locks from self._task_locks, preventing memory accumulation from completed ingestion jobs.

What's the difference between UploadTask and FileTask?

UploadTask (defined in src/models/tasks.py) represents the logical container for an entire ingestion job, containing metadata like user_id, timestamps, and aggregate counters. FileTask represents individual file-level work units within that upload, tracking per-file status (PENDING, RUNNING, COMPLETED, FAILED), error messages, and processing duration. One UploadTask contains multiple FileTask instances, allowing granular progress tracking and partial failure handling.
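
The container relationship can be pictured with a pair of dataclasses. This is a hedged sketch only: the class names, the four statuses, and the counter names follow the description above, while the remaining field names, the dictionary of file tasks, and the record_failure helper are assumptions rather than the definitions in src/models/tasks.py.

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class FileTask:
    """One file-level unit of work."""
    file_path: str
    status: TaskStatus = TaskStatus.PENDING
    error: Optional[str] = None
    duration_seconds: float = 0.0


@dataclass
class UploadTask:
    """Logical container grouping the FileTask entries of one ingestion job."""
    task_id: str
    user_id: str
    file_tasks: Dict[str, FileTask] = field(default_factory=dict)
    processed_files: int = 0
    successful_files: int = 0
    failed_files: int = 0
    created_at: float = field(default_factory=time.time)

    @property
    def total_files(self) -> int:
        return len(self.file_tasks)

    def record_failure(self, path: str, error: str) -> None:
        # Hypothetical helper: one failed file does not fail its siblings.
        file_task = self.file_tasks[path]
        file_task.status = TaskStatus.FAILED
        file_task.error = error
        self.processed_files += 1
        self.failed_files += 1


if __name__ == "__main__":
    upload = UploadTask(
        task_id="t1",
        user_id="u1",
        file_tasks={p: FileTask(file_path=p) for p in ["a.pdf", "b.pdf"]},
    )
    upload.record_failure("a.pdf", "parse error")
    print(upload.total_files, upload.failed_files, upload.file_tasks["b.pdf"].status)
```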
