How to Implement Chat Functionality with ChatService in OpenRAG
To implement chat functionality with ChatService in OpenRAG, instantiate the service via FastAPI dependencies and invoke langflow_chat() for Langflow-driven conversations or chat() for direct LLM calls, supporting both synchronous and streaming response modes.
The langflow-ai/openrag repository provides a production-ready ChatService that encapsulates all chat interaction logic, from simple LLM calls to complex retrieval-augmented conversations orchestrated through Langflow. This async-first service integrates with FastAPI to expose both standard JSON endpoints and real-time streaming via Server-Sent Events (SSE).
ChatService Architecture and Initialization
Service Bootstrapping in the Application Layer
The ChatService is instantiated as a singleton during application startup. In src/main.py, the initialize_services() function (lines 618-682) creates the service instance and registers it in the global services registry:
chat_service = ChatService()
services["chat_service"] = chat_service
This pattern ensures a single service instance handles all chat operations throughout the application lifecycle, maintaining connection pools to Langflow and the LLM providers.
Dependency Injection for FastAPI Routes
To expose the service to HTTP handlers, src/dependencies.py defines get_chat_service (lines 44-47) as a FastAPI dependency:
def get_chat_service():
return services["chat_service"]
This dependency is injected into the route handlers in src/api/v1/chat.py, which implement the public REST API at the /v1/chat endpoint.
Core Service Methods and the Agent Layer
The core implementation resides in src/services/chat_service.py (lines 10-190), which provides three high-level entry points:
chat()– Executes raw LLM calls via the patched OpenAI clientlangflow_chat()– Routes conversations through a configured Langflow flow (the default for public API interactions)langflow_nudges_chat()– Specialized handler for nudges-specific flows
These methods delegate to low-level async wrappers in src/agent.py (lines 347-695), such as async_chat, async_langflow_chat, and async_langflow_chat_stream, which perform the actual HTTP calls to Langflow or the LLM provider.
Implementing Standard Chat Requests
Request Flow Architecture
A typical non-streaming chat request flows through these layers:
- API Reception –
chat_create_endpointinsrc/api/v1/chat.pyreceives the HTTP POST request and parses the body into aChatV1Bodymodel - Context Setup – The endpoint stores JWT tokens, search filters, and score thresholds using
auth_context.set_auth_context,set_search_filters,set_search_limit, andset_score_threshold - Service Invocation – The handler calls
chat_service.langflow_chat()(lines 49-78 insrc/services/chat_service.py) - Header Construction – The service builds extra HTTP headers containing the JWT, selected embedding model, and provider credentials
- Filter Expression – It constructs a filter expression from the current search context (lines 90-130) for retrieval-augmented generation
- Langflow Execution – The Langflow client (initialized via
clients.ensure_langflow_client()) sends the request to the flow configured inLANGFLOW_CHAT_FLOW_ID - Response Packaging –
async_langflow_chatinsrc/agent.pyreturnsresponse_text,response_id, andsources, which the service packages into a dict containing the final response and citation metadata
Direct Python Implementation
To use ChatService directly within Python code:
from src.services.chat_service import ChatService
import asyncio
async def run_conversation():
service = ChatService()
# Langflow-driven chat (default behavior)
result = await service.langflow_chat(
prompt="Explain the OpenRAG architecture",
user_id="user-123",
jwt_token="eyJhbGciOiJIUzI1NiIs...",
stream=False,
)
print("Response:", result["response"])
print("Sources:", result.get("sources", []))
print("Chat ID:", result["response_id"])
asyncio.run(run_conversation())
All ChatService methods are async and must be awaited. The langflow_chat() method automatically handles retrieval context injection and credential forwarding to the Langflow backend.
HTTP API Implementation
For external clients, the public REST API accepts JSON payloads:
curl -X POST https://your-openrag-instance.com/v1/chat \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"message": "Summarize the latest Langflow release notes",
"stream": false,
"filters": {"data_sources": ["github", "docs"]},
"limit": 5,
"score_threshold": 0.2
}'
Response format:
{
"response": "Langflow 0.7.0 introduces a new UI for component configuration...",
"chat_id": "f2b8e9c4-a3d1-4e5b-8c9d-1234567890ab",
"sources": [
{
"filename": "release_notes.md",
"text": "Version 0.7.0 adds support for...",
"score": 0.97,
"mimetype": "text/markdown"
}
]
}
Implementing Streaming Chat Responses
Streaming implementations follow the same initialization path but return async generators instead of complete responses. When stream=True is passed to langflow_chat(), the service returns the generator from async_langflow_chat_stream in src/agent.py.
The API layer transforms these chunks into Server-Sent Events using _transform_stream_to_sse in src/api/v1/chat.py (lines 46-100), which extracts delta text, tool-call sources, and the final chat_id.
Client-Side Streaming Implementation
const eventSource = new EventSource('/v1/chat', {
method: 'POST',
headers: {
'Authorization': 'Bearer YOUR_API_KEY',
'Content-Type': 'application/json'
},
body: JSON.stringify({
message: "Explain how knowledge filters work",
stream: true,
filter_id: "production-filter"
})
});
eventSource.addEventListener('message', (e) => {
const data = JSON.parse(e.data);
if (data.type === 'content') {
// Append incremental text to UI
console.log('Delta:', data.delta);
} else if (data.type === 'sources') {
// Handle retrieved documents
console.log('Sources:', data.sources);
} else if (data.type === 'done') {
// Final message contains chat_id
console.log('Chat ID:', data.chat_id);
eventSource.close();
}
});
The SSE stream emits three event types: content for incremental text, sources for retrieved documents, and done for session metadata.
Retrieving and Managing Conversation History
Accessing Historical Conversations
The service provides two methods for history retrieval:
get_langflow_history()– Merges local conversation metadata with the full conversation data stored in Langflowget_chat_history()– Returns only the in-memory conversation cache
API Endpoints for History
List all conversations for the authenticated user:
curl -H "Authorization: Bearer YOUR_API_KEY" \
https://your-openrag-instance.com/v1/chat
Retrieve a specific conversation thread:
curl -H "Authorization: Bearer YOUR_API_KEY" \
https://your-openrag-instance.com/v1/chat/<CHAT_ID>
Both endpoints invoke the history methods on ChatService and return enriched conversation metadata including timestamps, message counts, and source references.
Summary
- Instantiation –
ChatServiceis created as a singleton insrc/main.py(lines 618-682) and injected viaget_chat_serviceinsrc/dependencies.py - Standard Chat – Use
chat_service.langflow_chat()for RAG-enabled conversations orchat_service.chat()for direct LLM access - Streaming – Pass
stream=Trueto receive an async generator; the HTTP API translates this to SSE via_transform_stream_to_sseinsrc/api/v1/chat.py - Context Management – Set filters, JWT tokens, and score thresholds using the auth context utilities before invoking chat methods
- History – Retrieve conversations via
get_langflow_history()or the REST endpoints at/v1/chat
Frequently Asked Questions
How do I instantiate ChatService outside of the FastAPI application?
Import the class directly from src.services.chat_service and instantiate it with ChatService(). The service initializes its own Langflow client lazily via clients.ensure_langflow_client(), so no additional setup is required for standalone scripts or background workers.
What is the difference between chat() and langflow_chat() methods?
The chat() method (lines 10-48 in src/services/chat_service.py) calls the raw OpenAI-compatible client directly for simple LLM interactions, while langflow_chat() (lines 49-78) routes requests through a Langflow flow ID specified by the LANGFLOW_CHAT_FLOW_ID environment variable, enabling complex retrieval-augmented generation workflows with tool calling.
How does authentication context propagate to the chat service?
The API layer stores JWT tokens and search parameters in thread-local storage using auth_context.set_auth_context and set_search_filters before calling the service. The langflow_chat() method reads this context to construct HTTP headers and filter expressions (lines 90-130), ensuring retrieval tools and Langflow flows receive the correct credentials and search constraints.
Can I use custom retrieval filters when calling ChatService programmatically?
Yes. When calling langflow_chat() directly, the service automatically picks up filters set via set_search_filters() in the auth context. For HTTP requests, include a filters object and optional filter_id, limit, and score_threshold in the JSON payload; these are converted to Langflow-compatible filter expressions before the request is dispatched.
Have a question about this repo?
These articles cover the highlights, but your codebase questions are specific. Give your agent direct access to the source. Share this with your agent to get started:
curl -s "https://instagit.com/install.md" Maintain an open-source project? Get it listed too →