# How Langflow Orchestrates RAG Pipelines in OpenRAG: Architecture and Implementation

> Discover how Langflow orchestrates RAG pipelines in OpenRAG. Learn about its architecture for document ingestion, MCP server sync, and chat-time retrieval with consistent authentication.

- Repository: [langflow-ai/openrag](https://github.com/langflow-ai/openrag)
- Tags: architecture
- Published: 2026-03-13

---

**Langflow serves as the plug-in orchestration layer for OpenRAG, managing document ingestion through TaskService and LangflowFileProcessor, synchronizing MCP servers with LangflowMCPService, and handling chat-time retrieval via ChatService, all while propagating consistent authentication and embedding model headers across every request.**

OpenRAG (langflow-ai/openrag) uses Langflow as its core orchestration engine to transform raw documents into searchable embeddings and deliver chat-style retrieval-augmented responses. The integration relies on a tightly coupled set of services that handle everything from file uploads to conversation history, so that authentication credentials and model configuration propagate consistently through every stage of the pipeline.

## Core Orchestration Components

The orchestration layer consists of seven specialized components that manage distinct phases of the RAG lifecycle.

### TaskService and Concurrency Control

Located in [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py), the **TaskService** creates and schedules upload tasks that feed files to Langflow’s ingestion endpoints. It implements a global semaphore to throttle concurrent processing, preventing overload on both Langflow and Docling services during high-volume ingestion periods.
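
The throttling idea can be illustrated with a minimal sketch built on `asyncio.Semaphore`. This is not the code in `task_service.py`: the function name `run_throttled`, the `worker` callback, and the default limit are assumptions made for illustration, and the semaphore here is scoped to one call rather than shared globally.

```python
import asyncio


async def run_throttled(items, worker, max_concurrent=3):
    """Run worker(item) for every item, at most max_concurrent at a time.

    Hypothetical helper: the real TaskService API may look different.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        # Waits here while max_concurrent workers are already running.
        async with semaphore:
            return await worker(item)

    # gather returns results in the same order as the input items.
    return await asyncio.gather(*(guarded(item) for item in items))
```

A service that needs one limit across all callers would create the semaphore once and reuse it, which is presumably what "global" means in the description above.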

### LangflowFileProcessor for Document Ingestion

The `LangflowFileProcessor` class in [`src/models/processors.py`](https://github.com/langflow-ai/openrag/blob/main/src/models/processors.py) serves as the concrete implementation of the `TaskProcessor` interface. This processor constructs the necessary Langflow headers—including JWT tokens, owner metadata, selected embedding models, and ACLs—and uploads files via the Langflow File API endpoint (`/api/v2/files`). After successful upload, it optionally triggers an ingest flow to begin vectorization.

### LangflowMCPService for MCP Server Management

In [`src/services/langflow_mcp_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/langflow_mcp_service.py), the **LangflowMCPService** manages the configuration of Model Context Protocol (MCP) servers. It reads the current MCP server list from Langflow and patches each server’s CLI arguments to inject global-variable headers carrying the JWT credentials and the selected embedding model. Downstream calls for embeddings and LLM inference then receive the same configuration that Langflow uses internally.
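
To make the argument-patching step concrete, here is a rough sketch of how a server's argument list could be rewritten so that each header appears exactly once. The `--headers <name> <value>` layout, the function name `patch_header_args`, and the sample values are assumptions for illustration; the actual argument format used by OpenRAG and Langflow may differ.

```python
def patch_header_args(args, headers, flag="--headers"):
    """Return a new argument list where each header appears once as: flag, name, value.

    Assumed layout (hypothetical): [..., "--headers", "<name>", "<value>", ...]
    """
    patched = []
    i = 0
    while i < len(args):
        # Skip an existing (flag, name, value) triple if we are about to set that name again.
        if args[i] == flag and i + 2 < len(args) and args[i + 1] in headers:
            i += 3
            continue
        patched.append(args[i])
        i += 1
    # Append the fresh values; the input list is left untouched.
    for name, value in headers.items():
        patched.extend([flag, name, value])
    return patched
```

Because stale triples are dropped before new ones are appended, running the patch repeatedly with the same headers leaves the argument list unchanged, which matters when the sync runs at every startup.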

### ChatService for Retrieval-Augmented Generation

The chat bridge resides in [`src/services/chat_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/chat_service.py), specifically within the `ChatService.langflow_chat` and `ChatService.langflow_nudges_chat` methods. These functions build request payloads that include headers for `X-LANGFLOW-GLOBAL-VAR-SELECTED_EMBEDDING_MODEL`, JWT authentication, and serialized OpenRAG filter JSON. The service forwards prompts to the configured Langflow chat flow ID and supports both streaming and single-payload response modes.

### LangflowHistoryService for Conversation Context

The `LangflowHistoryService` in [`src/services/langflow_history_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/langflow_history_service.py) retrieves conversation history from Langflow, enabling OpenRAG to surface past chats, function calls, and tool usage. This history merges with OpenRAG’s own metadata to present a unified conversational view across the platform.
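
The merge step might look roughly like the following sketch, which groups messages by session and attaches OpenRAG-side metadata to each conversation. The field names (`session_id`, `role`, `text`, `title`) and the function `merge_history` are hypothetical; the real record shapes in Langflow and OpenRAG are not shown in this article.

```python
def merge_history(langflow_messages, openrag_metadata):
    """Group messages by session and attach per-session metadata.

    Hypothetical shapes: each message is a dict with "session_id", "role", "text";
    openrag_metadata maps session_id -> dict of extra fields.
    """
    grouped = {}
    for msg in langflow_messages:
        grouped.setdefault(msg["session_id"], []).append(
            {"role": msg["role"], "text": msg["text"]}
        )
    # Metadata is spread first so it can never overwrite session_id or messages.
    return [
        {**openrag_metadata.get(session_id, {}), "session_id": session_id, "messages": messages}
        for session_id, messages in grouped.items()
    ]
```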

### Header Utilities and Credential Propagation

The `utils.langflow_headers` module in [`src/utils/langflow_headers.py`](https://github.com/langflow-ai/openrag/blob/main/src/utils/langflow_headers.py) provides the `add_provider_credentials_to_headers` helper, which injects provider credentials (e.g., OpenAI API keys) as Langflow global variables using the `X-LANGFLOW-GLOBAL-VAR-<KEY>` pattern. All Langflow-bound calls reuse this helper, guaranteeing consistent credential propagation throughout the system.
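
The `X-LANGFLOW-GLOBAL-VAR-<KEY>` pattern is simple enough to sketch in a few lines. The helper below is an illustration only: the name `to_global_var_headers` and its behavior of skipping empty values are assumptions, and it does not reproduce the signature of `add_provider_credentials_to_headers`.

```python
def to_global_var_headers(variables, headers=None):
    """Return a copy of headers with each variable added as X-LANGFLOW-GLOBAL-VAR-<KEY>.

    Hypothetical helper illustrating the naming pattern described above.
    """
    merged = dict(headers or {})
    for key, value in variables.items():
        if value is None or value == "":
            continue  # assumption: unset credentials are omitted, not sent as empty headers
        merged[f"X-LANGFLOW-GLOBAL-VAR-{key.upper()}"] = str(value)
    return merged
```

Routing every outgoing request through one such function is what keeps header names consistent between ingestion, MCP sync, and chat.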

### Health Checks and Startup Synchronization

Before accepting any ingestion or chat requests, OpenRAG ensures Langflow availability through `utils.langflow_utils.wait_for_langflow` in [`src/utils/langflow_utils.py`](https://github.com/langflow-ai/openrag/blob/main/src/utils/langflow_utils.py). This utility polls `LANGFLOW_URL/health` during startup, blocking further operations until the backend returns a 200 response.
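
A polling loop of this kind can be sketched with the standard library alone. The names `http_ok` and `wait_until_healthy`, the attempt count, and the delay are assumptions for illustration and do not mirror the actual `wait_for_langflow` signature.

```python
import time
import urllib.error
import urllib.request


def http_ok(url, timeout=2.0):
    """Return True when a GET to url answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeouts, and non-2xx responses all count as "not ready".
        return False


def wait_until_healthy(probe, attempts=30, delay=1.0, sleep=time.sleep):
    """Call probe() until it returns True; return the attempt number that succeeded."""
    for attempt in range(1, attempts + 1):
        if probe():
            return attempt
        if attempt < attempts:
            sleep(delay)
    raise TimeoutError(f"service not healthy after {attempts} attempts")
```

A caller would pass something like `lambda: http_ok(f"{LANGFLOW_URL}/health")` as the probe. Keeping the probe injectable makes the retry logic testable without a running Langflow instance.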

## End-to-End Orchestration Flow

The RAG pipeline follows a strict five-phase sequence that maintains header consistency from startup through chat completion.

1. **Startup Health Verification** – The TUI executes `wait_for_langflow` to verify that Langflow’s health endpoint responds with HTTP 200 before proceeding.

2. **Document Upload and Ingestion** – When users upload documents, `ChatService.upload_context_chat` or the UI layer calls `TaskService.create_langflow_upload_task` to instantiate a `LangflowFileProcessor`. The processor calls `LangflowFileService.upload_file` with JWT, owner, embedding model, and ACL headers, then optionally triggers the Langflow ingest flow via the configured flow ID.

3. **MCP Server Synchronization** – At startup and whenever the embedding model changes, `LangflowMCPService.update_mcp_servers_with_global_vars` patches every MCP server’s CLI arguments to include `SELECTED_EMBEDDING_MODEL` and JWT headers, ensuring downstream models receive consistent configuration.

4. **Chat Request Construction** – The `ChatService.langflow_chat` method assembles headers:
   ```python
   extra_headers = {
       "X-LANGFLOW-GLOBAL-VAR-JWT": jwt_token,
       "X-LANGFLOW-GLOBAL-VAR-SELECTED_EMBEDDING_MODEL": embedding_model,
       # + provider credentials from utils.langflow_headers
   }
   ```

   It serializes the current OpenRAG filter JSON into `X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER` and sends the request to the Langflow chat flow (`LANGFLOW_CHAT_FLOW_ID`).

5. **Response Processing** – Langflow returns the LLM answer along with optional source documents. The service decorates this payload with `response_id` and `sources` before forwarding to the frontend.

This architecture enables OpenRAG to switch between local OpenSearch and cloud LLM providers without modifying the core pipeline—only the global-variable headers require updates.
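
Phases 4 and 5 can be sketched as two small functions: one that assembles the global-variable headers, including the serialized filter, and one that wraps the answer for the frontend. The function names, the filter keys, and the `response` key in the returned payload are assumptions for illustration; only the header names, `response_id`, and `sources` come from the description above.

```python
import json


def build_chat_headers(jwt_token, embedding_model, query_filter=None):
    """Assemble the global-variable headers for one chat request (illustrative sketch)."""
    headers = {
        "X-LANGFLOW-GLOBAL-VAR-JWT": jwt_token,
        "X-LANGFLOW-GLOBAL-VAR-SELECTED_EMBEDDING_MODEL": embedding_model,
    }
    if query_filter:
        # Compact, key-sorted JSON keeps the header short and deterministic.
        headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps(
            query_filter, separators=(",", ":"), sort_keys=True
        )
    return headers


def decorate_response(answer, response_id, sources=None):
    """Wrap the LLM answer with response_id and sources before returning it."""
    return {"response": answer, "response_id": response_id, "sources": list(sources or [])}
```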

## Implementation Examples

The following patterns demonstrate how to interact with the Langflow orchestration layer programmatically.

### Creating a Langflow-Backed Upload Task

Use `TaskService` to schedule document processing through Langflow’s ingestion pipeline:

```python
from services.task_service import TaskService
from services.langflow_file_service import LangflowFileService
from services.session_ownership_service import SessionOwnershipService

async def ingest_file(user_id, file_path, jwt):
    # Langflow-specific services
    file_service = LangflowFileService()
    session_mgr = SessionOwnershipService()

    # DocumentService is not used for Langflow uploads
    task_svc = TaskService(document_service=None)

    # Schedule the upload and return the ID of the created task
    task_id = await task_svc.create_langflow_upload_task(
        user_id=user_id,
        file_paths=[file_path],
        langflow_file_service=file_service,
        session_manager=session_mgr,
        jwt_token=jwt,
        owner_name="Alice",
        owner_email="alice@example.com",
    )
    return task_id
```

### Sending Chat Requests with OpenRAG Filters

Invoke the chat flow through `ChatService`, which assembles the headers and filter context for the request:

```python
from services.chat_service import ChatService

async def ask_langflow(prompt, user_id, jwt):
    chat_svc = ChatService()
    response = await chat_svc.langflow_chat(
        prompt=prompt,
        user_id=user_id,
        jwt_token=jwt,
        stream=False,  # or True for async streaming
    )
    return response
```

### Synchronizing MCP Servers on Model Changes

Update MCP server configurations when switching embedding models:

```python
from services.langflow_mcp_service import LangflowMCPService
from config.settings import get_openrag_config

async def sync_mcp():
    cfg = get_openrag_config()
    global_vars = {
        "selected_embedding_model": cfg.knowledge.embedding_model,
        "jwt": cfg.auth.jwt_token,  # if you keep JWT centrally
    }
    mcp = LangflowMCPService()
    summary = await mcp.update_mcp_servers_with_global_vars(global_vars)
    print("MCP sync:", summary)
```

## Summary

- **TaskService** in [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py) manages concurrent upload tasks using a global semaphore to prevent system overload.
- **LangflowFileProcessor** handles file uploads and optional ingestion triggers via the `/api/v2/files` endpoint with proper JWT and ACL headers.
- **LangflowMCPService** ensures MCP servers inherit global variables for embedding models and authentication through CLI argument patching.
- **ChatService** constructs requests with `X-LANGFLOW-GLOBAL-VAR-*` headers to maintain consistent authentication and filter context during chat operations.
- **Header utilities** in [`src/utils/langflow_headers.py`](https://github.com/langflow-ai/openrag/blob/main/src/utils/langflow_headers.py) provide a single source of truth for credential propagation across all Langflow interactions.

## Frequently Asked Questions

### How does OpenRAG handle authentication when communicating with Langflow?

OpenRAG injects JWT tokens and provider credentials into every request using the `X-LANGFLOW-GLOBAL-VAR-JWT` header and the `add_provider_credentials_to_headers` utility from [`src/utils/langflow_headers.py`](https://github.com/langflow-ai/openrag/blob/main/src/utils/langflow_headers.py). This ensures that all downstream Langflow flows and MCP servers receive consistent authentication context without manual header construction in individual service calls.

### What mechanism prevents OpenRAG from overwhelming Langflow during bulk uploads?

The **TaskService** implements a global semaphore in [`src/services/task_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/task_service.py) that limits concurrent upload tasks. This concurrency control protects both Langflow and Docling from resource exhaustion when processing large batches of documents.

### Can OpenRAG switch between different embedding models without restarting?

Yes. When the embedding model configuration changes, `LangflowMCPService.update_mcp_servers_with_global_vars` in [`src/services/langflow_mcp_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/langflow_mcp_service.py) dynamically patches all MCP server CLI arguments to inject the new `SELECTED_EMBEDDING_MODEL` value. Chat requests also update the `X-LANGFLOW-GLOBAL-VAR-SELECTED_EMBEDDING_MODEL` header, allowing runtime model switching without service restarts.

### Where does OpenRAG store conversation history when using Langflow orchestration?

Langflow holds the conversation records. The `LangflowHistoryService` in [`src/services/langflow_history_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/langflow_history_service.py) retrieves them directly from Langflow and merges them with OpenRAG’s metadata to present a unified view of past interactions, function calls, and tool usage across the platform.