Core Services in OpenRAG: Architecture, Functions, and Implementation Guide
OpenRAG organizes its backend into 16 specialized service classes, 15 under src/services/ and ConnectorService under src/connectors/. They handle background ingestion, vector retrieval, conversational AI, Langflow orchestration, and access control, and expose their APIs to the TUI, REST endpoints, and SDK clients.
The langflow-ai/openrag repository implements a modular service-layer architecture where domain logic is isolated into discrete, testable components. These services manage the complete lifecycle of retrieval-augmented generation (RAG) workflows, from document upload to chat response, while keeping data ingestion, search, and orchestration concerns separate.
Task and Document Ingestion Pipeline
OpenRAG’s ingestion layer handles asynchronous file processing, embedding generation, and persistence through three coordinated services.
TaskService
Located in src/services/task_service.py, TaskService orchestrates background upload and ingestion jobs. It tracks progress, handles cancellations, and performs periodic cleanup of stale tasks. Key methods include create_upload_task for initiating uploads, background_custom_processor for handling file-specific logic, get_task_status for polling, and cancel_task for aborting long-running operations. The service also exposes shutdown for graceful termination of worker coroutines.
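The create, poll, and cancel lifecycle described above can be illustrated with a small asyncio sketch. It is not the OpenRAG implementation: the class name MiniTaskService, the in-memory registries, and the simulated per-file work are assumptions made for illustration. Only the method names create_upload_task, get_task_status, and cancel_task come from the description above, and their real signatures may differ.

```python
import asyncio
import uuid


class MiniTaskService:
    """Hypothetical stand-in for TaskService; keeps all state in memory."""

    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task] = {}
        self._progress: dict[str, dict] = {}

    async def create_upload_task(self, user_id: str, file_paths: list[str]) -> str:
        task_id = uuid.uuid4().hex
        self._progress[task_id] = {
            "user_id": user_id,
            "status": "running",
            "done": 0,
            "total": len(file_paths),
        }
        # Schedule the worker coroutine without waiting for it to finish.
        self._tasks[task_id] = asyncio.create_task(self._run(task_id, file_paths))
        return task_id

    async def _run(self, task_id: str, file_paths: list[str]) -> None:
        state = self._progress[task_id]
        for _ in file_paths:
            await asyncio.sleep(0.01)  # placeholder for parsing and embedding
            state["done"] += 1
        state["status"] = "completed"

    def get_task_status(self, task_id: str) -> dict:
        return dict(self._progress[task_id])  # copy so callers cannot mutate state

    async def cancel_task(self, task_id: str) -> bool:
        task = self._tasks[task_id]
        if task.done():
            return False  # nothing left to cancel
        task.cancel()
        # Wait for the cancellation to land; the CancelledError is returned, not raised.
        await asyncio.gather(task, return_exceptions=True)
        self._progress[task_id]["status"] = "cancelled"
        return True


async def demo() -> tuple[dict, dict, bool]:
    svc = MiniTaskService()
    finished = await svc.create_upload_task("alice", ["a.pdf", "b.pdf"])
    await svc._tasks[finished]  # let the short job run to completion
    aborted = await svc.create_upload_task("alice", ["c.pdf"] * 100)
    await asyncio.sleep(0)  # give the worker a chance to start
    cancelled = await svc.cancel_task(aborted)
    return svc.get_task_status(finished), svc.get_task_status(aborted), cancelled
```

Running asyncio.run(demo()) returns the status of one completed job and one cancelled job. A production service would also need persistence and the stale-task cleanup mentioned above, which this sketch omits.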
DocumentService
The DocumentService (src/services/document_service.py) persists raw documents and their vector embeddings after processing. It provides add_documents for bulk ingestion, delete_documents for pruning obsolete data, and get_document/list_documents for retrieval. This service interacts directly with the embedding pipeline to ensure vectors remain synchronized with source text.
LangflowFileService
Specialized for Langflow integration, LangflowFileService (src/services/langflow_file_service.py) manages the upload, parsing, and ingestion of Langflow .json flow definitions via its primary method process_langflow_file.
Vector Search and Knowledge Retrieval
The retrieval layer combines vector database queries with pre-filtering capabilities to return relevant context for LLM prompts.
SearchService
SearchService (src/services/search_service.py) executes vector-based searches against OpenSearch or other configured vector databases. It exposes search for generic queries, vector_search for pure semantic similarity, and hybrid_search for combining dense vectors with BM25 sparse retrieval. Results are formatted for direct consumption by downstream chat services.
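The text above does not say how hybrid_search merges dense and BM25 results. One widely used option is reciprocal rank fusion (RRF), sketched below under the assumption that each retriever returns a ranked list of document IDs. The function name, the constant k=60, and the sample IDs are illustrative and are not taken from OpenRAG.

```python
from collections import defaultdict


def reciprocal_rank_fusion(
    rankings: list[list[str]], k: int = 60, top_k: int = 5
) -> list[tuple[str, float]]:
    """Fuse several ranked ID lists; each hit contributes 1 / (k + rank)."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties are broken by document ID for stable output.
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))[:top_k]


dense_hits = ["doc3", "doc1", "doc7"]  # hypothetical vector-similarity ranking
bm25_hits = ["doc1", "doc9", "doc3"]   # hypothetical BM25 ranking
fused = reciprocal_rank_fusion([dense_hits, bm25_hits])
```

Documents that appear in both lists (doc1 and doc3 here) rise above documents found by only one retriever, which is the behaviour hybrid retrieval is meant to provide.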
KnowledgeFilterService
Before executing retrieval, KnowledgeFilterService (src/services/knowledge_filter_service.py) applies user-defined constraints such as date ranges, tags, or source filters via apply_filters, ensuring only authorized or relevant content enters the context window.
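As a rough illustration of pre-filtering, the sketch below turns a dictionary of user constraints into an OpenSearch bool query whose filter clauses restrict results without affecting scoring. The term, terms, range, and bool clauses are standard OpenSearch query DSL. The field names (tags, source, created_at) and the filter keys (date_from, date_to) are assumptions, and the real apply_filters signature may differ.

```python
def build_filter_clauses(filters: dict) -> list[dict]:
    """Translate user-facing filters into OpenSearch filter clauses."""
    clauses: list[dict] = []
    if filters.get("tags"):
        clauses.append({"terms": {"tags": list(filters["tags"])}})
    if filters.get("source"):
        clauses.append({"term": {"source": filters["source"]}})
    # Collect whichever date bounds were supplied into a single range clause.
    date_range = {
        op: filters[key]
        for key, op in (("date_from", "gte"), ("date_to", "lte"))
        if filters.get(key)
    }
    if date_range:
        clauses.append({"range": {"created_at": date_range}})
    return clauses


def apply_filters(query: dict, filters: dict) -> dict:
    """Wrap a scoring query so the filters restrict, but do not score, the hits."""
    return {"bool": {"must": [query], "filter": build_filter_clauses(filters)}}


filtered = apply_filters(
    {"match": {"content": "security"}},
    {"tags": ["policy"], "date_from": "2024-01-01"},
)
```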
Chat Orchestration and Persistence
Conversational AI workflows rely on two coordinated services to manage dialogue state and generate responses.
ChatService
ChatService (src/services/chat_service.py) provides the high-level “ask-a-question” entry point. Its ask method coordinates retrieval via SearchService, invokes the LLM (selected through ModelsService), and updates conversation history. For streaming responses, use stream_answer, while reset_chat clears session context.
ConversationPersistenceService
ConversationPersistenceService (src/services/conversation_persistence_service.py) stores and restores chat history (messages, metadata) across sessions. It offers save_conversation, load_conversation, and list_conversations to enable long-running dialogue resumption and audit trails.
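To make the save, load, and list operations concrete, here is a minimal file-backed sketch. The storage layout (one JSON file per session under a per-user directory), the class name JsonConversationStore, and the method parameters are assumptions; the text above does not say how OpenRAG actually stores conversations.

```python
import json
import tempfile
from pathlib import Path


class JsonConversationStore:
    """Hypothetical store: one JSON file per session, grouped by user."""

    def __init__(self, root: Path) -> None:
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)

    def _path(self, user_id: str, session_id: str) -> Path:
        # IDs are assumed to be trusted, filesystem-safe strings.
        return self.root / user_id / f"{session_id}.json"

    def save_conversation(self, user_id: str, session_id: str, messages: list[dict]) -> None:
        path = self._path(user_id, session_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps({"messages": messages}), encoding="utf-8")

    def load_conversation(self, user_id: str, session_id: str) -> list[dict]:
        path = self._path(user_id, session_id)
        if not path.exists():
            return []  # unknown sessions start with an empty history
        return json.loads(path.read_text(encoding="utf-8"))["messages"]

    def list_conversations(self, user_id: str) -> list[str]:
        user_dir = self.root / user_id
        if not user_dir.is_dir():
            return []
        return sorted(p.stem for p in user_dir.glob("*.json"))


def demo() -> tuple[list[str], list[dict]]:
    with tempfile.TemporaryDirectory() as tmp:
        store = JsonConversationStore(Path(tmp))
        store.save_conversation("bob", "s1", [{"role": "user", "content": "hi"}])
        store.save_conversation("bob", "s2", [])
        return store.list_conversations("bob"), store.load_conversation("bob", "s1")
```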
Langflow Flow Management
For complex RAG pipelines defined as visual flows, OpenRAG provides dedicated services for CRUD operations, remote execution, and audit logging.
FlowsService
FlowsService (src/services/flows_service.py) manages Langflow flow definitions describing retrieval-augmented generation pipelines. It implements list_flows, get_flow, save_flow, and delete_flow for full lifecycle management of flow configurations.
LangflowMCPService
LangflowMCPService (src/services/langflow_mcp_service.py) bridges the OpenRAG backend with Langflow's Model Context Protocol (MCP) integration for remote flow execution. Key methods include invoke_flow for triggering remote runs and list_remote_flows for discovery.
LangflowHistoryService
To support debugging and compliance, LangflowHistoryService (src/services/langflow_history_service.py) persists execution history of Langflow runs via record_run and list_runs.
Authentication and Access Control
Security services enforce identity verification and resource ownership across all endpoints.
AuthService
AuthService (src/services/auth_service.py) verifies JSON Web Tokens (JWTs) and extracts user identity via verify_token and get_current_user, enabling role-based access control across the API surface.
APIKeyService
For external SDK and UI clients, APIKeyService (src/services/api_key_service.py) manages API key lifecycle through create_key, validate_key, and revoke_key, providing an alternative authentication mechanism to JWTs.
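A key lifecycle like the one described can be sketched as follows. The design choice illustrated is to store only a SHA-256 digest of each key, so a leaked store does not reveal usable keys. The class name, the in-memory dictionary, the ork_ prefix, and the method parameters are all hypothetical.

```python
import hashlib
import secrets
from typing import Optional


class InMemoryAPIKeyStore:
    """Hypothetical key store that keeps digests, never the raw keys."""

    def __init__(self) -> None:
        self._owner_by_digest: dict[str, str] = {}

    @staticmethod
    def _digest(key: str) -> str:
        return hashlib.sha256(key.encode("utf-8")).hexdigest()

    def create_key(self, user_id: str) -> str:
        key = "ork_" + secrets.token_urlsafe(32)  # shown to the user exactly once
        self._owner_by_digest[self._digest(key)] = user_id
        return key

    def validate_key(self, key: str) -> Optional[str]:
        """Return the owning user ID, or None if the key is unknown or revoked."""
        return self._owner_by_digest.get(self._digest(key))

    def revoke_key(self, key: str) -> bool:
        return self._owner_by_digest.pop(self._digest(key), None) is not None
```

Because the raw key has high entropy, a fast hash such as SHA-256 is generally considered adequate here, unlike for user-chosen passwords.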
SessionOwnershipService
SessionOwnershipService (src/services/session_ownership_service.py) maps tasks and chats to specific user sessions, enforcing isolation via assign_owner and check_ownership to prevent cross-tenant data leakage.
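The ownership check itself can be very small. The sketch below assumes an in-memory mapping from resource ID to user ID and a first-writer-wins policy; both are illustrative choices rather than documented OpenRAG behaviour, as is the class name OwnershipRegistry.

```python
class OwnershipRegistry:
    """Hypothetical registry mapping task or chat IDs to their owning user."""

    def __init__(self) -> None:
        self._owners: dict[str, str] = {}

    def assign_owner(self, resource_id: str, user_id: str) -> None:
        # setdefault keeps the first owner; a conflicting claim is rejected.
        existing = self._owners.setdefault(resource_id, user_id)
        if existing != user_id:
            raise PermissionError(f"{resource_id} already belongs to another user")

    def check_ownership(self, resource_id: str, user_id: str) -> bool:
        # Unknown resources are treated as not owned, so access is denied by default.
        return self._owners.get(resource_id) == user_id
```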
System Infrastructure and Connectors
Supporting services handle model configuration, health monitoring, and external data source integration.
ModelsService
ModelsService (src/services/models_service.py) catalogs available LLM and embedding configurations. Use list_models to enumerate providers and get_model_config to retrieve specific model parameters for flow execution.
MonitorService
For observability, MonitorService (src/services/monitor_service.py) emits health-check metrics including CPU, memory, and queue length via collect_metrics, integrating with external monitoring platforms.
ConnectorService
Serving as the base class for external data sources, ConnectorService (src/connectors/service.py) provides a unified interface for connectors such as SharePoint or OneDrive through list_items and fetch_item, enabling enterprise content ingestion.
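The unified-interface idea can be sketched with an abstract base class. Only the method names list_items and fetch_item come from the description above; the Item dataclass, the synchronous signatures, the in-memory connector, and the ingest_all helper are assumptions, and real connectors for SharePoint or OneDrive would involve authentication and most likely async I/O.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class Item:
    item_id: str
    name: str


class BaseConnector(ABC):
    """Hypothetical connector contract: enumerate items, then fetch their bytes."""

    @abstractmethod
    def list_items(self) -> list[Item]: ...

    @abstractmethod
    def fetch_item(self, item_id: str) -> bytes: ...


class InMemoryConnector(BaseConnector):
    """Toy data source used to exercise the interface."""

    def __init__(self, files: dict[str, bytes]) -> None:
        self._files = dict(files)

    def list_items(self) -> list[Item]:
        return [Item(item_id=name, name=name) for name in sorted(self._files)]

    def fetch_item(self, item_id: str) -> bytes:
        return self._files[item_id]


def ingest_all(connector: BaseConnector) -> dict[str, int]:
    """Works with any connector: returns the byte size of every listed item."""
    return {item.name: len(connector.fetch_item(item.item_id)) for item in connector.list_items()}


sizes = ingest_all(InMemoryConnector({"b.txt": b"hello", "a.txt": b"hi"}))
```

Code written against BaseConnector, such as ingest_all, does not need to change when a new data source is added.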
Service Interaction Workflow
The core services in OpenRAG operate in a coordinated pipeline:
- Ingestion: The UI or SDK calls TaskService.create_upload_task, spawning a background coroutine that processes files through DocumentFileProcessor before handing results to DocumentService.
- Storage: DocumentService.add_documents persists raw text and embeddings to the vector store.
- Retrieval: ChatService.ask invokes SearchService.hybrid_search, optionally pre-filtered by KnowledgeFilterService.apply_filters.
- Generation: The retrieved context is passed to the LLM (configured via ModelsService.get_model_config), with the response streamed or returned synchronously.
- Persistence: The conversation turn is saved via ConversationPersistenceService.save_conversation.
- Flow Execution: For complex pipelines, FlowsService loads definitions, LangflowMCPService.invoke_flow executes remotely, and LangflowHistoryService.record_run logs the outcome.
- Security: Every request passes through AuthService.verify_token or APIKeyService.validate_key, with SessionOwnershipService.check_ownership enforcing resource isolation.
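The retrieval, generation, and persistence steps of this pipeline can be traced end to end with stubs. Everything in the sketch is a stand-in: the two-passage corpus, the word-overlap ranking used in place of real hybrid search, and the generate function used in place of an LLM call. It shows only the order in which the stages hand data to one another.

```python
import asyncio

# Hypothetical corpus standing in for the vector store.
CORPUS = [
    {"title": "Ingestion", "content": "TaskService runs background ingestion jobs"},
    {"title": "Search", "content": "SearchService runs hybrid search over the index"},
]


async def hybrid_search(query: str, top_k: int = 1) -> list[dict]:
    """Stub retrieval: rank passages by the number of words shared with the query."""
    words = set(query.lower().split())
    ranked = sorted(
        CORPUS,
        key=lambda doc: -len(words & set(doc["content"].lower().split())),
    )
    return ranked[:top_k]


async def generate(question: str, context: list[dict]) -> str:
    """Stub LLM call: reports which passages it was given."""
    titles = ", ".join(doc["title"] for doc in context)
    return f"Answer to {question!r} using: {titles}"


async def ask(history: list[dict], question: str) -> str:
    context = await hybrid_search(question)      # Retrieval
    answer = await generate(question, context)   # Generation
    history.append({"role": "user", "content": question})     # Persistence
    history.append({"role": "assistant", "content": answer})
    return answer
```

Calling asyncio.run(ask([], "how does hybrid search work")) selects the "Search" passage, because it shares the words "hybrid" and "search" with the question.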
Practical Implementation Examples
Creating an Upload Task
import asyncio

from src.services.document_service import DocumentService
from src.services.task_service import TaskService


async def main() -> None:
    doc_svc = DocumentService()
    task_svc = TaskService(document_service=doc_svc)

    # Create a background task that ingests two local PDFs for user "alice"
    task_id = await task_svc.create_upload_task(
        user_id="alice",
        file_paths=["/tmp/report1.pdf", "/tmp/report2.pdf"],
    )
    print(f"Upload task started – ID: {task_id}")


# await is only valid inside a coroutine, so run the example through asyncio
asyncio.run(main())
Executing Hybrid Vector Search
import asyncio

from src.services.models_service import ModelsService
from src.services.search_service import SearchService


async def main() -> None:
    search_svc = SearchService()
    models_svc = ModelsService()

    # Perform a hybrid search (vector + BM25) against the default index,
    # restricted to documents whose metadata.source is "docs"
    results = await search_svc.hybrid_search(
        query="What are the security implications of using OpenAI models?",
        top_k=5,
        filters={"metadata.source": "docs"},
        model=models_svc.get_model_config(name="openai-gpt-4o-mini"),
    )
    # Print the title and a 120-character preview of each hit
    for hit in results:
        print(f"• {hit['metadata']['title']}: {hit['content'][:120]}...")


asyncio.run(main())
Initiating a Chat Session
import asyncio

from src.services.chat_service import ChatService
from src.services.conversation_persistence_service import ConversationPersistenceService
from src.services.models_service import ModelsService


async def main() -> None:
    conv_svc = ConversationPersistenceService()
    model_svc = ModelsService()
    chat_svc = ChatService(conv_svc, model_svc)

    # Start a new chat session for user "bob"
    session_id = await chat_svc.start_chat(user_id="bob")

    # Ask a question within that session and print the reply
    answer = await chat_svc.ask(
        session_id=session_id,
        question="Summarize the latest Langflow release notes.",
    )
    print("Assistant:", answer)


asyncio.run(main())
Cancelling a Background Task
# Assumes task_svc and task_id from the upload example, used in the same async context
cancel_success = await task_svc.cancel_task(user_id="alice", task_id=task_id)
print("Cancellation succeeded" if cancel_success else "Task already finished")
Summary
- TaskService (src/services/task_service.py) orchestrates asynchronous document ingestion with progress tracking and cancellation support.
- DocumentService (src/services/document_service.py) handles persistence of raw documents and embeddings.
- SearchService (src/services/search_service.py) executes vector and hybrid searches against configured databases.
- ChatService (src/services/chat_service.py) provides the primary conversational interface, coordinating retrieval and LLM invocation.
- ConversationPersistenceService (src/services/conversation_persistence_service.py) maintains chat history for session resumption.
- FlowsService and LangflowMCPService manage Langflow pipeline definitions and remote execution via the Model Context Protocol (MCP).
- AuthService and APIKeyService enforce JWT and API-key authentication across all endpoints.
- ConnectorService (src/connectors/service.py) abstracts external data sources like SharePoint and OneDrive for enterprise ingestion.
Frequently Asked Questions
What is the primary responsibility of TaskService in OpenRAG?
TaskService manages the complete lifecycle of background ingestion jobs, including creation via create_upload_task, progress monitoring through get_task_status, graceful cancellation with cancel_task, and automated cleanup of stale records via cleanup_old_tasks. It runs as an async service in src/services/task_service.py and coordinates with DocumentService to persist processed files.
How does SearchService differ from DocumentService?
SearchService (src/services/search_service.py) executes read-only queries against vector stores using vector_search or hybrid_search, while DocumentService (src/services/document_service.py) handles write operations like add_documents and delete_documents that modify the underlying storage. SearchService focuses on retrieval algorithms; DocumentService manages data persistence and embedding synchronization.
What role does LangflowMCPService play in flow execution?
LangflowMCPService (src/services/langflow_mcp_service.py) acts as a bridge to Langflow's Model Context Protocol (MCP) integration, enabling OpenRAG to invoke flows running on remote Langflow instances. Its invoke_flow method triggers execution, while list_remote_flows discovers available pipelines, allowing OpenRAG to orchestrate complex RAG workflows beyond local processing capabilities.
How does OpenRAG handle authentication across services?
Every request passes through either AuthService (src/services/auth_service.py), which validates JWTs via verify_token, or APIKeyService (src/services/api_key_service.py), which manages API keys through validate_key. SessionOwnershipService (src/services/session_ownership_service.py) then enforces resource isolation by checking user-specific ownership of tasks and conversations via check_ownership, ensuring multi-tenant security.