How Langflow Orchestrates RAG Pipelines in OpenRAG: Architecture and Implementation

Langflow serves as the plug-in orchestration layer for OpenRAG, managing document ingestion through TaskService and LangflowFileProcessor, synchronizing MCP servers with LangflowMCPService, and handling chat-time retrieval via ChatService, all while propagating consistent authentication and embedding model headers across every request.

OpenRAG (langflow-ai/openrag) uses Langflow as its core orchestration engine to turn raw documents into searchable embeddings and to deliver chat-style retrieval-augmented responses. The integration relies on a tightly coupled set of services that handle everything from file uploads to conversation history, so that authentication credentials and model configuration propagate consistently through every stage of the pipeline.

Core Orchestration Components

The orchestration layer consists of seven specialized components that manage distinct phases of the RAG lifecycle.

TaskService and Concurrency Control

Located in src/services/task_service.py, the TaskService creates and schedules upload tasks that feed files to Langflow’s ingestion endpoints. It implements a global semaphore to throttle concurrent processing, preventing overload on both Langflow and Docling services during high-volume ingestion periods.
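The throttling idea can be sketched with asyncio.Semaphore. This is a minimal illustration, not the TaskService code: process_with_limit, fake_upload, and the limit of 2 are hypothetical names and values chosen for the example.

```python
import asyncio


async def process_with_limit(paths, worker, max_concurrent=2):
    """Run worker(path) for every path, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(path):
        # Each task waits here until one of the slots is free.
        async with semaphore:
            return await worker(path)

    # gather returns results in the same order as the input paths.
    return await asyncio.gather(*(guarded(p) for p in paths))


async def demo():
    active = 0
    peak = 0

    async def fake_upload(path):
        # Hypothetical stand-in for an upload call; records peak concurrency.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return path.upper()

    paths = ["a.pdf", "b.pdf", "c.pdf", "d.pdf"]
    results = await process_with_limit(paths, fake_upload)
    return results, peak


results, peak = asyncio.run(demo())
```

A single shared semaphore caps total in-flight work regardless of how many upload tasks are queued, which is the property the paragraph above attributes to the global semaphore.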

LangflowFileProcessor for Document Ingestion

The LangflowFileProcessor class in src/models/processors.py serves as the concrete implementation of the TaskProcessor interface. This processor constructs the necessary Langflow headers—including JWT tokens, owner metadata, selected embedding models, and ACLs—and uploads files via the Langflow File API endpoint (/api/v2/files). After successful upload, it optionally triggers an ingest flow to begin vectorization.

LangflowMCPService for MCP Server Management

In src/services/langflow_mcp_service.py, the LangflowMCPService manages the configuration of Langflow’s Model Context Protocol (MCP) servers. It reads the current MCP server list from Langflow and patches each server’s CLI arguments to inject global-variable headers containing the JWT credentials and the selected embedding model. Downstream embedding and LLM calls made through those servers then see the same global variables that Langflow uses internally.
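As a rough sketch of what patching CLI arguments could look like, the function below updates or appends header flags in an argument list. The "--headers NAME VALUE" flag shape, the function name upsert_header_args, and the sample values are assumptions made for illustration; the actual argument format used by LangflowMCPService is not shown in this article.

```python
def upsert_header_args(args, headers):
    """Return a copy of args where each header is set via '--headers NAME VALUE'.

    Existing triples for the same header name are updated in place;
    headers that are not present yet are appended at the end.
    """
    patched = list(args)
    remaining = dict(headers)
    i = 0
    while i + 2 < len(patched):
        if patched[i] == "--headers" and patched[i + 1] in remaining:
            # Overwrite the stale value and mark this header as handled.
            patched[i + 2] = remaining.pop(patched[i + 1])
            i += 3
        else:
            i += 1
    for name, value in remaining.items():
        patched.extend(["--headers", name, value])
    return patched


# Hypothetical argument list and values, for illustration only.
old_args = ["mcp-proxy", "--headers", "X-LANGFLOW-GLOBAL-VAR-JWT", "old-token"]
new_args = upsert_header_args(
    old_args,
    {
        "X-LANGFLOW-GLOBAL-VAR-JWT": "new-token",
        "X-LANGFLOW-GLOBAL-VAR-SELECTED_EMBEDDING_MODEL": "example-embedding-model",
    },
)
```

Updating in place rather than blindly appending keeps the operation idempotent, so repeated synchronization runs do not accumulate duplicate flags.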

ChatService for Retrieval-Augmented Generation

The chat bridge resides in src/services/chat_service.py, specifically within the ChatService.langflow_chat and ChatService.langflow_nudges_chat methods. These functions build request payloads that include headers for X-LANGFLOW-GLOBAL-VAR-SELECTED_EMBEDDING_MODEL, JWT authentication, and serialized OpenRAG filter JSON. The service forwards prompts to the configured Langflow chat flow ID and supports both streaming and single-payload response modes.

LangflowHistoryService for Conversation Context

The LangflowHistoryService in src/services/langflow_history_service.py retrieves conversation history from Langflow, enabling OpenRAG to surface past chats, function calls, and tool usage. This history merges with OpenRAG’s own metadata to present a unified conversational view across the platform.

Header Utilities and Credential Propagation

The utils.langflow_headers module in src/utils/langflow_headers.py provides the add_provider_credentials_to_headers helper, which injects provider credentials (e.g., OpenAI API keys) as Langflow global variables using the X-LANGFLOW-GLOBAL-VAR-<KEY> pattern. All Langflow-bound calls reuse this helper, guaranteeing consistent credential propagation throughout the system.
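The X-LANGFLOW-GLOBAL-VAR-<KEY> pattern can be illustrated with a small helper. This is a sketch under assumptions, not the source of add_provider_credentials_to_headers: the name with_global_vars, the upper-casing of keys, and the skipping of unset values are choices made for the example.

```python
GLOBAL_VAR_PREFIX = "X-LANGFLOW-GLOBAL-VAR-"


def with_global_vars(headers, variables):
    """Return a copy of headers with each variable added as a global-variable header."""
    merged = dict(headers)
    for key, value in variables.items():
        if value:
            # Unset or empty credentials are skipped rather than sent as blanks.
            merged[GLOBAL_VAR_PREFIX + key.upper()] = str(value)
    return merged


# Placeholder credentials, for illustration only.
example_headers = with_global_vars(
    {"Accept": "application/json"},
    {"openai_api_key": "sk-test", "anthropic_api_key": None},
)
```

Routing every outbound call through one helper like this means a new provider only has to be added in one place.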

Health Checks and Startup Synchronization

Before accepting any ingestion or chat requests, OpenRAG ensures Langflow availability through utils.langflow_utils.wait_for_langflow in src/utils/langflow_utils.py. This utility polls LANGFLOW_URL/health during startup, blocking further operations until the backend returns a 200 response.
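A polling loop of this kind might look like the following sketch. It is not the wait_for_langflow implementation: the function names, retry count, and delay are hypothetical, and the probe and sleep parameters exist only so the loop can be exercised without a running server.

```python
import time
import urllib.error
import urllib.request


def http_status(url, timeout=2.0):
    """Return the HTTP status code for url, or None if the request fails."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as exc:
        # The server answered, but with an error status such as 503.
        return exc.code
    except OSError:
        # Connection refused, DNS failure, timeout, and similar errors.
        return None


def wait_until_healthy(url, attempts=30, delay=1.0, probe=http_status, sleep=time.sleep):
    """Poll url until probe reports 200; return False once attempts are exhausted."""
    for attempt in range(attempts):
        if probe(url) == 200:
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False
```

A caller would pass something like the health URL built from LANGFLOW_URL and refuse to start serving requests if the function returns False.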

End-to-End Orchestration Flow

The RAG pipeline follows a strict five-phase sequence that maintains header consistency from startup through chat completion.

  1. Startup Health Verification – The TUI executes wait_for_langflow to verify that Langflow’s health endpoint responds with HTTP 200 before proceeding.

  2. Document Upload and Ingestion – When users upload documents, ChatService.upload_context_chat or the UI layer calls TaskService.create_langflow_upload_task to instantiate a LangflowFileProcessor. The processor calls LangflowFileService.upload_file with JWT, owner, embedding model, and ACL headers, then optionally triggers the Langflow ingest flow via the configured flow ID.

  3. MCP Server Synchronization – At startup and whenever the embedding model changes, LangflowMCPService.update_mcp_servers_with_global_vars patches every MCP server’s CLI arguments to include SELECTED_EMBEDDING_MODEL and JWT headers, ensuring downstream models receive consistent configuration.

  4. Chat Request Construction – The ChatService.langflow_chat method assembles headers:

    extra_headers = {
        "X-LANGFLOW-GLOBAL-VAR-JWT": jwt_token,
        "X-LANGFLOW-GLOBAL-VAR-SELECTED_EMBEDDING_MODEL": embedding_model,
        # + provider credentials from utils.langflow_headers
    }

    It serializes the current OpenRAG filter JSON into X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER and sends the request to the Langflow chat flow (LANGFLOW_CHAT_FLOW_ID).

  5. Response Processing – Langflow returns the LLM answer along with optional source documents. The service decorates this payload with response_id and sources before forwarding to the frontend.

This architecture enables OpenRAG to switch between local OpenSearch and cloud LLM providers without modifying the core pipeline—only the global-variable headers require updates.
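The header set described in phase 4 can be assembled along these lines. The header names come from the text above; the function name build_chat_headers, the compact JSON encoding, and the shape of the example filter are assumptions made for illustration.

```python
import json


def build_chat_headers(jwt_token, embedding_model, query_filter=None):
    """Build the global-variable headers for a chat request."""
    headers = {
        "X-LANGFLOW-GLOBAL-VAR-JWT": jwt_token,
        "X-LANGFLOW-GLOBAL-VAR-SELECTED_EMBEDDING_MODEL": embedding_model,
    }
    if query_filter is not None:
        # Compact, key-sorted JSON keeps the header value short and stable.
        headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps(
            query_filter, separators=(",", ":"), sort_keys=True
        )
    return headers


# Hypothetical filter shape, for illustration only.
chat_headers = build_chat_headers(
    "example-jwt",
    "example-embedding-model",
    {"limit": 5, "data_sources": ["a.pdf"]},
)
```

Because the model and filter travel as headers, changing either one only changes the arguments to a function like this, not the flow being called.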

Implementation Examples

The following patterns demonstrate how to interact with the Langflow orchestration layer programmatically.

Creating a Langflow-Backed Upload Task

Use TaskService to schedule document processing through Langflow’s ingestion pipeline:

from services.task_service import TaskService
from services.langflow_file_service import LangflowFileService
from services.session_ownership_service import SessionOwnershipService

async def ingest_file(user_id, file_path, jwt):
    # Langflow-specific services
    file_service = LangflowFileService()
    session_mgr = SessionOwnershipService()

    # DocumentService is not used for Langflow uploads
    task_svc = TaskService(document_service=None)

    # Schedule the upload; the returned ID identifies the task
    task_id = await task_svc.create_langflow_upload_task(
        user_id=user_id,
        file_paths=[file_path],
        langflow_file_service=file_service,
        session_manager=session_mgr,
        jwt_token=jwt,
        owner_name="Example User",         # placeholder value
        owner_email="user@example.com",    # placeholder value
    )
    return task_id

Sending Chat Requests with OpenRAG Filters

Invoke the chat flow with properly constructed headers and filters:

from services.chat_service import ChatService

async def ask_langflow(prompt, user_id, jwt):
    chat_svc = ChatService()
    # langflow_chat assembles the global-variable headers and the
    # serialized filter itself, as described in phase 4 above
    response = await chat_svc.langflow_chat(
        prompt=prompt,
        user_id=user_id,
        jwt_token=jwt,
        stream=False,               # or True for async streaming
    )
    return response

Synchronizing MCP Servers on Model Changes

Update MCP server configurations when switching embedding models:

from services.langflow_mcp_service import LangflowMCPService
from config.settings import get_openrag_config

async def sync_mcp():
    cfg = get_openrag_config()
    global_vars = {
        "selected_embedding_model": cfg.knowledge.embedding_model,
        "jwt": cfg.auth.jwt_token,               # if you keep JWT centrally
    }
    mcp = LangflowMCPService()
    summary = await mcp.update_mcp_servers_with_global_vars(global_vars)
    print("MCP sync:", summary)

Summary

  • TaskService in src/services/task_service.py manages concurrent upload tasks using a global semaphore to prevent system overload.
  • LangflowFileProcessor handles file uploads and optional ingestion triggers via the /api/v2/files endpoint with proper JWT and ACL headers.
  • LangflowMCPService ensures MCP servers inherit global variables for embedding models and authentication through CLI argument patching.
  • ChatService constructs requests with X-LANGFLOW-GLOBAL-VAR-* headers to maintain consistent authentication and filter context during chat operations.
  • Header utilities in src/utils/langflow_headers.py provide a single source of truth for credential propagation across all Langflow interactions.

Frequently Asked Questions

How does OpenRAG handle authentication when communicating with Langflow?

OpenRAG injects JWT tokens and provider credentials into every request using the X-LANGFLOW-GLOBAL-VAR-JWT header and the add_provider_credentials_to_headers utility from src/utils/langflow_headers.py. As a result, all downstream Langflow flows and MCP servers receive the same authentication context, and individual service calls do not need to construct headers by hand.

What mechanism prevents OpenRAG from overwhelming Langflow during bulk uploads?

The TaskService implements a global semaphore in src/services/task_service.py that limits concurrent upload tasks. This concurrency control protects both Langflow and Docling from resource exhaustion when processing large batches of documents.

Can OpenRAG switch between different embedding models without restarting?

Yes. When the embedding model configuration changes, LangflowMCPService.update_mcp_servers_with_global_vars in src/services/langflow_mcp_service.py dynamically patches all MCP server CLI arguments to inject the new SELECTED_EMBEDDING_MODEL value. Chat requests also update the X-LANGFLOW-GLOBAL-VAR-SELECTED_EMBEDDING_MODEL header, allowing runtime model switching without service restarts.

Where does OpenRAG store conversation history when using Langflow orchestration?

The LangflowHistoryService in src/services/langflow_history_service.py retrieves conversation history directly from Langflow. This service merges Langflow’s chat records with OpenRAG’s metadata to present a unified view of past interactions, function calls, and tool usage across the platform.
