How to Implement Chat Functionality with ChatService in OpenRAG

To implement chat functionality with ChatService in OpenRAG, obtain the service through a FastAPI dependency and call langflow_chat() for Langflow-driven conversations or chat() for direct LLM calls. Both methods are async and can return either a complete response or a stream of chunks.

The langflow-ai/openrag repository provides a production-ready ChatService that encapsulates all chat interaction logic, from simple LLM calls to complex retrieval-augmented conversations orchestrated through Langflow. This async-first service integrates with FastAPI to expose both standard JSON endpoints and real-time streaming via Server-Sent Events (SSE).

ChatService Architecture and Initialization

Service Bootstrapping in the Application Layer

The ChatService is instantiated as a singleton during application startup. In src/main.py, the initialize_services() function (lines 618-682) creates the service instance and registers it in the global services registry:

chat_service = ChatService()
services["chat_service"] = chat_service

This pattern ensures a single service instance handles all chat operations throughout the application lifecycle, maintaining connection pools to Langflow and the LLM providers.
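The registry pattern described above can be sketched in a few lines. This is a simplified illustration, not the code in src/main.py: the ChatService class here is an empty stand-in, and the guard that makes initialize_services() safe to call twice is an assumption added for the example.

```python
# Minimal sketch of a startup-time service registry.
# ChatService is an empty stand-in, not the real OpenRAG class.


class ChatService:
    """Placeholder for the service defined in src/services/chat_service.py."""


# Module-level registry shared by the whole process.
services: dict = {}


def initialize_services() -> dict:
    """Create each service once and register it by name.

    The "already registered" guard is an assumption made for this sketch.
    """
    if "chat_service" not in services:
        services["chat_service"] = ChatService()
    return services


def get_chat_service() -> ChatService:
    """Look up the shared instance, the way a dependency function would."""
    return services["chat_service"]


initialize_services()
first = get_chat_service()
initialize_services()  # a second call does not replace the instance
second = get_chat_service()
print(first is second)  # True
```

Because every lookup goes through the same dictionary entry, all callers share one object for the lifetime of the process.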

Dependency Injection for FastAPI Routes

To expose the service to HTTP handlers, src/dependencies.py defines get_chat_service (lines 44-47) as a FastAPI dependency:

def get_chat_service():
    return services["chat_service"]

This dependency is injected into the route handlers in src/api/v1/chat.py, which implement the public REST API at the /v1/chat endpoint.

Core Service Methods and the Agent Layer

The core implementation resides in src/services/chat_service.py (lines 10-190), which provides three high-level entry points:

  • chat() – Executes raw LLM calls via the patched OpenAI client
  • langflow_chat() – Routes conversations through a configured Langflow flow (the default for public API interactions)
  • langflow_nudges_chat() – Specialized handler for nudges-specific flows

These methods delegate to low-level async wrappers in src/agent.py (lines 347-695), such as async_chat, async_langflow_chat, and async_langflow_chat_stream, which perform the actual HTTP calls to Langflow or the LLM provider.
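To make the delegation concrete, here is a rough sketch of a service method that picks between a one-shot wrapper and a streaming wrapper. The wrapper names are taken from the text, but their signatures, return shapes, and the SketchChatService class are assumptions for illustration; the real functions make HTTP calls, while these stand-ins just echo their input.

```python
import asyncio


# Stand-ins for the wrappers the article places in src/agent.py.
# Signatures and return values are assumed, not taken from the repository.
async def async_langflow_chat(prompt: str, user_id: str):
    return f"echo: {prompt}", "resp-1", []


async def async_langflow_chat_stream(prompt: str, user_id: str):
    for word in prompt.split():
        yield word


class SketchChatService:
    """Hypothetical facade that chooses a wrapper based on the stream flag."""

    async def langflow_chat(self, prompt: str, user_id: str, stream: bool = False):
        if stream:
            # Hand back the async generator itself; the caller iterates it.
            return async_langflow_chat_stream(prompt, user_id)
        text, response_id, sources = await async_langflow_chat(prompt, user_id)
        return {"response": text, "response_id": response_id, "sources": sources}


async def demo():
    service = SketchChatService()
    result = await service.langflow_chat("hello world", "user-123")
    gen = await service.langflow_chat("hello world", "user-123", stream=True)
    chunks = [chunk async for chunk in gen]
    return result, chunks


result, chunks = asyncio.run(demo())
print(result["response"], chunks)
```

Keeping the service layer this thin means the transport details stay in one module and the service only decides which path to take and how to shape the result.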

Implementing Standard Chat Requests

Request Flow Architecture

A typical non-streaming chat request flows through these layers:

  1. API Reception – chat_create_endpoint in src/api/v1/chat.py receives the HTTP POST request and parses the body into a ChatV1Body model
  2. Context Setup – The endpoint stores JWT tokens, search filters, and score thresholds using auth_context.set_auth_context, set_search_filters, set_search_limit, and set_score_threshold
  3. Service Invocation – The handler calls chat_service.langflow_chat() (lines 49-78 in src/services/chat_service.py)
  4. Header Construction – The service builds extra HTTP headers containing the JWT, selected embedding model, and provider credentials
  5. Filter Expression – It constructs a filter expression from the current search context (lines 90-130) for retrieval-augmented generation
  6. Langflow Execution – The Langflow client (initialized via clients.ensure_langflow_client()) sends the request to the flow configured in LANGFLOW_CHAT_FLOW_ID
  7. Response Packaging – async_langflow_chat in src/agent.py returns response_text, response_id, and sources, which the service packages into a dict containing the final response and citation metadata
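Steps 4, 5, and 7 above are plain data shaping, which a short sketch can illustrate. Everything below is hypothetical: the header names, the JSON layout of the filter expression, and the three helper functions are invented for the example and are not taken from src/services/chat_service.py.

```python
import json
from typing import Optional


def build_extra_headers(jwt_token: str, embedding_model: str) -> dict:
    # Header names are hypothetical; the real service defines its own.
    return {
        "X-Example-JWT": jwt_token,
        "X-Example-Embedding-Model": embedding_model,
    }


def build_filter_expression(
    filters: Optional[dict], limit: int, score_threshold: float
) -> str:
    # Serialize the search context into a single JSON string (assumed format).
    expression = {"limit": limit, "score_threshold": score_threshold}
    if filters:
        expression["filter"] = filters
    return json.dumps(expression, sort_keys=True)


def package_response(response_text: str, response_id: str, sources: list) -> dict:
    # Bundle the three values returned by the agent layer into one dict.
    return {
        "response": response_text,
        "response_id": response_id,
        "sources": sources,
    }


headers = build_extra_headers("token-abc", "example-embedding-model")
expr = build_filter_expression({"data_sources": ["github"]}, limit=5, score_threshold=0.2)
packaged = package_response("Hi", "resp-1", [])
print(expr)
```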

Direct Python Implementation

To use ChatService directly within Python code:

from src.services.chat_service import ChatService
import asyncio

async def run_conversation():
    service = ChatService()
    
    # Langflow-driven chat (default behavior)
    result = await service.langflow_chat(
        prompt="Explain the OpenRAG architecture",
        user_id="user-123",
        jwt_token="eyJhbGciOiJIUzI1NiIs...",
        stream=False,
    )
    
    print("Response:", result["response"])
    print("Sources:", result.get("sources", []))
    print("Chat ID:", result["response_id"])

asyncio.run(run_conversation())

All ChatService methods are async and must be awaited. The langflow_chat() method automatically handles retrieval context injection and credential forwarding to the Langflow backend.

HTTP API Implementation

For external clients, the public REST API accepts JSON payloads:

curl -X POST https://your-openrag-instance.com/v1/chat \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
        "message": "Summarize the latest Langflow release notes",
        "stream": false,
        "filters": {"data_sources": ["github", "docs"]},
        "limit": 5,
        "score_threshold": 0.2
      }'

Response format:

{
  "response": "Langflow 0.7.0 introduces a new UI for component configuration...",
  "chat_id": "f2b8e9c4-a3d1-4e5b-8c9d-1234567890ab",
  "sources": [
    {
      "filename": "release_notes.md",
      "text": "Version 0.7.0 adds support for...",
      "score": 0.97,
      "mimetype": "text/markdown"
    }
  ]
}
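For Python clients, the same request and response can be handled with the standard library. The sketch below only builds the request object and reads a response body shaped like the sample above; it does not send anything. The helper names build_chat_request and summarize_sources are made up for this example.

```python
import json
import urllib.request


def build_chat_request(base_url: str, api_key: str, message: str) -> urllib.request.Request:
    """Build (but do not send) a POST request matching the curl example."""
    payload = {"message": message, "stream": False, "limit": 5, "score_threshold": 0.2}
    return urllib.request.Request(
        url=f"{base_url}/v1/chat",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def summarize_sources(body: dict) -> list:
    """Return one 'filename (score)' string per cited source."""
    return [f'{s["filename"]} ({s["score"]:.2f})' for s in body.get("sources", [])]


request = build_chat_request("https://your-openrag-instance.com", "YOUR_API_KEY", "Hello")

# A response body with the same fields as the sample shown above.
sample = {
    "response": "Langflow 0.7.0 introduces a new UI for component configuration...",
    "chat_id": "f2b8e9c4-a3d1-4e5b-8c9d-1234567890ab",
    "sources": [{"filename": "release_notes.md", "score": 0.97}],
}
print(summarize_sources(sample))
```

Passing the request to urllib.request.urlopen would send it; that call is left out so the sketch runs without a live server.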

Implementing Streaming Chat Responses

Streaming implementations follow the same initialization path but return async generators instead of complete responses. When stream=True is passed to langflow_chat(), the service returns the generator from async_langflow_chat_stream in src/agent.py.

The API layer transforms these chunks into Server-Sent Events using _transform_stream_to_sse in src/api/v1/chat.py (lines 46-100), which extracts delta text, tool-call sources, and the final chat_id.
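The general shape of such a transform is an async generator that wraps each chunk in an SSE data frame. The sketch below is not the repository's _transform_stream_to_sse: the chunk dictionaries produced by fake_chunks and the to_sse function are assumptions, chosen only to show how deltas, sources, and a final chat_id could be turned into frames.

```python
import asyncio
import json
from typing import AsyncIterator


async def fake_chunks() -> AsyncIterator[dict]:
    # Assumed chunk shapes; the real Langflow stream differs.
    yield {"delta": "Hello"}
    yield {"delta": " world"}
    yield {"sources": [{"filename": "a.md"}]}
    yield {"chat_id": "chat-1"}


def sse(payload: dict) -> str:
    # One SSE frame: a "data:" line followed by a blank line.
    return f"data: {json.dumps(payload)}\n\n"


async def to_sse(chunks: AsyncIterator[dict]) -> AsyncIterator[str]:
    chat_id = None
    async for chunk in chunks:
        if "delta" in chunk:
            yield sse({"type": "content", "delta": chunk["delta"]})
        if "sources" in chunk:
            yield sse({"type": "sources", "sources": chunk["sources"]})
        if "chat_id" in chunk:
            chat_id = chunk["chat_id"]  # remembered until the stream ends
    yield sse({"type": "done", "chat_id": chat_id})


async def collect() -> list:
    return [frame async for frame in to_sse(fake_chunks())]


frames = asyncio.run(collect())
print("".join(frames))
```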

Client-Side Streaming Implementation

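// EventSource only issues GET requests and cannot set headers or a body,
// so a POST stream is read with fetch() and a stream reader instead.
const response = await fetch('/v1/chat', {
  method: 'POST',
  headers: {
    'Authorization': 'Bearer YOUR_API_KEY',
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({
    message: "Explain how knowledge filters work",
    stream: true,
    filter_id: "production-filter"
  })
});

const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
let buffer = '';

while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += value;

  // SSE frames are separated by a blank line
  const frames = buffer.split('\n\n');
  buffer = frames.pop(); // keep any incomplete frame for the next read

  for (const frame of frames) {
    if (!frame.startsWith('data: ')) continue;
    const data = JSON.parse(frame.slice(6));

    if (data.type === 'content') {
      // Append incremental text to UI
      console.log('Delta:', data.delta);
    } else if (data.type === 'sources') {
      // Handle retrieved documents
      console.log('Sources:', data.sources);
    } else if (data.type === 'done') {
      // Final message contains chat_id
      console.log('Chat ID:', data.chat_id);
    }
  }
}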

The SSE stream carries three kinds of message, distinguished by the type field in each JSON payload: content for incremental text, sources for retrieved documents, and done for session metadata such as the chat_id.
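A Python consumer can handle the same three message types once the stream text has been split into frames. The following sketch parses an already-received chunk of SSE text; parse_sse and render are illustrative helper names, and the sample payloads are invented to match the content, sources, and done types described above.

```python
import json


def parse_sse(raw: str) -> list:
    """Split raw SSE text into decoded JSON payloads, one per frame."""
    events = []
    for block in raw.split("\n\n"):
        data_lines = []
        for line in block.splitlines():
            if line.startswith("data:"):
                value = line[5:]
                # SSE allows one optional space after the colon.
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if data_lines:
            events.append(json.loads("\n".join(data_lines)))
    return events


def render(events: list):
    """Fold the event list into final text, sources, and chat id."""
    text, sources, chat_id = "", [], None
    for event in events:
        if event["type"] == "content":
            text += event["delta"]
        elif event["type"] == "sources":
            sources.extend(event["sources"])
        elif event["type"] == "done":
            chat_id = event["chat_id"]
    return text, sources, chat_id


raw = (
    'data: {"type": "content", "delta": "Know"}\n\n'
    'data: {"type": "content", "delta": "ledge"}\n\n'
    'data: {"type": "sources", "sources": [{"filename": "filters.md"}]}\n\n'
    'data: {"type": "done", "chat_id": "chat-1"}\n\n'
)
text, sources, chat_id = render(parse_sse(raw))
print(text, sources, chat_id)
```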

Retrieving and Managing Conversation History

Accessing Historical Conversations

The service provides two methods for history retrieval:

  • get_langflow_history() – Merges local conversation metadata with the full conversation data stored in Langflow
  • get_chat_history() – Returns only the in-memory conversation cache
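As a rough picture of what merging local metadata with remote conversation data can look like, consider the sketch below. It is not the implementation of get_langflow_history(): the merge_history function, the response_id join key, and the title and last_activity fields are all assumptions made for the example.

```python
def merge_history(local_metadata: dict, remote_conversations: list) -> list:
    """Overlay local metadata onto remote conversations, newest first.

    local_metadata maps a conversation id to extra fields kept locally;
    remote_conversations is a list of dicts fetched from the backend.
    """
    merged = []
    for conversation in remote_conversations:
        extra = local_metadata.get(conversation["response_id"], {})
        merged.append({**conversation, **extra})
    # ISO-8601 timestamps sort correctly as plain strings.
    merged.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
    return merged


local = {"chat-1": {"title": "Knowledge filters"}}
remote = [
    {"response_id": "chat-1", "messages": ["hi"], "last_activity": "2024-01-01T10:00:00"},
    {"response_id": "chat-2", "messages": [], "last_activity": "2024-02-01T09:30:00"},
]
history = merge_history(local, remote)
print([c["response_id"] for c in history])
```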

API Endpoints for History

List all conversations for the authenticated user:

curl -H "Authorization: Bearer YOUR_API_KEY" \
  https://your-openrag-instance.com/v1/chat

Retrieve a specific conversation thread:

curl -H "Authorization: Bearer YOUR_API_KEY" \
  https://your-openrag-instance.com/v1/chat/<CHAT_ID>

Both endpoints invoke the history methods on ChatService and return enriched conversation metadata including timestamps, message counts, and source references.

Summary

  • Instantiation – ChatService is created as a singleton in src/main.py (lines 618-682) and injected via get_chat_service in src/dependencies.py
  • Standard Chat – Use chat_service.langflow_chat() for RAG-enabled conversations or chat_service.chat() for direct LLM access
  • Streaming – Pass stream=True to receive an async generator; the HTTP API translates this to SSE via _transform_stream_to_sse in src/api/v1/chat.py
  • Context Management – Set filters, JWT tokens, and score thresholds using the auth context utilities before invoking chat methods
  • History – Retrieve conversations via get_langflow_history() or the REST endpoints at /v1/chat

Frequently Asked Questions

How do I instantiate ChatService outside of the FastAPI application?

Import the class directly from src.services.chat_service and instantiate it with ChatService(). The service initializes its own Langflow client lazily via clients.ensure_langflow_client(), so no additional setup is required for standalone scripts or background workers.

What is the difference between chat() and langflow_chat() methods?

The chat() method (lines 10-48 in src/services/chat_service.py) calls the raw OpenAI-compatible client directly for simple LLM interactions, while langflow_chat() (lines 49-78) routes requests through a Langflow flow ID specified by the LANGFLOW_CHAT_FLOW_ID environment variable, enabling complex retrieval-augmented generation workflows with tool calling.

How does authentication context propagate to the chat service?

The API layer stores JWT tokens and search parameters in thread-local storage using auth_context.set_auth_context and set_search_filters before calling the service. The langflow_chat() method reads this context to construct HTTP headers and filter expressions (lines 90-130), ensuring retrieval tools and Langflow flows receive the correct credentials and search constraints.

Can I use custom retrieval filters when calling ChatService programmatically?

Yes. When calling langflow_chat() directly, the service automatically picks up filters set via set_search_filters() in the auth context. For HTTP requests, include a filters object and optional filter_id, limit, and score_threshold in the JSON payload; these are converted to Langflow-compatible filter expressions before the request is dispatched.
