Core Services in OpenRAG: Architecture, Functions, and Implementation Guide

OpenRAG organizes its backend into 16 specialized service classes, 15 under src/services/ and one under src/connectors/. Together they handle background ingestion, vector retrieval, conversational AI, Langflow orchestration, and access control, and they expose this functionality to the TUI, REST endpoints, and SDK clients.

The langflow-ai/openrag repository implements a modular service layer in which domain logic is isolated into discrete, testable components. These services manage the complete lifecycle of retrieval-augmented generation (RAG) workflows, from document upload to chat response, while keeping data ingestion, search, and orchestration concerns separate.

Task and Document Ingestion Pipeline

OpenRAG’s ingestion layer handles asynchronous file processing, embedding generation, and persistence through three coordinated services.

TaskService

Located in src/services/task_service.py, TaskService orchestrates background upload and ingestion jobs. It tracks progress, handles cancellations, and performs periodic cleanup of stale tasks. Key methods include create_upload_task for initiating uploads, background_custom_processor for handling file-specific logic, get_task_status for polling, and cancel_task for aborting long-running operations. The service also exposes shutdown for graceful termination of worker coroutines.
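The lifecycle described above (create, poll, cancel) can be illustrated with a small asyncio sketch. This is not OpenRAG's implementation: the class MiniTaskTracker, its status strings, and the simulated per-file work are assumptions made for the example, and only the method names mirror those listed above.

```python
import asyncio
import uuid


class MiniTaskTracker:
    """Hypothetical stand-in for a task service; not OpenRAG's code."""

    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task] = {}
        self._progress: dict[str, dict] = {}

    async def create_upload_task(self, user_id: str, file_paths: list[str]) -> str:
        task_id = str(uuid.uuid4())
        self._progress[task_id] = {
            "user_id": user_id,
            "status": "running",
            "processed": 0,
            "total": len(file_paths),
        }
        # Schedule the work in the background and return immediately.
        self._tasks[task_id] = asyncio.create_task(self._run(task_id, file_paths))
        return task_id

    async def _run(self, task_id: str, file_paths: list[str]) -> None:
        state = self._progress[task_id]
        for _path in file_paths:
            await asyncio.sleep(0.01)  # placeholder for parsing and embedding
            state["processed"] += 1
        state["status"] = "completed"

    def get_task_status(self, task_id: str) -> dict:
        # Return a copy so callers cannot mutate internal state.
        return dict(self._progress[task_id])

    async def cancel_task(self, task_id: str) -> bool:
        task = self._tasks[task_id]
        if task.done():
            return False  # nothing left to cancel
        task.cancel()
        # Wait until the cancellation has actually been delivered.
        await asyncio.gather(task, return_exceptions=True)
        self._progress[task_id]["status"] = "cancelled"
        return True
```

Returning the task ID before the work finishes is what lets a UI poll get_task_status while ingestion continues in the background.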

DocumentService

The DocumentService (src/services/document_service.py) persists raw documents and their vector embeddings after processing. It provides add_documents for bulk ingestion, delete_documents for pruning obsolete data, and get_document/list_documents for retrieval. This service interacts directly with the embedding pipeline to ensure vectors remain synchronized with source text.

LangflowFileService

Specialized for Langflow integration, LangflowFileService (src/services/langflow_file_service.py) manages the upload, parsing, and ingestion of Langflow .json flow definitions via its primary method process_langflow_file.

Vector Search and Knowledge Retrieval

The retrieval layer combines vector database queries with pre-filtering capabilities to return relevant context for LLM prompts.

SearchService

SearchService (src/services/search_service.py) executes vector-based searches against OpenSearch or other configured vector databases. It exposes search for generic queries, vector_search for pure semantic similarity, and hybrid_search for combining dense vectors with BM25 sparse retrieval. Results are formatted for direct consumption by downstream chat services.
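The text above does not say how hybrid_search merges the dense and BM25 result lists, and the merge may well happen inside the search engine itself. As an illustration only, the sketch below uses reciprocal rank fusion, one common way to combine two rankings. The function name, the constant k=60, and the document IDs are assumptions for the example.

```python
from collections import defaultdict


def reciprocal_rank_fusion(
    rankings: list[list[str]], k: int = 60, top_k: int = 5
) -> list[tuple[str, float]]:
    """Fuse ranked lists of document IDs; each list is ordered best-first."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # A document earns more credit the higher it ranks in each list.
            scores[doc_id] += 1.0 / (k + rank)
    # Sort by fused score (descending), then by ID for a deterministic order.
    fused = sorted(scores.items(), key=lambda item: (-item[1], item[0]))
    return fused[:top_k]


dense_hits = ["doc-a", "doc-b", "doc-c"]   # e.g. from a vector similarity query
sparse_hits = ["doc-b", "doc-d", "doc-a"]  # e.g. from a BM25 keyword query
fused = reciprocal_rank_fusion([dense_hits, sparse_hits])
```

Rank-based fusion avoids having to normalize cosine similarities and BM25 scores onto a shared scale, which is why it is a popular default for hybrid retrieval.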

KnowledgeFilterService

Before executing retrieval, KnowledgeFilterService (src/services/knowledge_filter_service.py) applies user-defined constraints such as date ranges, tags, or source filters via apply_filters, ensuring only authorized or relevant content enters the context window.
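To make the pre-filtering idea concrete, here is a minimal sketch that turns user constraints into an OpenSearch-style bool query with filter clauses. Whether apply_filters produces a query body like this is an assumption, and the function name and the field names text, tags, source, and created_at are hypothetical.

```python
from typing import Any, Optional


def build_filtered_query(
    text_query: dict[str, Any],
    tags: Optional[list[str]] = None,
    sources: Optional[list[str]] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
) -> dict[str, Any]:
    """Wrap a query in a bool clause whose filters restrict the candidate set."""
    filters: list[dict[str, Any]] = []
    if tags:
        filters.append({"terms": {"tags": tags}})
    if sources:
        filters.append({"terms": {"source": sources}})
    date_range: dict[str, str] = {}
    if date_from:
        date_range["gte"] = date_from
    if date_to:
        date_range["lte"] = date_to
    if date_range:
        filters.append({"range": {"created_at": date_range}})
    # Filter clauses narrow the result set without contributing to the score.
    return {"query": {"bool": {"must": [text_query], "filter": filters}}}


query = build_filtered_query(
    {"match": {"text": "security"}},
    tags=["policy"],
    date_from="2024-01-01",
)
```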

Chat Orchestration and Persistence

Conversational AI workflows rely on two coordinated services to manage dialogue state and generate responses.

ChatService

ChatService (src/services/chat_service.py) provides the high-level “ask-a-question” entry point. Its ask method coordinates retrieval via SearchService, invokes the LLM (selected through ModelsService), and updates conversation history. For streaming responses, use stream_answer, while reset_chat clears session context.

ConversationPersistenceService

ConversationPersistenceService (src/services/conversation_persistence_service.py) stores and restores chat history (messages, metadata) across sessions. It offers save_conversation, load_conversation, and list_conversations to enable long-running dialogue resumption and audit trails.
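The save, load, and list operations can be pictured with a small file-backed store. The storage backend OpenRAG actually uses is not stated above, so the class JsonConversationStore, its one-file-per-conversation layout, and the synchronous methods are assumptions for illustration.

```python
import json
from pathlib import Path
from typing import Optional


class JsonConversationStore:
    """Hypothetical file-backed store; one JSON file per conversation."""

    def __init__(self, root: Path) -> None:
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)

    def _path(self, conversation_id: str) -> Path:
        return self.root / f"{conversation_id}.json"

    def save_conversation(
        self,
        conversation_id: str,
        messages: list[dict],
        metadata: Optional[dict] = None,
    ) -> None:
        payload = {
            "id": conversation_id,
            "messages": messages,
            "metadata": metadata or {},
        }
        self._path(conversation_id).write_text(json.dumps(payload), encoding="utf-8")

    def load_conversation(self, conversation_id: str) -> dict:
        return json.loads(self._path(conversation_id).read_text(encoding="utf-8"))

    def list_conversations(self) -> list[str]:
        return sorted(path.stem for path in self.root.glob("*.json"))
```

A real service would also validate conversation IDs before using them in file paths and would scope listings to the requesting user.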

Langflow Flow Management

For complex RAG pipelines defined as visual flows, OpenRAG provides dedicated services for CRUD operations, remote execution, and audit logging.

FlowsService

FlowsService (src/services/flows_service.py) manages Langflow flow definitions describing retrieval-augmented generation pipelines. It implements list_flows, get_flow, save_flow, and delete_flow for full lifecycle management of flow configurations.

LangflowMCPService

LangflowMCPService (src/services/langflow_mcp_service.py) bridges the OpenRAG backend with Langflow through the Model Context Protocol (MCP) for remote flow execution. Key methods include invoke_flow for triggering remote runs and list_remote_flows for discovery.

LangflowHistoryService

To support debugging and compliance, LangflowHistoryService (src/services/langflow_history_service.py) persists execution history of Langflow runs via record_run and list_runs.

Authentication and Access Control

Security services enforce identity verification and resource ownership across all endpoints.

AuthService

AuthService (src/services/auth_service.py) verifies JSON Web Tokens (JWTs) and extracts user identity via verify_token and get_current_user, enabling role-based access control across the API surface.
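For readers unfamiliar with what verifying a JWT involves, the sketch below checks an HS256 signature and the exp claim using only the standard library. The algorithm and library AuthService actually uses are not stated above. The helper names and the shared-secret setup are assumptions, and production code would normally rely on a maintained JWT library instead of hand-rolled verification.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # JWT segments omit base64 padding, so restore it before decoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(claims: dict, secret: bytes) -> str:
    """Create an HS256-signed token (used here only to exercise verify_token)."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url_encode(json.dumps(claims).encode())
    signature = hmac.new(secret, f"{header}.{body}".encode("ascii"), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url_encode(signature)}"


def verify_token(token: str, secret: bytes) -> dict:
    """Return the claims if signature and expiry check out, else raise ValueError."""
    try:
        header_b64, body_b64, signature_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(body_b64))
        signature = _b64url_decode(signature_b64)
    except ValueError as exc:  # also covers base64 and JSON decoding errors
        raise ValueError("malformed token") from exc
    if not isinstance(header, dict) or not isinstance(claims, dict):
        raise ValueError("malformed token")
    if header.get("alg") != "HS256":
        raise ValueError("unsupported algorithm")
    expected = hmac.new(
        secret, f"{header_b64}.{body_b64}".encode("ascii"), hashlib.sha256
    ).digest()
    # Constant-time comparison avoids leaking signature bytes through timing.
    if not hmac.compare_digest(signature, expected):
        raise ValueError("bad signature")
    if "exp" in claims and claims["exp"] < time.time():
        raise ValueError("token expired")
    return claims
```

Pinning the accepted algorithm before checking the signature matters: a verifier that trusts whatever the token header declares can be tricked into skipping verification.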

APIKeyService

For external SDK and UI clients, APIKeyService (src/services/api_key_service.py) manages API key lifecycle through create_key, validate_key, and revoke_key, providing an alternative authentication mechanism to JWTs.
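The create, validate, and revoke cycle can be sketched with an in-memory registry that stores only a digest of each key. How OpenRAG stores keys is not described above, so the class InMemoryApiKeys, the sk_ prefix, and the choice of SHA-256 are assumptions for illustration.

```python
import hashlib
import secrets
from typing import Optional


class InMemoryApiKeys:
    """Hypothetical key registry that keeps SHA-256 digests, never raw keys."""

    def __init__(self) -> None:
        self._owners: dict[str, str] = {}  # key digest -> user_id

    @staticmethod
    def _digest(key: str) -> str:
        return hashlib.sha256(key.encode("utf-8")).hexdigest()

    def create_key(self, user_id: str) -> str:
        key = "sk_" + secrets.token_urlsafe(32)
        self._owners[self._digest(key)] = user_id
        return key  # shown to the caller once; only the digest is retained

    def validate_key(self, key: str) -> Optional[str]:
        """Return the owning user ID, or None if the key is unknown or revoked."""
        return self._owners.get(self._digest(key))

    def revoke_key(self, key: str) -> bool:
        """Remove the key; return False if it was not registered."""
        return self._owners.pop(self._digest(key), None) is not None
```

Storing digests means a leaked key table does not directly expose usable credentials.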

SessionOwnershipService

SessionOwnershipService (src/services/session_ownership_service.py) maps tasks and chats to specific user sessions, enforcing isolation via assign_owner and check_ownership to prevent cross-tenant data leakage.
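A minimal sketch of the ownership check follows. The class OwnershipRegistry and its in-memory dictionary are assumptions, and only the method names assign_owner and check_ownership come from the description above.

```python
class OwnershipRegistry:
    """Hypothetical in-memory map from resource IDs to owning users."""

    def __init__(self) -> None:
        self._owners: dict[str, str] = {}

    def assign_owner(self, resource_id: str, user_id: str) -> None:
        # The first assignment wins; a different user cannot take over a resource.
        existing = self._owners.setdefault(resource_id, user_id)
        if existing != user_id:
            raise PermissionError(f"{resource_id} already belongs to another user")

    def check_ownership(self, resource_id: str, user_id: str) -> bool:
        # Unknown resources are treated as not owned, so access is denied by default.
        return self._owners.get(resource_id) == user_id
```

Denying access to unknown resources by default keeps a missing ownership record from turning into an accidental grant.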

System Infrastructure and Connectors

Supporting services handle model configuration, health monitoring, and external data source integration.

ModelsService

ModelsService (src/services/models_service.py) catalogs available LLM and embedding configurations. Use list_models to enumerate providers and get_model_config to retrieve specific model parameters for flow execution.

MonitorService

For observability, MonitorService (src/services/monitor_service.py) emits health-check metrics including CPU, memory, and queue length via collect_metrics, integrating with external monitoring platforms.

ConnectorService

Serving as the base class for external data sources, ConnectorService (src/connectors/service.py) provides a unified interface for connectors such as SharePoint or OneDrive through list_items and fetch_item, enabling enterprise content ingestion.
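The value of a shared connector interface is that ingestion code can treat every source the same way. The sketch below shows the pattern with an abstract base class. The names BaseConnector, InMemoryConnector, and ingest_all are hypothetical, and the synchronous signatures are an assumption, since the real list_items and fetch_item signatures are not given above.

```python
from abc import ABC, abstractmethod


class BaseConnector(ABC):
    """Hypothetical connector contract with the two operations named above."""

    @abstractmethod
    def list_items(self) -> list[str]:
        """Return the IDs of items available in the source."""

    @abstractmethod
    def fetch_item(self, item_id: str) -> bytes:
        """Return the raw content of one item."""


class InMemoryConnector(BaseConnector):
    """Toy source standing in for a remote system such as a document library."""

    def __init__(self, items: dict[str, bytes]) -> None:
        self._items = dict(items)

    def list_items(self) -> list[str]:
        return sorted(self._items)

    def fetch_item(self, item_id: str) -> bytes:
        return self._items[item_id]


def ingest_all(connector: BaseConnector) -> dict[str, int]:
    """Pull every item and report its size in bytes, as ingestion code might."""
    return {
        item_id: len(connector.fetch_item(item_id))
        for item_id in connector.list_items()
    }
```

Because ingest_all depends only on the base class, adding a new source means writing one subclass without touching the ingestion path.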

Service Interaction Workflow

The core services in OpenRAG operate in a coordinated pipeline:

  1. Ingestion: The UI or SDK calls TaskService.create_upload_task, spawning a background coroutine that processes files through DocumentFileProcessor before handing results to DocumentService.
  2. Storage: DocumentService.add_documents persists raw text and embeddings to the vector store.
  3. Retrieval: ChatService.ask invokes SearchService.hybrid_search, optionally pre-filtered by KnowledgeFilterService.apply_filters.
  4. Generation: The retrieved context is passed to the LLM (configured via ModelsService.get_model_config), with the response streamed or returned synchronously.
  5. Persistence: The conversation turn is saved via ConversationPersistenceService.save_conversation.
  6. Flow Execution: For complex pipelines, FlowsService loads definitions, LangflowMCPService.invoke_flow executes remotely, and LangflowHistoryService.record_run logs the outcome.
  7. Security: Every request passes through AuthService.verify_token or APIKeyService.validate_key, with SessionOwnershipService.check_ownership enforcing resource isolation.
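The request path through steps 3, 4, 5, and 7 can be sketched with plain callables standing in for each service. Everything in this block is hypothetical: ToyPipeline and its fields are stand-ins for illustration, not OpenRAG classes. The point is the order of operations, with authentication and ownership checks running before any retrieval or generation.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyPipeline:
    """Hypothetical wiring of the chat path; each field stands in for a service."""

    verify: Callable[[str], str]               # token -> user_id
    owns: Callable[[str, str], bool]           # (session_id, user_id) -> allowed?
    search: Callable[[str], list[str]]         # question -> context passages
    generate: Callable[[str, list[str]], str]  # (question, context) -> answer
    history: dict[str, list[tuple[str, str]]] = field(default_factory=dict)

    def ask(self, token: str, session_id: str, question: str) -> str:
        user_id = self.verify(token)                # security: identity
        if not self.owns(session_id, user_id):      # security: ownership
            raise PermissionError("session belongs to another user")
        context = self.search(question)             # retrieval
        answer = self.generate(question, context)   # generation
        # Persistence: record the turn so the session can be resumed later.
        self.history.setdefault(session_id, []).append((question, answer))
        return answer
```

Injecting each step as a callable mirrors the separation of concerns described above and makes every stage replaceable in tests.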

Practical Implementation Examples

Creating an Upload Task

from src.services.task_service import TaskService
from src.services.document_service import DocumentService

doc_svc = DocumentService()
task_svc = TaskService(document_service=doc_svc)

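# create_upload_task is awaited, so run this from async code
# (for example inside an async def started with asyncio.run).
# It creates a task that ingests two local PDFs for user "alice".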
task_id = await task_svc.create_upload_task(
    user_id="alice",
    file_paths=["/tmp/report1.pdf", "/tmp/report2.pdf"],
)
print(f"Upload task started – ID: {task_id}")

Performing a Hybrid Search

from src.services.search_service import SearchService
from src.services.models_service import ModelsService

search_svc = SearchService()
models_svc = ModelsService()

# Perform a hybrid search (vector + BM25) against the "default" index

results = await search_svc.hybrid_search(
    query="What are the security implications of using OpenAI models?",
    top_k=5,
    filters={"metadata.source": "docs"},
    model=models_svc.get_model_config(name="openai-gpt-4o-mini"),
)

for hit in results:
    print(f"• {hit['metadata']['title']}: {hit['content'][:120]}...")

Initiating a Chat Session

from src.services.chat_service import ChatService
from src.services.conversation_persistence_service import ConversationPersistenceService
from src.services.models_service import ModelsService

conv_svc = ConversationPersistenceService()
model_svc = ModelsService()
chat_svc = ChatService(conv_svc, model_svc)

# Start a new chat session for user "bob"

session_id = await chat_svc.start_chat(user_id="bob")
answer = await chat_svc.ask(
    session_id=session_id,
    question="Summarize the latest Langflow release notes.",
)

print("Assistant:", answer)

Cancelling a Background Task


# Assume task_svc and task_id come from the upload example and belong to the same user
cancel_success = await task_svc.cancel_task(user_id="alice", task_id=task_id)
print("Cancellation succeeded" if cancel_success else "Task already finished")

Summary

  • TaskService (src/services/task_service.py) orchestrates asynchronous document ingestion with progress tracking and cancellation support.
  • DocumentService (src/services/document_service.py) handles persistence of raw documents and embeddings.
  • SearchService (src/services/search_service.py) executes vector and hybrid searches against configured databases.
  • ChatService (src/services/chat_service.py) provides the primary conversational interface, coordinating retrieval and LLM invocation.
  • ConversationPersistenceService (src/services/conversation_persistence_service.py) maintains chat history for session resumption.
  • FlowsService and LangflowMCPService manage Langflow pipeline definitions and remote execution via the Model Context Protocol (MCP).
  • AuthService and APIKeyService enforce JWT and API-key authentication across all endpoints.
  • ConnectorService (src/connectors/service.py) abstracts external data sources like SharePoint and OneDrive for enterprise ingestion.

Frequently Asked Questions

What is the primary responsibility of TaskService in OpenRAG?

TaskService manages the complete lifecycle of background ingestion jobs, including creation via create_upload_task, progress monitoring through get_task_status, graceful cancellation with cancel_task, and automated cleanup of stale records via cleanup_old_tasks. It runs as an async service in src/services/task_service.py and coordinates with DocumentService to persist processed files.

How does SearchService differ from DocumentService?

SearchService (src/services/search_service.py) executes read-only queries against vector stores using vector_search or hybrid_search, while DocumentService (src/services/document_service.py) handles write operations like add_documents and delete_documents that modify the underlying storage. SearchService focuses on retrieval algorithms; DocumentService manages data persistence and embedding synchronization.

What role does LangflowMCPService play in flow execution?

LangflowMCPService (src/services/langflow_mcp_service.py) acts as a bridge to Langflow over the Model Context Protocol (MCP), enabling OpenRAG to invoke flows running on remote Langflow instances. Its invoke_flow method triggers execution, while list_remote_flows discovers available pipelines, allowing OpenRAG to orchestrate complex RAG workflows beyond local processing capabilities.

How does OpenRAG handle authentication across services?

Every request passes through either AuthService (src/services/auth_service.py), which validates JWTs via verify_token, or APIKeyService (src/services/api_key_service.py), which manages API keys through validate_key. SessionOwnershipService (src/services/session_ownership_service.py) then enforces resource isolation by checking user-specific ownership of tasks and conversations via check_ownership, ensuring multi-tenant security.
