# Core Services in OpenRAG: Architecture, Functions, and Implementation Guide

> Explore the core services of OpenRAG, covering ingestion, vector retrieval, conversational AI, Langflow orchestration, and access control, and learn how their architecture and implementation support robust RAG applications.

- Repository: [Langflow/openrag](https://github.com/langflow-ai/openrag)
- Tags: architecture
- Published: 2026-03-13

---

**OpenRAG organizes its backend into 16 specialized service classes (15 in `src/services/` plus the connector base class in `src/connectors/`) that handle background ingestion, vector retrieval, conversational AI, Langflow orchestration, and access control, exposing well-defined APIs for the TUI, REST endpoints, and SDK clients.**

The `langflow-ai/openrag` repository implements a modular service-layer architecture in which domain logic is isolated into discrete, testable components. These **core services in OpenRAG** manage the complete lifecycle of retrieval-augmented generation (RAG) workflows, from document upload to chat response, while keeping data ingestion, search, and orchestration concerns strictly separated.

## Task and Document Ingestion Pipeline

OpenRAG’s ingestion layer handles asynchronous file processing, embedding generation, and persistence through three coordinated services.

### TaskService

Located in [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py), **TaskService** orchestrates background upload and ingestion jobs. It tracks progress, handles cancellations, and periodically cleans up stale tasks via `cleanup_old_tasks`. Key methods include `create_upload_task` for initiating uploads, `background_custom_processor` for file-specific logic, `get_task_status` for polling, and `cancel_task` for aborting long-running operations. The service also exposes `shutdown` for graceful termination of worker coroutines.
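
The create/poll/cancel/cleanup lifecycle can be illustrated with a minimal asyncio-based tracker. This is a hypothetical sketch, not OpenRAG's implementation: the `TaskTracker` and `TaskRecord` classes, their fields, and the status strings are assumptions chosen to mirror the method names listed above, and `asyncio.sleep` stands in for real file processing.

```python
from __future__ import annotations

import asyncio
import time
import uuid
from dataclasses import dataclass, field


@dataclass
class TaskRecord:
    """Hypothetical per-task state; OpenRAG's real fields may differ."""
    total: int
    processed: int = 0
    status: str = "pending"
    created_at: float = field(default_factory=time.monotonic)
    handle: asyncio.Task | None = None


class TaskTracker:
    """Minimal background-task tracker (illustrative, not OpenRAG's TaskService)."""

    def __init__(self) -> None:
        self.tasks: dict[str, TaskRecord] = {}

    def create_upload_task(self, file_paths: list[str]) -> str:
        """Register a task and schedule it; must be called while an event loop is running."""
        task_id = str(uuid.uuid4())
        record = TaskRecord(total=len(file_paths))
        self.tasks[task_id] = record
        record.handle = asyncio.create_task(self._run(record, file_paths))
        return task_id

    async def _run(self, record: TaskRecord, file_paths: list[str]) -> None:
        record.status = "running"
        for _path in file_paths:
            await asyncio.sleep(0.01)  # stand-in for parsing and embedding one file
            record.processed += 1
        record.status = "completed"

    def get_task_status(self, task_id: str) -> dict:
        r = self.tasks[task_id]
        return {"status": r.status, "processed": r.processed, "total": r.total}

    async def cancel_task(self, task_id: str) -> bool:
        r = self.tasks.get(task_id)
        if r is None or r.handle is None or r.handle.done():
            return False  # unknown or already finished
        r.handle.cancel()
        try:
            await r.handle  # wait until the worker has actually stopped
        except asyncio.CancelledError:
            pass
        r.status = "cancelled"
        return True

    def cleanup_old_tasks(self, max_age_s: float) -> int:
        """Drop finished or cancelled tasks older than max_age_s; return how many were removed."""
        now = time.monotonic()
        stale = [tid for tid, r in self.tasks.items()
                 if r.status in {"completed", "cancelled"} and now - r.created_at > max_age_s]
        for tid in stale:
            del self.tasks[tid]
        return len(stale)
```

Keeping the `asyncio.Task` handle on each record serves two purposes: it prevents the background task from being garbage-collected, and it gives `cancel_task` something to cancel and await.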

### DocumentService

The **DocumentService** ([`src/services/document_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/document_service.py)) persists raw documents and their vector embeddings after processing. It provides `add_documents` for bulk ingestion, `delete_documents` for pruning obsolete data, and `get_document`/`list_documents` for retrieval. This service interacts directly with the embedding pipeline to ensure vectors remain synchronized with source text.
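
One way to picture "keeping vectors synchronized with source text" is to store each document's text and embedding under the same ID, so every write or delete touches both at once. The sketch below is a hypothetical in-memory stand-in: `toy_embed` is a placeholder for a real embedding model (its vectors carry no semantic meaning), and OpenRAG persists to a vector store rather than a dict.

```python
from __future__ import annotations

import hashlib
import math


def toy_embed(text: str, dim: int = 8) -> list[float]:
    """Placeholder embedding: deterministic hash-derived unit vector (not semantic)."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    vec = [b / 255.0 for b in digest[:dim]]
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]


class InMemoryDocumentStore:
    """Keeps text and its embedding under one ID so they cannot drift apart."""

    def __init__(self) -> None:
        self._docs: dict[str, dict] = {}

    def add_documents(self, docs: dict[str, str]) -> int:
        for doc_id, text in docs.items():
            # Re-embedding on every write keeps the vector in sync with the text.
            self._docs[doc_id] = {"text": text, "embedding": toy_embed(text)}
        return len(docs)

    def delete_documents(self, doc_ids: list[str]) -> int:
        removed = 0
        for doc_id in doc_ids:
            if self._docs.pop(doc_id, None) is not None:
                removed += 1
        return removed

    def get_document(self, doc_id: str) -> dict | None:
        return self._docs.get(doc_id)

    def list_documents(self) -> list[str]:
        return sorted(self._docs)
```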

### LangflowFileService

Specialized for Langflow integration, **LangflowFileService** ([`src/services/langflow_file_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/langflow_file_service.py)) manages the upload, parsing, and ingestion of Langflow `.json` flow definitions via its primary method `process_langflow_file`.
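
Before a flow definition is ingested, it is worth sanity-checking its structure. The sketch below assumes the common Langflow export shape (a top-level `name` plus a `data` object holding `nodes` and `edges`, where each edge has `source` and `target` node IDs); verify that shape against your Langflow version. `parse_langflow_flow` is a hypothetical helper, not `process_langflow_file` itself.

```python
from __future__ import annotations

import json


def parse_langflow_flow(raw: str) -> dict:
    """Parse and sanity-check a Langflow flow export (assumed shape, see lead-in)."""
    flow = json.loads(raw)
    data = flow.get("data")
    if not isinstance(data, dict):
        raise ValueError("flow JSON has no 'data' object")
    nodes = data.get("nodes", [])
    edges = data.get("edges", [])
    node_ids = {node["id"] for node in nodes}
    # Every edge should connect two nodes that exist in this flow.
    dangling = [e for e in edges
                if e.get("source") not in node_ids or e.get("target") not in node_ids]
    if dangling:
        raise ValueError(f"{len(dangling)} edge(s) reference unknown nodes")
    return {"name": flow.get("name", "untitled"),
            "node_count": len(nodes), "edge_count": len(edges)}
```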

## Vector Search and Knowledge Retrieval

The retrieval layer combines vector database queries with pre-filtering capabilities to return relevant context for LLM prompts.

### SearchService

**SearchService** ([`src/services/search_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/search_service.py)) executes vector-based searches against OpenSearch or other configured vector databases. It exposes `search` for generic queries, `vector_search` for pure semantic similarity, and `hybrid_search` for combining dense vectors with BM25 sparse retrieval. Results are formatted for direct consumption by downstream chat services.
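
How the dense and BM25 result lists are merged is not specified above. One widely used approach is reciprocal rank fusion (RRF), sketched below purely for illustration; OpenSearch's native hybrid query instead relies on a search pipeline that normalizes and combines scores, and OpenRAG may use that or another method. The document IDs and the `k=60` constant are assumptions.

```python
from __future__ import annotations


def reciprocal_rank_fusion(ranked_lists: list[list[str]], k: int = 60,
                           top_k: int = 5) -> list[tuple[str, float]]:
    """Merge ranked lists of doc IDs; each appearance at rank r adds 1 / (k + r)."""
    scores: dict[str, float] = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties broken by doc ID for determinism.
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))[:top_k]


dense_hits = ["d3", "d1", "d7"]  # IDs ranked by vector similarity
bm25_hits = ["d1", "d9", "d3"]   # IDs ranked by BM25 keyword score
print(reciprocal_rank_fusion([dense_hits, bm25_hits], top_k=3))
```

Here `d1` comes out on top because it sits near the top of both lists, ahead of `d3`, which the dense retriever alone preferred. RRF only needs ranks, so it sidesteps the fact that cosine similarities and BM25 scores live on different scales.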

### KnowledgeFilterService

Before executing retrieval, **KnowledgeFilterService** ([`src/services/knowledge_filter_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/knowledge_filter_service.py)) applies user-defined constraints such as date ranges, tags, or source filters via `apply_filters`, ensuring only authorized or relevant content enters the context window.
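
As a hedged sketch of what `apply_filters` could produce, the constraints above map naturally onto an OpenSearch `bool` query whose `filter` clauses restrict candidates without affecting relevance scores. The field names (`created_at`, `tags`, `source`) and the helper `build_filter_clause` are hypothetical.

```python
from __future__ import annotations


def build_filter_clause(
    date_from: str | None = None,
    date_to: str | None = None,
    tags: list[str] | None = None,
    source: str | None = None,
) -> dict:
    """Translate user constraints into an OpenSearch bool/filter clause (field names assumed)."""
    filters: list[dict] = []
    if date_from or date_to:
        date_range: dict[str, str] = {}
        if date_from:
            date_range["gte"] = date_from
        if date_to:
            date_range["lte"] = date_to
        filters.append({"range": {"created_at": date_range}})
    if tags:
        # "terms" matches documents carrying at least one of the listed tags
        filters.append({"terms": {"tags": tags}})
    if source:
        filters.append({"term": {"source": source}})
    # Filter context restricts candidates without changing relevance scores.
    return {"bool": {"filter": filters}}
```

The returned clause can sit alongside a scoring clause (for example a `match` query) in the same `bool` query, so relevance ranking applies only to documents that already pass the filters.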

## Chat Orchestration and Persistence

Conversational AI workflows rely on two coordinated services to manage dialogue state and generate responses.

### ChatService

**ChatService** ([`src/services/chat_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/chat_service.py)) provides the high-level “ask-a-question” entry point. Its `ask` method coordinates retrieval via SearchService, invokes the LLM (selected through ModelsService), and updates conversation history. `stream_answer` returns the response incrementally for streaming clients, and `reset_chat` clears the session context.
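
The coordination inside `ask` can be sketched with injected callables: retrieve passages, build a grounded prompt, call the model, and append both turns to history. Everything here (`MiniChat`, the prompt template, the `retrieve` and `generate` callables) is hypothetical and stands in for SearchService and the model chosen through ModelsService; the sketch is synchronous for brevity, whereas the service itself is async.

```python
from __future__ import annotations

from typing import Callable


class MiniChat:
    """Illustrative ask() pipeline; not OpenRAG's ChatService."""

    def __init__(self, retrieve: Callable[[str], list[str]],
                 generate: Callable[[str], str]) -> None:
        self.retrieve = retrieve  # stands in for SearchService
        self.generate = generate  # stands in for the configured LLM
        self.history: list[dict[str, str]] = []

    def ask(self, question: str) -> str:
        passages = self.retrieve(question)
        # Number the passages so the model (and the user) can cite them.
        context = "\n".join(f"[{i}] {p}" for i, p in enumerate(passages, start=1))
        prompt = (
            "Answer using only the context below.\n"
            f"Context:\n{context}\n\nQuestion: {question}"
        )
        answer = self.generate(prompt)
        # Record both turns so later questions can build on the dialogue.
        self.history += [{"role": "user", "content": question},
                         {"role": "assistant", "content": answer}]
        return answer

    def reset_chat(self) -> None:
        self.history.clear()
```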

### ConversationPersistenceService

**ConversationPersistenceService** ([`src/services/conversation_persistence_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/conversation_persistence_service.py)) stores and restores chat history (messages, metadata) across sessions. It offers `save_conversation`, `load_conversation`, and `list_conversations` to enable long-running dialogue resumption and audit trails.
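
A minimal sketch of the save/load/list contract, assuming one JSON file per conversation in a local directory and conversation IDs that are safe filenames. OpenRAG's actual storage backend and record schema are not described here, so `JsonConversationStore` and its layout are hypothetical.

```python
from __future__ import annotations

import json
from pathlib import Path


class JsonConversationStore:
    """Stores each conversation as <dir>/<conversation_id>.json (hypothetical layout)."""

    def __init__(self, directory: str | Path) -> None:
        self.dir = Path(directory)
        self.dir.mkdir(parents=True, exist_ok=True)

    def save_conversation(self, conversation_id: str, messages: list[dict],
                          metadata: dict | None = None) -> None:
        payload = {"id": conversation_id, "messages": messages, "metadata": metadata or {}}
        # Write to a temp file, then rename, so the final file is never half-written.
        tmp = self.dir / f"{conversation_id}.json.tmp"
        tmp.write_text(json.dumps(payload), encoding="utf-8")
        tmp.replace(self.dir / f"{conversation_id}.json")

    def load_conversation(self, conversation_id: str) -> dict | None:
        path = self.dir / f"{conversation_id}.json"
        if not path.exists():
            return None
        return json.loads(path.read_text(encoding="utf-8"))

    def list_conversations(self) -> list[str]:
        return sorted(p.stem for p in self.dir.glob("*.json"))
```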

## Langflow Flow Management

For complex RAG pipelines defined as visual flows, OpenRAG provides dedicated services for CRUD operations, remote execution, and audit logging.

### FlowsService

**FlowsService** ([`src/services/flows_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/flows_service.py)) manages Langflow flow definitions describing retrieval-augmented generation pipelines. It implements `list_flows`, `get_flow`, `save_flow`, and `delete_flow` for full lifecycle management of flow configurations.

### LangflowMCPService

**LangflowMCPService** ([`src/services/langflow_mcp_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/langflow_mcp_service.py)) connects the OpenRAG backend to Langflow through the Model Context Protocol (MCP) for remote flow execution. Key methods include `invoke_flow` for triggering remote runs and `list_remote_flows` for discovery.
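
MCP is built on JSON-RPC 2.0, and Langflow can expose flows as MCP tools. Assuming `list_remote_flows` and `invoke_flow` map onto the standard MCP `tools/list` and `tools/call` methods (an assumption; the service's internals are not shown here), the request payloads look roughly like the sketch below. The transport and the `initialize` handshake are omitted, and the flow name `rag_flow` and argument `input_value` used in the checks are hypothetical.

```python
from __future__ import annotations

import itertools
import json

_ids = itertools.count(1)  # JSON-RPC request IDs must be unique per session


def jsonrpc_request(method: str, params: dict | None = None) -> str:
    """Serialize a JSON-RPC 2.0 request as used by MCP."""
    message: dict = {"jsonrpc": "2.0", "id": next(_ids), "method": method}
    if params is not None:
        message["params"] = params
    return json.dumps(message)


def list_tools_request() -> str:
    # Discovery: the server replies with its tools (here, assumed to be Langflow flows).
    return jsonrpc_request("tools/list")


def call_tool_request(flow_name: str, arguments: dict) -> str:
    # Invocation: "name" selects the tool, "arguments" carries the flow inputs.
    return jsonrpc_request("tools/call", {"name": flow_name, "arguments": arguments})
```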

### LangflowHistoryService

To support debugging and compliance, **LangflowHistoryService** ([`src/services/langflow_history_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/langflow_history_service.py)) persists execution history of Langflow runs via `record_run` and `list_runs`.
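
A hedged sketch of the `record_run`/`list_runs` pair: each run becomes an immutable record with a timestamp and outcome, and listing returns the newest runs first, optionally filtered by flow. The `RunRecord` fields and the `RunHistory` class are assumptions, not OpenRAG's schema.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field


@dataclass(frozen=True)
class RunRecord:
    flow_id: str
    status: str  # e.g. "success" or "error"
    duration_ms: float
    started_at: float = field(default_factory=time.time)


class RunHistory:
    def __init__(self) -> None:
        self._runs: list[RunRecord] = []

    def record_run(self, record: RunRecord) -> None:
        # Records are frozen, so an audit entry cannot be edited after the fact.
        self._runs.append(record)

    def list_runs(self, flow_id: str | None = None, limit: int = 20) -> list[RunRecord]:
        runs = [r for r in self._runs if flow_id is None or r.flow_id == flow_id]
        # Newest first, which is what a debugging view usually wants.
        return sorted(runs, key=lambda r: r.started_at, reverse=True)[:limit]
```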

## Authentication and Access Control

Security services enforce identity verification and resource ownership across all endpoints.

### AuthService

**AuthService** ([`src/services/auth_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/auth_service.py)) verifies JSON Web Tokens (JWTs) and extracts user identity via `verify_token` and `get_current_user`, enabling role-based access control across the API surface.
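
To make `verify_token` concrete, the sketch below checks an HS256-signed JWT with only the standard library: it recomputes the HMAC-SHA256 signature over `header.payload`, compares it in constant time, and rejects expired tokens. Which algorithm or JWT library OpenRAG actually uses is not stated here, and a maintained library is the safer choice in production. `make_token` exists only so the sketch can be exercised; a `get_current_user` step would then typically read the user from the `sub` claim.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import time


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))  # restore padding


def make_token(claims: dict, secret: bytes) -> str:
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url_encode(json.dumps(claims).encode())
    sig = hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url_encode(sig)}"


def verify_token(token: str, secret: bytes) -> dict:
    """Return the claims if the signature and expiry check out, else raise ValueError."""
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    header = json.loads(_b64url_decode(header_b64))
    if header.get("alg") != "HS256":  # never trust an unexpected algorithm
        raise ValueError("unsupported algorithm")
    expected = hmac.new(secret, f"{header_b64}.{payload_b64}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    if "exp" in claims and claims["exp"] < time.time():
        raise ValueError("token expired")
    return claims
```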

### APIKeyService

For external SDK and UI clients, **APIKeyService** ([`src/services/api_key_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/api_key_service.py)) manages API key lifecycle through `create_key`, `validate_key`, and `revoke_key`, providing an alternative authentication mechanism to JWTs.
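
A common pattern for the create/validate/revoke lifecycle stores only a hash of each key: the plaintext is returned once at creation, validation hashes the presented key and looks it up, and revocation deletes the hash. This sketch assumes that scheme (OpenRAG's actual storage is not described), and the `orag_` prefix and `ApiKeyStore` class are hypothetical.

```python
from __future__ import annotations

import hashlib
import secrets


class ApiKeyStore:
    def __init__(self) -> None:
        self._by_hash: dict[str, str] = {}  # sha256(key) -> user_id

    @staticmethod
    def _hash(key: str) -> str:
        return hashlib.sha256(key.encode("utf-8")).hexdigest()

    def create_key(self, user_id: str) -> str:
        key = "orag_" + secrets.token_urlsafe(32)  # cryptographically random
        self._by_hash[self._hash(key)] = user_id
        return key  # shown to the caller once; only the hash is kept here

    def validate_key(self, key: str) -> str | None:
        # Looking up the hash means the plaintext key is never stored or compared directly.
        return self._by_hash.get(self._hash(key))

    def revoke_key(self, key: str) -> bool:
        return self._by_hash.pop(self._hash(key), None) is not None
```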

### SessionOwnershipService

**SessionOwnershipService** ([`src/services/session_ownership_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/session_ownership_service.py)) maps tasks and chats to specific user sessions, enforcing isolation via `assign_owner` and `check_ownership` to prevent cross-tenant data leakage.
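
Conceptually, ownership enforcement reduces to a resource-to-owner map consulted on every access. The sketch below assumes resources are identified by a `(kind, id)` pair and that resources with no recorded owner are denied by default; both choices, and the `OwnershipRegistry` name, are illustrative.

```python
from __future__ import annotations


class OwnershipRegistry:
    def __init__(self) -> None:
        self._owners: dict[tuple[str, str], str] = {}

    def assign_owner(self, kind: str, resource_id: str, user_id: str) -> None:
        key = (kind, resource_id)
        existing = self._owners.get(key)
        if existing is not None and existing != user_id:
            raise PermissionError(f"{kind} {resource_id} is already owned by another user")
        self._owners[key] = user_id

    def check_ownership(self, kind: str, resource_id: str, user_id: str) -> bool:
        # Deny by default: a resource with no recorded owner is not accessible.
        return self._owners.get((kind, resource_id)) == user_id
```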

## System Infrastructure and Connectors

Supporting services handle model configuration, health monitoring, and external data source integration.

### ModelsService

**ModelsService** ([`src/services/models_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/models_service.py)) catalogs available LLM and embedding configurations. Use `list_models` to enumerate providers and `get_model_config` to retrieve specific model parameters for flow execution.

### MonitorService

For observability, **MonitorService** ([`src/services/monitor_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/monitor_service.py)) emits health-check metrics including CPU, memory, and queue length via `collect_metrics`, integrating with external monitoring platforms.
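
A hedged sketch of what `collect_metrics` might gather using only the standard library: process CPU time, peak resident memory (via the Unix-only `resource` module, reported as `None` elsewhere), and the length of an ingestion queue. The metric names and the use of an `asyncio.Queue` are assumptions, and exporting to an external platform is left out.

```python
from __future__ import annotations

import asyncio
import time

try:
    import resource  # Unix only
except ImportError:  # e.g. Windows
    resource = None


def collect_metrics(queue: asyncio.Queue) -> dict:
    """Snapshot of basic health metrics (names are illustrative)."""
    peak_rss = None
    if resource is not None:
        # ru_maxrss is reported in kilobytes on Linux but in bytes on macOS.
        peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return {
        "cpu_seconds": time.process_time(),  # CPU time used by this process
        "peak_rss": peak_rss,
        "queue_length": queue.qsize(),
        "timestamp": time.time(),
    }
```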

### ConnectorService

Serving as the base class for external data sources, **ConnectorService** ([`src/connectors/service.py`](https://github.com/langflow-ai/openrag/blob/main/src/connectors/service.py)) provides a unified interface for connectors such as SharePoint or OneDrive through `list_items` and `fetch_item`, enabling enterprise content ingestion.
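
The base-class pattern can be sketched with an abstract class that fixes the `list_items`/`fetch_item` contract and one concrete connector that satisfies it. `LocalFolderConnector` is a hypothetical stand-in (a real SharePoint or OneDrive connector would call a remote API instead of the filesystem), and OpenRAG's actual signatures may differ.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class Item:
    item_id: str
    name: str


class Connector(ABC):
    """Contract every source implements so ingestion can treat them uniformly."""

    @abstractmethod
    def list_items(self) -> list[Item]: ...

    @abstractmethod
    def fetch_item(self, item_id: str) -> bytes: ...


class LocalFolderConnector(Connector):
    def __init__(self, root: str | Path) -> None:
        self.root = Path(root)

    def list_items(self) -> list[Item]:
        return [Item(item_id=p.name, name=p.name)
                for p in sorted(self.root.iterdir()) if p.is_file()]

    def fetch_item(self, item_id: str) -> bytes:
        return (self.root / item_id).read_bytes()


def ingest_all(connector: Connector) -> dict[str, int]:
    # Source-agnostic: works for any Connector subclass; returns byte sizes per item.
    return {item.name: len(connector.fetch_item(item.item_id))
            for item in connector.list_items()}
```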

## Service Interaction Workflow

The **core services in OpenRAG** operate in a coordinated pipeline:

1. **Ingestion**: The UI or SDK calls `TaskService.create_upload_task`, spawning a background coroutine that processes files through `DocumentFileProcessor` before handing results to `DocumentService`.
2. **Storage**: `DocumentService.add_documents` persists raw text and embeddings to the vector store.
3. **Retrieval**: `ChatService.ask` invokes `SearchService.hybrid_search`, optionally pre-filtered by `KnowledgeFilterService.apply_filters`.
4. **Generation**: The retrieved context is passed to the LLM (configured via `ModelsService.get_model_config`), with the response streamed or returned synchronously.
5. **Persistence**: The conversation turn is saved via `ConversationPersistenceService.save_conversation`.
6. **Flow Execution**: For complex pipelines, `FlowsService` loads definitions, `LangflowMCPService.invoke_flow` executes remotely, and `LangflowHistoryService.record_run` logs the outcome.
7. **Security**: Every request passes through `AuthService.verify_token` or `APIKeyService.validate_key`, with `SessionOwnershipService.check_ownership` enforcing resource isolation.

## Practical Implementation Examples

### Creating an Upload Task

```python
import asyncio

from src.services.document_service import DocumentService
from src.services.task_service import TaskService


async def main() -> None:
    doc_svc = DocumentService()
    task_svc = TaskService(document_service=doc_svc)

    # Create a background task that ingests two local PDFs for user "alice"
    task_id = await task_svc.create_upload_task(
        user_id="alice",
        file_paths=["/tmp/report1.pdf", "/tmp/report2.pdf"],
    )
    print(f"Upload task started, ID: {task_id}")


asyncio.run(main())
```

### Executing Hybrid Vector Search

```python
import asyncio

from src.services.models_service import ModelsService
from src.services.search_service import SearchService


async def main() -> None:
    search_svc = SearchService()
    models_svc = ModelsService()

    # Hybrid search: dense-vector similarity combined with BM25 keyword scoring
    results = await search_svc.hybrid_search(
        query="What are the security implications of using OpenAI models?",
        top_k=5,
        filters={"metadata.source": "docs"},
        model=models_svc.get_model_config(name="openai-gpt-4o-mini"),
    )

    for hit in results:
        print(f"• {hit['metadata']['title']}: {hit['content'][:120]}...")


asyncio.run(main())
```

### Initiating a Chat Session

```python
import asyncio

from src.services.chat_service import ChatService
from src.services.conversation_persistence_service import ConversationPersistenceService
from src.services.models_service import ModelsService


async def main() -> None:
    conv_svc = ConversationPersistenceService()
    model_svc = ModelsService()
    chat_svc = ChatService(conv_svc, model_svc)

    # Start a new chat session for user "bob", then ask a question within it
    session_id = await chat_svc.start_chat(user_id="bob")
    answer = await chat_svc.ask(
        session_id=session_id,
        question="Summarize the latest Langflow release notes.",
    )
    print("Assistant:", answer)


asyncio.run(main())
```

### Cancelling a Background Task

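```python
from src.services.task_service import TaskService


async def cancel_upload(task_svc: TaskService, task_id: str) -> None:
    # task_id comes from the upload example and belongs to the same user ("alice");
    # call this from that example's main(): await cancel_upload(task_svc, task_id)
    cancel_success = await task_svc.cancel_task(user_id="alice", task_id=task_id)
    print("Cancellation succeeded" if cancel_success else "Task already finished")
```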

## Summary

- **TaskService** ([`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py)) orchestrates asynchronous document ingestion with progress tracking and cancellation support.
- **DocumentService** ([`src/services/document_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/document_service.py)) handles persistence of raw documents and embeddings.
- **SearchService** ([`src/services/search_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/search_service.py)) executes vector and hybrid searches against configured databases.
- **ChatService** ([`src/services/chat_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/chat_service.py)) provides the primary conversational interface, coordinating retrieval and LLM invocation.
- **ConversationPersistenceService** ([`src/services/conversation_persistence_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/conversation_persistence_service.py)) maintains chat history for session resumption.
- **FlowsService** and **LangflowMCPService** manage Langflow pipeline definitions and remote execution via the Model Context Protocol (MCP).
- **AuthService** and **APIKeyService** enforce JWT and API-key authentication across all endpoints.
- **ConnectorService** ([`src/connectors/service.py`](https://github.com/langflow-ai/openrag/blob/main/src/connectors/service.py)) abstracts external data sources like SharePoint and OneDrive for enterprise ingestion.

## Frequently Asked Questions

### What is the primary responsibility of TaskService in OpenRAG?

**TaskService** manages the complete lifecycle of background ingestion jobs, including creation via `create_upload_task`, progress monitoring through `get_task_status`, graceful cancellation with `cancel_task`, and automated cleanup of stale records via `cleanup_old_tasks`. It runs as an async service in [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py) and coordinates with DocumentService to persist processed files.

### How does SearchService differ from DocumentService?

**SearchService** ([`src/services/search_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/search_service.py)) executes read-only queries against vector stores using `vector_search` or `hybrid_search`, while **DocumentService** ([`src/services/document_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/document_service.py)) handles write operations like `add_documents` and `delete_documents` that modify the underlying storage. SearchService focuses on retrieval algorithms; DocumentService manages data persistence and embedding synchronization.

### What role does LangflowMCPService play in flow execution?

**LangflowMCPService** ([`src/services/langflow_mcp_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/langflow_mcp_service.py)) bridges OpenRAG to Langflow through the Model Context Protocol (MCP), enabling OpenRAG to invoke flows running on remote Langflow instances. Its `invoke_flow` method triggers execution, while `list_remote_flows` discovers available pipelines, allowing OpenRAG to orchestrate complex RAG workflows beyond local processing capabilities.

### How does OpenRAG handle authentication across services?

Every request passes through either **AuthService** ([`src/services/auth_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/auth_service.py)), which validates JWTs via `verify_token`, or **APIKeyService** ([`src/services/api_key_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/api_key_service.py)), which validates API keys via `validate_key`. **SessionOwnershipService** ([`src/services/session_ownership_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/session_ownership_service.py)) then enforces resource isolation by checking user-specific ownership of tasks and conversations via `check_ownership`, ensuring multi-tenant security.