# How RAGFlow Supports Querying Heterogeneous Data Sources: Architecture and Implementation

> Discover how RAGFlow unifies heterogeneous data sources with its connector abstraction and unified doc-store, enabling cross-source querying without source-specific logic. Explore the architecture and implementation.

- Repository: [InfiniFlow/ragflow](https://github.com/infiniflow/ragflow)
- Tags: architecture
- Published: 2026-02-23

---

**RAGFlow unifies disparate data sources through a connector-based abstraction layer that normalizes documents into a standard format and stores them in a unified doc-store, enabling seamless cross-source queries without requiring source-specific logic.**

Modern RAG applications must ingest content from many kinds of systems, ranging from cloud storage and SaaS platforms to internal databases. The open-source RAGFlow framework (infiniflow/ragflow) addresses this with a layered architecture built for querying heterogeneous data sources through normalized connectors and a unified storage backend.

## The Connector Layer: Abstracting Source Diversity

### ConnectorBase Interface and Standardized Document Model

Every external data source in RAGFlow implements the `ConnectorBase` interface defined in [`common/data_source/interfaces.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/interfaces.py). Whether ingesting from Google Drive, GitHub, Notion, or Confluence, each connector exposes uniform methods: `list_documents()` and `fetch_document()`. These methods return instances of the standardized `Document` model defined in [`common/data_source/models.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/models.py), ensuring that metadata, content blobs, and semantic identifiers follow a consistent schema regardless of origin.
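To make the contract concrete, here is a minimal sketch of what such an interface and model could look like. The field names are borrowed from the payload attributes described in this article, so the actual definitions in `interfaces.py` and `models.py` may differ. `InMemoryConnector` is a hypothetical toy connector used only for illustration.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Document:
    """Illustrative normalized document; field names are assumptions."""
    id: str
    semantic_identifier: str
    blob: bytes
    extension: str
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def size_bytes(self) -> int:
        # Derived from the blob so it can never drift out of sync.
        return len(self.blob)


class ConnectorBase(ABC):
    """Illustrative contract: every source lists and fetches Documents."""

    @abstractmethod
    async def list_documents(self) -> list[Document]:
        ...

    @abstractmethod
    async def fetch_document(self, doc_id: str) -> Document:
        ...


class InMemoryConnector(ConnectorBase):
    """Hypothetical connector backed by a dict of file name -> bytes."""

    def __init__(self, files: dict[str, bytes]):
        self._files = files

    def _to_document(self, name: str) -> Document:
        extension = "." + name.rsplit(".", 1)[-1] if "." in name else ""
        return Document(
            id=name,
            semantic_identifier=name,
            blob=self._files[name],
            extension=extension,
            metadata={"origin": "in_memory"},
        )

    async def list_documents(self) -> list[Document]:
        return [self._to_document(name) for name in sorted(self._files)]

    async def fetch_document(self, doc_id: str) -> Document:
        return self._to_document(doc_id)


docs = asyncio.run(InMemoryConnector({"notes.txt": b"hello"}).list_documents())
print(docs[0].semantic_identifier, docs[0].size_bytes)
```

Because callers only depend on the abstract methods, swapping the toy connector for one that talks to a real API would not change any code downstream of the connector boundary.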

### Supported Data Sources and Implementation Examples

Source-specific implementations live in subdirectories of `common/data_source/`. For example, [`common/data_source/google_drive/connector.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/google_drive/connector.py) and [`common/data_source/github/connector.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/github/connector.py) handle OAuth authentication and API pagination internally while exposing the same interface to the rest of the system. This polymorphic design lets the sync engine treat Notion pages, Slack messages, and database records identically once they pass through the connector boundary.

To add support for a new source, implement the base interface:

```python
# common/data_source/airtable_connector.py
from .interfaces import ConnectorBase
from .models import Document


class AirtableConnector(ConnectorBase):
    def __init__(self, base_id: str, api_key: str):
        self.base_id = base_id
        self.api_key = api_key

    async def list_documents(self) -> list[Document]:
        # Fetch records via the Airtable REST API and convert them to Document objects.
        ...
```

## The Sync Engine: Ingesting and Normalizing Documents

The asynchronous ingestion pipeline in [`rag/svr/sync_data_source.py`](https://github.com/infiniflow/ragflow/blob/main/rag/svr/sync_data_source.py) orchestrates data movement from connectors to storage. When a sync task executes, the worker instantiates the appropriate connector class based on the `source_type` field stored in the database configuration.
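One common way to map a stored `source_type` string to a connector class is a small registry. The sketch below illustrates that idea only; `CONNECTOR_REGISTRY`, `register_connector`, `build_connector`, and the stub classes are hypothetical names, and the real worker may resolve connectors differently.

```python
from typing import Any

# Hypothetical registry mapping a stored source_type string to a connector class.
CONNECTOR_REGISTRY: dict[str, type] = {}


def register_connector(source_type: str):
    """Class decorator that records a connector under its source_type."""
    def decorator(cls: type) -> type:
        CONNECTOR_REGISTRY[source_type] = cls
        return cls
    return decorator


@register_connector("google_drive")
class StubGoogleDriveConnector:
    """Stand-in class; a real connector would authenticate and call the API."""
    def __init__(self, **conf: Any):
        self.conf = conf


@register_connector("github")
class StubGitHubConnector:
    """Stand-in class; a real connector would authenticate and call the API."""
    def __init__(self, **conf: Any):
        self.conf = conf


def build_connector(source_type: str, conf: dict[str, Any]):
    """Instantiate the connector registered for source_type."""
    try:
        connector_cls = CONNECTOR_REGISTRY[source_type]
    except KeyError:
        # Fail loudly so a misconfigured task is not silently skipped.
        raise ValueError(f"unsupported source_type: {source_type!r}") from None
    return connector_cls(**conf)


connector = build_connector("github", {"repo": "infiniflow/ragflow"})
print(type(connector).__name__)
```

A registry keeps the dispatch logic in one place, so supporting another source means adding a class rather than editing a chain of conditionals.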

For each document retrieved, the engine constructs a unified payload containing:

- `id`: A hashed identifier derived from the source document ID
- `source`: The canonical source name (e.g., "google_drive", "github")
- `semantic_identifier`: Human-readable document reference
- `blob`: Raw content bytes
- `metadata`: Source-specific attributes normalized to JSON
- `extension`, `size_bytes`, `doc_updated_at`: File characteristics

This payload is passed to `KnowledgebaseService.index_documents()` in [`api/db/services/knowledgebase_service.py`](https://github.com/infiniflow/ragflow/blob/main/api/db/services/knowledgebase_service.py), which persists the document to the RAG doc-store. Once indexed, the original source system is never queried again during retrieval operations.
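The payload fields listed above can be assembled with a small helper like the one sketched here. `stable_id` and `build_payload` are hypothetical names, and BLAKE2b with a 16-byte digest is only an assumed stand-in for whatever 128-bit hash the sync engine actually uses.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Any


def stable_id(source_doc_id: str) -> str:
    """Derive a deterministic 128-bit hex ID (assumed algorithm, for illustration)."""
    digest = hashlib.blake2b(source_doc_id.encode("utf-8"), digest_size=16)
    return digest.hexdigest()


def build_payload(
    source: str,
    source_doc_id: str,
    semantic_identifier: str,
    blob: bytes,
    metadata: dict[str, Any],
    extension: str,
    doc_updated_at: datetime,
) -> dict[str, Any]:
    """Normalize one source document into the unified payload shape."""
    return {
        "id": stable_id(source_doc_id),
        "source": source,
        "semantic_identifier": semantic_identifier,
        "blob": blob,
        # default=str keeps non-JSON values such as datetimes serializable.
        "metadata": json.dumps(metadata, sort_keys=True, default=str),
        "extension": extension,
        "size_bytes": len(blob),
        # Store timestamps in UTC so sources in different zones compare cleanly.
        "doc_updated_at": doc_updated_at.astimezone(timezone.utc).isoformat(),
    }


payload = build_payload(
    source="google_drive",
    source_doc_id="file-123",
    semantic_identifier="Q1 Planning.docx",
    blob=b"example bytes",
    metadata={"owner": "alice"},
    extension=".docx",
    doc_updated_at=datetime(2026, 1, 15, 12, 0, tzinfo=timezone.utc),
)
print(payload["id"], payload["size_bytes"])
```

Hashing the source ID rather than generating a random one means a re-sync of the same document produces the same `id`, which lets the index overwrite rather than duplicate it.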

```python
# rag/svr/sync_data_source.py (excerpt)
self.connector = GoogleDriveConnector(**self.conf)   # any connector class
self.connector.load_credentials(self.conf["credentials"])

# _generate() yields batches of Document, so unpack each batch first
document_batch_generator = await self._generate(task)

for batch in document_batch_generator:
    for doc in batch:
        payload = {
            "id": hash128(doc.id),
            "source": self.SOURCE_NAME,
            "semantic_identifier": doc.semantic_identifier,
            # ... remaining fields from the list above
        }
        KnowledgebaseService.index_documents(kb_id=task["kb_id"], docs=[payload])
```

## Unified Doc-Store Abstraction

### Infinity and Elasticsearch Backends

RAGFlow supports two storage backends for the unified document index: Infinity and Elasticsearch. The `InfinityConnection` class in [`rag/utils/infinity_conn.py`](https://github.com/infiniflow/ragflow/blob/main/rag/utils/infinity_conn.py) and `ESConnection` in [`rag/utils/es_conn.py`](https://github.com/infiniflow/ragflow/blob/main/rag/utils/es_conn.py) both implement identical CRUD interfaces. This dual-backend support allows operators to choose between Infinity's vector-native architecture or existing Elasticsearch clusters without modifying application code.

### Common CRUD Interface

Both connection classes expose standardized methods: `search()`, `insert()`, `delete()`, and `update()`. The `search()` method accepts match expressions (`MatchTextExpr`, `MatchDenseExpr`) and returns results as a pandas `DataFrame` with canonical columns: `id`, `content`, `metadata`, `score`, and `source`. This abstraction ensures that query logic remains agnostic to whether documents originated from object storage or a SaaS API.
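The value of a shared CRUD contract is easiest to see with a toy backend. The sketch below is an assumption-laden illustration: `DocStoreConnection` and `InMemoryStore` are hypothetical, the method signatures are simplified, and plain dict rows stand in for the pandas `DataFrame` to keep the example dependency-free.

```python
from typing import Any, Protocol


class DocStoreConnection(Protocol):
    """Hypothetical, simplified version of the shared CRUD contract."""

    def insert(self, rows: list[dict[str, Any]]) -> None: ...
    def search(self, text: str, limit: int = 10) -> list[dict[str, Any]]: ...
    def delete(self, doc_id: str) -> None: ...
    def update(self, doc_id: str, fields: dict[str, Any]) -> None: ...


class InMemoryStore:
    """Toy backend; a real one would delegate to Infinity or Elasticsearch."""

    def __init__(self) -> None:
        self._rows: dict[str, dict[str, Any]] = {}

    def insert(self, rows: list[dict[str, Any]]) -> None:
        for row in rows:
            self._rows[row["id"]] = dict(row)

    def search(self, text: str, limit: int = 10) -> list[dict[str, Any]]:
        terms = text.lower().split()
        hits = []
        for row in self._rows.values():
            content = row["content"].lower()
            # Naive relevance: total occurrences of the query terms.
            score = sum(content.count(term) for term in terms)
            if score > 0:
                hits.append({**row, "score": float(score)})
        hits.sort(key=lambda r: r["score"], reverse=True)
        return hits[:limit]

    def delete(self, doc_id: str) -> None:
        self._rows.pop(doc_id, None)

    def update(self, doc_id: str, fields: dict[str, Any]) -> None:
        self._rows[doc_id].update(fields)


def top_sources(store: DocStoreConnection, text: str) -> list[str]:
    """Caller code depends only on the contract, not on a concrete backend."""
    return [row["source"] for row in store.search(text)]


store = InMemoryStore()
store.insert([
    {"id": "1", "content": "Release notes for the release", "metadata": {}, "source": "github"},
    {"id": "2", "content": "Draft release plan", "metadata": {}, "source": "google_drive"},
])
print(top_sources(store, "release"))
```

`top_sources` never names a backend class, which is the property that lets an operator switch storage engines without touching query code.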

## The Query Layer: Cross-Source Retrieval

### QueryBase and Match Expressions

The query construction layer uses `QueryBase` defined in [`common/query_base.py`](https://github.com/infiniflow/ragflow/blob/main/common/query_base.py) as the contract for all query objects. Concrete implementations like `MsgTextQuery` (utilized in chat interfaces) generate match expressions that the doc-store understands. The [`memory/services/query.py`](https://github.com/infiniflow/ragflow/blob/main/memory/services/query.py) module translates natural language questions into these structured expressions, then forwards them to the selected backend via the unified `search` method.
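As a rough illustration of turning a natural language question into a structured expression, the sketch below extracts keywords and wraps them in a match object. `TextMatchExpr`, `question_to_match`, and the stopword list are hypothetical stand-ins; the real query classes perform considerably more sophisticated tokenization and weighting.

```python
import re
from dataclasses import dataclass

# Tiny illustrative stopword list; a production system would use a fuller one.
STOPWORDS = {"a", "an", "the", "is", "are", "do", "does", "how", "what",
             "i", "to", "of", "in", "for"}


@dataclass
class TextMatchExpr:
    """Hypothetical stand-in for a full-text match expression."""
    fields: list[str]
    matching_text: str
    topn: int = 20


def question_to_match(
    question: str,
    fields: tuple[str, ...] = ("content",),
    topn: int = 20,
) -> tuple[TextMatchExpr, list[str]]:
    """Return a match expression plus the keywords it was built from."""
    tokens = re.findall(r"[a-z0-9_]+", question.lower())
    keywords: list[str] = []
    for token in tokens:
        # Drop stopwords and duplicates while preserving first-seen order.
        if token not in STOPWORDS and token not in keywords:
            keywords.append(token)
    expr = TextMatchExpr(fields=list(fields), matching_text=" ".join(keywords), topn=topn)
    return expr, keywords


expr, keywords = question_to_match("How do I rotate the API keys?")
print(expr.matching_text, keywords)
```

Returning the keywords alongside the expression mirrors the two-value return used by the query call in this article, where the second value is discarded.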

### Result Unification

Search results are returned as standardized DataFrames in which the `source` column records each row's provenance (Google Drive, Confluence, or a custom database) while the schema stays the same. This allows the API layer in [`api/apps/knowledgebase/route.py`](https://github.com/infiniflow/ragflow/blob/main/api/apps/knowledgebase/route.py) to render mixed-source results without source-specific parsing logic.
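A consistent schema means one rendering routine can handle every source. The sketch below assumes rows shaped like the canonical columns named in this article and uses plain dicts instead of a `DataFrame`; `render_results` and `count_by_source` are hypothetical helpers, not part of the API layer.

```python
from collections import Counter
from typing import Any


def render_results(rows: list[dict[str, Any]], snippet_len: int = 60) -> list[str]:
    """Format mixed-source rows with a single code path, best score first."""
    ranked = sorted(rows, key=lambda r: r["score"], reverse=True)
    lines = []
    for rank, row in enumerate(ranked, start=1):
        snippet = row["content"][:snippet_len]
        lines.append(f"{rank}. [{row['source']}] {snippet} (score={row['score']:.2f})")
    return lines


def count_by_source(rows: list[dict[str, Any]]) -> Counter:
    """Summarize provenance without inspecting source-specific fields."""
    return Counter(row["source"] for row in rows)


rows = [
    {"id": "a", "content": "Deployment checklist", "metadata": {}, "score": 0.42, "source": "confluence"},
    {"id": "b", "content": "Deploy script README", "metadata": {}, "score": 0.91, "source": "github"},
    {"id": "c", "content": "Deployment budget", "metadata": {}, "score": 0.17, "source": "google_drive"},
]
for line in render_results(rows):
    print(line)
```

Neither helper branches on the value of `source`; it is treated purely as a label, which is what keeps the rendering logic source-agnostic.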

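```python
# memory/services/query.py
from rag.utils.infinity_conn import InfinityConnection

# MsgTextQuery is assumed to be defined or imported elsewhere in this module.


def search(kb_id: str, raw_query: str):
    # Translate the natural language question into a match expression
    match_expr, _ = MsgTextQuery().question(raw_query)

    # Search the unified doc-store (Infinity here; ESConnection exposes the same interface)
    df, total = InfinityConnection().search(
        select_fields=["content", "source", "metadata"],
        highlight_fields=[],
        condition={"kb_id": [kb_id]},
        match_expressions=[match_expr],
        order_by=None,
        offset=0,
        limit=20,
        index_names=f"ragflow_doc_meta_{kb_id}",
        knowledgebase_ids=[kb_id],
    )
    return df
```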

## Summary

- **Connector Architecture**: All data sources implement `ConnectorBase` in `common/data_source/`, exposing `list_documents()` and returning standardized `Document` models defined in [`common/data_source/models.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/models.py).
- **Sync Pipeline**: [`rag/svr/sync_data_source.py`](https://github.com/infiniflow/ragflow/blob/main/rag/svr/sync_data_source.py) normalizes documents into unified payloads with hashed IDs and metadata, storing them via `KnowledgebaseService`.
- **Unified Storage**: Both `InfinityConnection` and `ESConnection` provide identical CRUD interfaces in `rag/utils/`, abstracting backend differences.
- **Standardized Queries**: The `QueryBase` interface and [`memory/services/query.py`](https://github.com/infiniflow/ragflow/blob/main/memory/services/query.py) enable cross-source search using match expressions that return canonical pandas DataFrames.
- **Source Agnostic Results**: Query results include a `source` field for provenance tracking while maintaining a consistent schema across all heterogeneous inputs.

## Frequently Asked Questions

### What types of data sources does RAGFlow support?

RAGFlow supports any source that implements the `ConnectorBase` interface, including Google Drive, GitHub, Notion, Confluence, Slack, databases, and object storage. Each connector resides in `common/data_source/` and handles authentication and API specifics internally while exposing standardized `list_documents()` and `fetch_document()` methods to the sync engine.

### How does RAGFlow handle document schema differences between sources?

The `Document` model in [`common/data_source/models.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/models.py) enforces a normalized schema. During synchronization, [`rag/svr/sync_data_source.py`](https://github.com/infiniflow/ragflow/blob/main/rag/svr/sync_data_source.py) maps source-specific fields to unified attributes like `semantic_identifier`, `blob`, and `metadata`, ensuring that MongoDB documents and Google Drive files share the same structure in the doc-store.

### Can I query across multiple data sources simultaneously?

Yes. Once documents are synchronized, they reside in a unified index managed by `InfinityConnection` or `ESConnection`. The query layer in [`memory/services/query.py`](https://github.com/infiniflow/ragflow/blob/main/memory/services/query.py) searches across all indexed documents using `kb_id` (knowledge base ID) filters, returning results from multiple sources in a single pandas DataFrame with the `source` column indicating origin.

### How do I add a custom data source connector?

Create a new class inheriting from `ConnectorBase` in `common/data_source/`, implement `list_documents()` to return `Document` objects, and register the connector type in the database configuration. The sync engine in [`rag/svr/sync_data_source.py`](https://github.com/infiniflow/ragflow/blob/main/rag/svr/sync_data_source.py) will then instantiate your connector when processing tasks with the corresponding `source_type`, so documents from your custom source become queryable alongside existing integrations once they have been synced.