# How to Implement Chat Functionality with ChatService in OpenRAG

> Learn to implement chat functionality in OpenRAG using ChatService. Instantiate the service and invoke langflow_chat() or chat() for synchronous or streaming LLM responses.

- Repository: [langflow-ai/openrag](https://github.com/langflow-ai/openrag)
- Tags: how-to-guide
- Published: 2026-03-13

---

**To implement chat functionality with ChatService in OpenRAG, instantiate the service via FastAPI dependencies and invoke `langflow_chat()` for Langflow-driven conversations or `chat()` for direct LLM calls, supporting both synchronous and streaming response modes.**

The langflow-ai/openrag repository provides a production-ready **ChatService** that encapsulates all chat interaction logic, from simple LLM calls to complex retrieval-augmented conversations orchestrated through Langflow. This async-first service integrates with FastAPI to expose both standard JSON endpoints and real-time streaming via Server-Sent Events (SSE).

## ChatService Architecture and Initialization

### Service Bootstrapping in the Application Layer

The `ChatService` is instantiated as a singleton during application startup. In [`src/main.py`](https://github.com/langflow-ai/openrag/blob/main/src/main.py), the `initialize_services()` function (lines 618-682) creates the service instance and registers it in the global services registry:

```python
# Create the single shared instance and register it under a well-known key.
chat_service = ChatService()
services["chat_service"] = chat_service
```

This pattern ensures a single service instance handles all chat operations throughout the application lifecycle, maintaining connection pools to Langflow and the LLM providers.

### Dependency Injection for FastAPI Routes

To expose the service to HTTP handlers, [`src/dependencies.py`](https://github.com/langflow-ai/openrag/blob/main/src/dependencies.py) defines `get_chat_service` (lines 44-47) as a FastAPI dependency:

```python
def get_chat_service():
    # Return the shared instance registered during startup.
    return services["chat_service"]
```

This dependency is injected into the route handlers in [`src/api/v1/chat.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/v1/chat.py), which implement the public REST API at the `/v1/chat` endpoint.
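The registry-plus-accessor pattern described in these two subsections can be sketched without FastAPI. This is a minimal illustration, not the code in `src/main.py`: the `ChatService` class here is an empty stand-in, and the guard that makes `initialize_services()` safe to call twice is an assumption added for the example.

```python
# Empty stand-in for the real ChatService class (assumption for this sketch).
class ChatService:
    pass


# Module-level registry, mirroring the `services` dict described above.
services: dict = {}


def initialize_services() -> dict:
    """Create each service once and register it under a well-known key."""
    # The idempotency guard is an assumption, not confirmed from the repository.
    if "chat_service" not in services:
        services["chat_service"] = ChatService()
    return services


def get_chat_service() -> ChatService:
    """Accessor in the style of the FastAPI dependency: return the shared instance."""
    return services["chat_service"]


initialize_services()
first = get_chat_service()
initialize_services()  # a second call must not replace the instance
second = get_chat_service()
print(first is second)
```

Because every handler resolves the service through the same accessor, tests can swap the registry entry for a fake without touching route code.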

### Core Service Methods and the Agent Layer

The core implementation resides in [`src/services/chat_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/chat_service.py) (lines 10-190), which provides three high-level entry points:

- **`chat()`** – Executes raw LLM calls via the patched OpenAI client
- **`langflow_chat()`** – Routes conversations through a configured Langflow flow (the default for public API interactions)
- **`langflow_nudges_chat()`** – Specialized handler for nudges-specific flows

These methods delegate to low-level async wrappers in [`src/agent.py`](https://github.com/langflow-ai/openrag/blob/main/src/agent.py) (lines 347-695), such as `async_chat`, `async_langflow_chat`, and `async_langflow_chat_stream`, which perform the actual HTTP calls to Langflow or the LLM provider.
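The delegation from service method to agent-layer wrapper might look roughly like the sketch below. The wrapper names come from the text above, but their bodies, signatures, and return shapes are stand-ins invented for illustration; the real wrappers perform HTTP calls to Langflow.

```python
import asyncio
from typing import Any, AsyncIterator


# Stand-ins for the wrappers in src/agent.py (hypothetical bodies and signatures).
async def async_langflow_chat(prompt: str) -> tuple:
    return f"echo: {prompt}", "resp-1"


async def async_langflow_chat_stream(prompt: str) -> AsyncIterator[str]:
    for word in prompt.split():
        yield word


class ChatService:
    async def langflow_chat(self, prompt: str, stream: bool = False) -> Any:
        if stream:
            # Hand the async generator back unconsumed; the caller iterates it.
            return async_langflow_chat_stream(prompt)
        text, response_id = await async_langflow_chat(prompt)
        return {"response": text, "response_id": response_id}


async def demo():
    service = ChatService()
    full = await service.langflow_chat("hello world")
    stream = await service.langflow_chat("hello world", stream=True)
    chunks = [chunk async for chunk in stream]
    return full, chunks


full, chunks = asyncio.run(demo())
print(full)
print(chunks)
```

Keeping the HTTP details in the wrappers leaves the service method responsible only for choosing a mode and shaping the result.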

## Implementing Standard Chat Requests

### Request Flow Architecture

A typical non-streaming chat request flows through these layers:

1. **API Reception** – `chat_create_endpoint` in [`src/api/v1/chat.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/v1/chat.py) receives the HTTP POST request and parses the body into a `ChatV1Body` model
2. **Context Setup** – The endpoint stores JWT tokens, search filters, and score thresholds using `auth_context.set_auth_context`, `set_search_filters`, `set_search_limit`, and `set_score_threshold`
3. **Service Invocation** – The handler calls `chat_service.langflow_chat()` (lines 49-78 in [`src/services/chat_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/chat_service.py))
4. **Header Construction** – The service builds extra HTTP headers containing the JWT, selected embedding model, and provider credentials
5. **Filter Expression** – It constructs a filter expression from the current search context (lines 90-130) for retrieval-augmented generation
6. **Langflow Execution** – The Langflow client (initialized via `clients.ensure_langflow_client()`) sends the request to the flow configured in `LANGFLOW_CHAT_FLOW_ID`
7. **Response Packaging** – `async_langflow_chat` in [`src/agent.py`](https://github.com/langflow-ai/openrag/blob/main/src/agent.py) returns `response_text`, `response_id`, and `sources`, which the service packages into a dict containing the final response and citation metadata
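Steps 4 and 5 above can be sketched as two small helpers. Everything specific here is an assumption made for illustration: the header names, the function names, and the JSON shape of the filter expression are hypothetical and are not taken from `src/services/chat_service.py`.

```python
import json
from typing import Optional


def build_extra_headers(jwt_token: Optional[str], embedding_model: Optional[str]) -> dict:
    """Collect per-request headers, skipping values that are unset."""
    headers = {}
    if jwt_token:
        headers["X-Example-JWT"] = jwt_token  # hypothetical header name
    if embedding_model:
        headers["X-Example-Embedding-Model"] = embedding_model  # hypothetical
    return headers


def build_filter_expression(
    filters: Optional[dict], limit: int, score_threshold: float
) -> Optional[str]:
    """Serialize the search context to JSON, or return None when no filter is set."""
    # Drop filter keys whose value list is empty.
    cleaned = {key: values for key, values in (filters or {}).items() if values}
    if not cleaned:
        return None
    return json.dumps(
        {"filter": cleaned, "limit": limit, "score_threshold": score_threshold},
        sort_keys=True,
    )


headers = build_extra_headers("example-jwt", "example-embedding-model")
expression = build_filter_expression(
    {"data_sources": ["github", "docs"], "owners": []}, limit=5, score_threshold=0.2
)
print(headers)
print(expression)
```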

### Direct Python Implementation

To use **ChatService** directly within Python code:

```python
import asyncio

from src.services.chat_service import ChatService


async def run_conversation():
    service = ChatService()

    # Langflow-driven chat (default behavior); stream=False returns a complete dict.
    result = await service.langflow_chat(
        prompt="Explain the OpenRAG architecture",
        user_id="user-123",
        jwt_token="eyJhbGciOiJIUzI1NiIs...",
        stream=False,
    )

    print("Response:", result["response"])
    print("Sources:", result.get("sources", []))
    print("Chat ID:", result["response_id"])


asyncio.run(run_conversation())
```

All `ChatService` methods are **async** and must be awaited. The `langflow_chat()` method automatically handles retrieval context injection and credential forwarding to the Langflow backend.

### HTTP API Implementation

For external clients, the public REST API accepts JSON payloads:

```bash
curl -X POST https://your-openrag-instance.com/v1/chat \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
        "message": "Summarize the latest Langflow release notes",
        "stream": false,
        "filters": {"data_sources": ["github", "docs"]},
        "limit": 5,
        "score_threshold": 0.2
      }'
```

**Response format:**

```json
{
  "response": "Langflow 0.7.0 introduces a new UI for component configuration...",
  "chat_id": "f2b8e9c4-a3d1-4e5b-8c9d-1234567890ab",
  "sources": [
    {
      "filename": "release_notes.md",
      "text": "Version 0.7.0 adds support for...",
      "score": 0.97,
      "mimetype": "text/markdown"
    }
  ]
}
```
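For Python clients, the same request can be assembled with the standard library. This is a sketch under stated assumptions: `build_chat_request` is a hypothetical helper, the host is the placeholder from the curl example, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_chat_request(base_url: str, api_key: str, message: str, **options):
    """Build (but do not send) a POST request for the /v1/chat endpoint."""
    payload = {"message": message, "stream": False, **options}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/v1/chat",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_chat_request(
    "https://your-openrag-instance.com",
    "YOUR_API_KEY",
    "Summarize the latest Langflow release notes",
    filters={"data_sources": ["github", "docs"]},
    limit=5,
    score_threshold=0.2,
)
print(request.get_method(), request.full_url)

# To send it against a real instance:
# with urllib.request.urlopen(request) as resp:
#     body = json.load(resp)
```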

## Implementing Streaming Chat Responses

Streaming implementations follow the same initialization path but return async generators instead of complete responses. When `stream=True` is passed to `langflow_chat()`, the service returns the generator from `async_langflow_chat_stream` in [`src/agent.py`](https://github.com/langflow-ai/openrag/blob/main/src/agent.py).

The API layer transforms these chunks into Server-Sent Events using `_transform_stream_to_sse` in [`src/api/v1/chat.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/v1/chat.py) (lines 46-100), which extracts `delta` text, tool-call sources, and the final `chat_id`.
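The general shape of such a transformation is sketched below. It is not the repository's `_transform_stream_to_sse`: the chunk dictionaries produced by `fake_chunks` are a hypothetical stand-in for whatever Langflow actually emits, and only the three output event types are taken from this guide.

```python
import asyncio
import json
from typing import AsyncIterator


async def fake_chunks() -> AsyncIterator[dict]:
    """Hypothetical upstream chunks; the real chunk shape may differ."""
    yield {"delta": "Hel"}
    yield {"delta": "lo"}
    yield {"sources": [{"filename": "release_notes.md"}]}
    yield {"response_id": "abc-123"}


def sse(payload: dict) -> str:
    """Format one payload as an SSE frame: a data line followed by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


async def transform_stream_to_sse(chunks: AsyncIterator[dict]) -> AsyncIterator[str]:
    chat_id = None
    async for chunk in chunks:
        if chunk.get("delta"):
            yield sse({"type": "content", "delta": chunk["delta"]})
        if chunk.get("sources"):
            yield sse({"type": "sources", "sources": chunk["sources"]})
        if chunk.get("response_id"):
            chat_id = chunk["response_id"]  # remembered for the final event
    yield sse({"type": "done", "chat_id": chat_id})


async def collect() -> list:
    return [event async for event in transform_stream_to_sse(fake_chunks())]


events = asyncio.run(collect())
print("".join(events), end="")
```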

### Client-Side Streaming Implementation

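```javascript
// The browser EventSource API only issues GET requests and cannot send custom
// headers or a body, so a POST stream is read with fetch() and a stream reader.
// Run this inside an async function or an ES module (top-level await).
const response = await fetch('/v1/chat', {
  method: 'POST',
  headers: {
    'Authorization': 'Bearer YOUR_API_KEY',
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({
    message: "Explain how knowledge filters work",
    stream: true,
    filter_id: "production-filter"
  })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });

  // SSE events are separated by a blank line; keep any partial event buffered.
  const events = buffer.split('\n\n');
  buffer = events.pop();

  for (const event of events) {
    const line = event.split('\n').find((l) => l.startsWith('data:'));
    if (!line) continue;
    const data = JSON.parse(line.slice(5).trim());

    if (data.type === 'content') {
      // Append incremental text to UI
      console.log('Delta:', data.delta);
    } else if (data.type === 'sources') {
      // Handle retrieved documents
      console.log('Sources:', data.sources);
    } else if (data.type === 'done') {
      // Final message contains chat_id
      console.log('Chat ID:', data.chat_id);
    }
  }
}
```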

The SSE stream emits three event types: `content` for incremental text, `sources` for retrieved documents, and `done` for session metadata.
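A Python consumer can handle the same three event types. The sketch below parses an already-received SSE body rather than a live connection; `parse_sse` and `consume` are hypothetical helper names, and the sample frames are made up to match the event types listed above.

```python
import json


def parse_sse(raw: str):
    """Yield decoded JSON payloads from the `data:` lines of an SSE body."""
    for frame in raw.split("\n\n"):
        for line in frame.splitlines():
            if line.startswith("data:"):
                yield json.loads(line[len("data:"):].strip())


def consume(raw: str) -> tuple:
    """Accumulate text deltas, sources, and the final chat ID."""
    text_parts, sources, chat_id = [], [], None
    for event in parse_sse(raw):
        if event["type"] == "content":
            text_parts.append(event["delta"])
        elif event["type"] == "sources":
            sources.extend(event["sources"])
        elif event["type"] == "done":
            chat_id = event["chat_id"]
    return "".join(text_parts), sources, chat_id


# Made-up sample stream for illustration.
sample = (
    'data: {"type": "content", "delta": "Hel"}\n\n'
    'data: {"type": "content", "delta": "lo"}\n\n'
    'data: {"type": "sources", "sources": [{"filename": "release_notes.md"}]}\n\n'
    'data: {"type": "done", "chat_id": "abc-123"}\n\n'
)
text, sources, chat_id = consume(sample)
print(text, sources, chat_id)
```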

## Retrieving and Managing Conversation History

### Accessing Historical Conversations

The service provides two methods for history retrieval:

- **`get_langflow_history()`** – Merges local conversation metadata with the full conversation data stored in Langflow
- **`get_chat_history()`** – Returns only the in-memory conversation cache
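The merge performed by `get_langflow_history()` can be pictured with a small sketch. The field names (`response_id`, `title`, `last_activity`, `messages`), the precedence of local metadata, and the newest-first ordering are all assumptions for illustration; the actual record shapes are not shown in this guide.

```python
def merge_history(local_metadata: dict, langflow_conversations: list) -> list:
    """Overlay local metadata on Langflow conversation records, newest first."""
    merged = []
    for conversation in langflow_conversations:
        response_id = conversation["response_id"]
        # Local metadata (for example titles and timestamps) is layered on top.
        merged.append({**conversation, **local_metadata.get(response_id, {})})
    # Conversations without a timestamp sort last.
    return sorted(merged, key=lambda item: item.get("last_activity", ""), reverse=True)


# Hypothetical sample data.
local_metadata = {
    "resp-1": {"title": "Release notes", "last_activity": "2026-03-10T09:00:00"},
}
langflow_conversations = [
    {"response_id": "resp-2", "messages": [{"role": "user", "content": "Hi"}]},
    {"response_id": "resp-1", "messages": [{"role": "user", "content": "Summarize"}]},
]

history = merge_history(local_metadata, langflow_conversations)
for item in history:
    print(item["response_id"], item.get("title"))
```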

### API Endpoints for History

List all conversations for the authenticated user:

```bash
curl -H "Authorization: Bearer YOUR_API_KEY" \
  https://your-openrag-instance.com/v1/chat
```

Retrieve a specific conversation thread:

```bash
curl -H "Authorization: Bearer YOUR_API_KEY" \
  https://your-openrag-instance.com/v1/chat/<CHAT_ID>
```

Both endpoints invoke the history methods on `ChatService` and return enriched conversation metadata including timestamps, message counts, and source references.

## Summary

- **Instantiation** – `ChatService` is created as a singleton in [`src/main.py`](https://github.com/langflow-ai/openrag/blob/main/src/main.py) (lines 618-682) and injected via `get_chat_service` in [`src/dependencies.py`](https://github.com/langflow-ai/openrag/blob/main/src/dependencies.py)
- **Standard Chat** – Use `chat_service.langflow_chat()` for RAG-enabled conversations or `chat_service.chat()` for direct LLM access
- **Streaming** – Pass `stream=True` to receive an async generator; the HTTP API translates this to SSE via `_transform_stream_to_sse` in [`src/api/v1/chat.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/v1/chat.py)
- **Context Management** – Set filters, JWT tokens, and score thresholds using the auth context utilities before invoking chat methods
- **History** – Retrieve conversations via `get_langflow_history()` or the REST endpoints at `/v1/chat`

## Frequently Asked Questions

### How do I instantiate ChatService outside of the FastAPI application?

Import the class directly from `src.services.chat_service` and instantiate it with `ChatService()`. The service initializes its own Langflow client lazily via `clients.ensure_langflow_client()`, so standalone scripts and background workers need no extra client wiring, provided the same configuration (for example `LANGFLOW_CHAT_FLOW_ID`) is available in their environment.

### What is the difference between `chat()` and `langflow_chat()` methods?

The `chat()` method (lines 10-48 in [`src/services/chat_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/chat_service.py)) calls the raw OpenAI-compatible client directly for simple LLM interactions, while `langflow_chat()` (lines 49-78) routes requests through a Langflow flow ID specified by the `LANGFLOW_CHAT_FLOW_ID` environment variable, enabling complex retrieval-augmented generation workflows with tool calling.

### How does authentication context propagate to the chat service?

The API layer stores JWT tokens and search parameters in thread-local storage using `auth_context.set_auth_context` and `set_search_filters` before calling the service. The `langflow_chat()` method reads this context to construct HTTP headers and filter expressions (lines 90-130), ensuring retrieval tools and Langflow flows receive the correct credentials and search constraints.
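Per-request context of this kind can be illustrated with the standard library's `contextvars`, which is the asyncio-friendly counterpart to thread-local storage. Whether OpenRAG's `auth_context` module uses `contextvars` is an assumption here; the setter signatures and the `read_context` helper are hypothetical.

```python
import asyncio
import contextvars

# One variable per piece of request context (names are illustrative).
_jwt_token = contextvars.ContextVar("jwt_token", default=None)
_search_filters = contextvars.ContextVar("search_filters", default=None)


def set_auth_context(jwt_token: str) -> None:
    _jwt_token.set(jwt_token)


def set_search_filters(filters: dict) -> None:
    _search_filters.set(filters)


def read_context() -> tuple:
    """What a service method would read when building headers and filters."""
    return _jwt_token.get(), _search_filters.get()


async def handle_request(jwt_token: str, filters: dict) -> tuple:
    set_auth_context(jwt_token)
    set_search_filters(filters)
    await asyncio.sleep(0)  # yield so the other request runs in between
    return read_context()


async def main() -> list:
    # Each coroutine passed to gather runs as a task with its own context copy.
    return await asyncio.gather(
        handle_request("token-a", {"data_sources": ["github"]}),
        handle_request("token-b", {"data_sources": ["docs"]}),
    )


results = asyncio.run(main())
print(results)
```

Each concurrent request reads back its own token and filters even though both run on one thread, which is the property the service relies on when it builds headers from ambient context.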

### Can I use custom retrieval filters when calling ChatService programmatically?

Yes. When calling `langflow_chat()` directly, the service automatically picks up filters set via `set_search_filters()` in the auth context. For HTTP requests, include a `filters` object and optional `filter_id`, `limit`, and `score_threshold` in the JSON payload; these are converted to Langflow-compatible filter expressions before the request is dispatched.