How RAGFlow Supports Querying Heterogeneous Data Sources: Architecture and Implementation

RAGFlow unifies disparate data sources through a connector-based abstraction layer that normalizes documents into a standard format and stores them in a unified doc-store, so queries can span sources without source-specific logic.

Modern RAG applications must ingest content from disparate systems, from cloud storage and SaaS platforms to internal databases. The open-source RAGFlow framework (infiniflow/ragflow) addresses this with a layered architecture: connectors normalize source-specific content into a common document model, a sync engine indexes that content into a unified storage backend, and a query layer searches the backend without source-specific logic.

The Connector Layer: Abstracting Source Diversity

ConnectorBase Interface and Standardized Document Model

Every external data source in RAGFlow implements the ConnectorBase interface defined in common/data_source/interfaces.py. Whether ingesting from Google Drive, GitHub, Notion, or Confluence, each connector exposes uniform methods: list_documents() and fetch_document(). These methods return instances of the standardized Document model defined in common/data_source/models.py, ensuring that metadata, content blobs, and semantic identifiers follow a consistent schema regardless of origin.
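To make the contract concrete, here is a minimal, self-contained sketch of what such an interface and document model could look like. It is not RAGFlow's code: the field names are taken from the payload fields this article lists for the sync engine, the methods are synchronous for brevity (the Airtable example later in this article uses `async`), and `InMemoryConnector` is a hypothetical toy source.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class Document:
    """Hypothetical stand-in for the normalized Document model."""
    id: str                   # source-native document ID
    source: str               # canonical source name, e.g. "github"
    semantic_identifier: str  # human-readable reference (title, path, ...)
    blob: bytes               # raw content bytes
    metadata: dict[str, Any] = field(default_factory=dict)
    extension: str = ""
    size_bytes: int = 0
    doc_updated_at: Optional[datetime] = None


class ConnectorBase(ABC):
    """Hypothetical sketch of the uniform connector contract."""

    @abstractmethod
    def list_documents(self) -> list[Document]:
        """Return every document the source currently exposes."""

    @abstractmethod
    def fetch_document(self, document_id: str) -> Document:
        """Return a single document by its source-native ID."""


class InMemoryConnector(ConnectorBase):
    """Toy connector over a dict of {id: (title, text)}, for illustration only."""

    def __init__(self, source: str, records: dict[str, tuple[str, str]]):
        self.source = source
        self.records = records

    def _to_document(self, doc_id: str) -> Document:
        title, text = self.records[doc_id]
        blob = text.encode("utf-8")
        return Document(
            id=doc_id,
            source=self.source,
            semantic_identifier=title,
            blob=blob,
            extension=".txt",
            size_bytes=len(blob),
            doc_updated_at=datetime.now(timezone.utc),
        )

    def list_documents(self) -> list[Document]:
        return [self._to_document(doc_id) for doc_id in self.records]

    def fetch_document(self, document_id: str) -> Document:
        return self._to_document(document_id)
```

Because `ConnectorBase` is an `abc.ABC` with abstract methods, a subclass that forgets `fetch_document()` cannot be instantiated, which is one way to enforce a uniform interface at construction time.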

Supported Data Sources and Implementation Examples

Source-specific implementations reside in subdirectories of common/data_source/. For example, common/data_source/google_drive/connector.py and common/data_source/github/connector.py handle OAuth authentication and API pagination internally while exposing the same interface to the rest of the system. This polymorphic design allows the sync engine to treat Notion pages, Slack messages, and database records identically once they pass through the connector boundary.
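The pagination point can be illustrated with a small sketch: the connector walks a cursor-paginated API internally, and the caller only ever sees a flat list of documents. `PaginatedConnector`, `fake_api`, and the `(records, next_cursor)` page shape are hypothetical; a real connector would call the provider's SDK or REST API with its OAuth credentials where `fake_api` is used here.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, Optional

# Hypothetical shape of one page from a paginated API: (records, next_cursor)
Page = tuple[list[dict], Optional[str]]


@dataclass
class Doc:
    """Trimmed stand-in for the normalized Document model."""
    id: str
    source: str
    semantic_identifier: str


class PaginatedConnector:
    """Hides cursor pagination behind a single list_documents() call."""

    def __init__(self, source: str, fetch_page: Callable[[Optional[str]], Page]):
        self.source = source
        self.fetch_page = fetch_page  # stands in for an authenticated API client

    def _iter_records(self) -> Iterator[dict]:
        cursor: Optional[str] = None
        while True:
            records, cursor = self.fetch_page(cursor)
            yield from records
            if cursor is None:  # the API signals the last page with no cursor
                break

    def list_documents(self) -> list[Doc]:
        return [
            Doc(id=r["id"], source=self.source, semantic_identifier=r["title"])
            for r in self._iter_records()
        ]


def fake_api(cursor: Optional[str]) -> Page:
    """Simulated three-page API keyed by cursor."""
    pages = {
        None: ([{"id": "1", "title": "README"}], "c2"),
        "c2": ([{"id": "2", "title": "CONTRIBUTING"}], "c3"),
        "c3": ([{"id": "3", "title": "LICENSE"}], None),
    }
    return pages[cursor]
```

Any number of such connectors can then be driven by the same loop, e.g. `for c in connectors: for d in c.list_documents(): ...`, without the loop knowing how each source pages its results.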

To add support for a new source, subclass ConnectorBase and implement its methods. The hypothetical Airtable connector below shows the shape:


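# common/data_source/airtable_connector.py (hypothetical example connector)

from .interfaces import ConnectorBase
from .models import Document

class AirtableConnector(ConnectorBase):
    def __init__(self, base_id: str, api_key: str):
        self.base_id = base_id    # Airtable base to read from
        self.api_key = api_key    # credential used for REST API calls

    async def list_documents(self) -> list[Document]:
        # Fetch records via the Airtable REST API and convert each one to a Document
        ...

    async def fetch_document(self, document_id: str) -> Document:
        # Fetch a single record by ID and convert it to a Document (signature assumed)
        ...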

The Sync Engine: Ingesting and Normalizing Documents

The asynchronous ingestion pipeline in rag/svr/sync_data_source.py orchestrates data movement from connectors to storage. When a sync task executes, the worker instantiates the appropriate connector class based on the source_type field stored in the database configuration.
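A common way to implement "instantiate the connector class named by source_type" is a registry keyed by that string. The sketch below assumes that pattern; `CONNECTOR_REGISTRY`, `register_connector`, `build_connector`, the stub classes, and the `{"source_type": ..., "config": ...}` task shape are all hypothetical and not taken from rag/svr/sync_data_source.py.

```python
from typing import Any, Callable

# Hypothetical registry mapping the stored source_type string to a connector class
CONNECTOR_REGISTRY: dict[str, type] = {}


def register_connector(source_type: str) -> Callable[[type], type]:
    """Class decorator that records a connector under its source_type key."""
    def decorator(cls: type) -> type:
        if source_type in CONNECTOR_REGISTRY:
            raise ValueError(f"duplicate source_type: {source_type}")
        CONNECTOR_REGISTRY[source_type] = cls
        return cls
    return decorator


@register_connector("google_drive")
class GoogleDriveStub:
    def __init__(self, **conf: Any):
        self.conf = conf


@register_connector("github")
class GitHubStub:
    def __init__(self, **conf: Any):
        self.conf = conf


def build_connector(task_config: dict[str, Any]) -> Any:
    """Instantiate the connector named by task_config["source_type"]."""
    source_type = task_config["source_type"]
    try:
        cls = CONNECTOR_REGISTRY[source_type]
    except KeyError:
        raise ValueError(f"unknown source_type: {source_type!r}") from None
    return cls(**task_config.get("config", {}))
```

With this design, adding a source means adding one decorated class; the worker's dispatch code does not change.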

For each document retrieved, the engine constructs a unified payload containing:

  • id: A hashed identifier derived from the source document ID
  • source: The canonical source name (e.g., "google_drive", "github")
  • semantic_identifier: Human-readable document reference
  • blob: Raw content bytes
  • metadata: Source-specific attributes normalized to JSON
  • extension, size_bytes, doc_updated_at: File characteristics

This payload is passed to KnowledgebaseService.index_documents() in api/db/services/knowledgebase_service.py, which persists the document to the RAG doc-store. Once indexed, the original source system is never queried again during retrieval operations.
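The normalization step can be sketched as a pure function from a source record to the payload fields listed above. This is a minimal sketch under stated assumptions: `hash128()` here is a stand-in built on a 128-bit BLAKE2b digest (RAGFlow's own helper may use a different algorithm), and the input keys (`id`, `title`, `blob`, `metadata`, `extension`, `updated_at`) are hypothetical.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Optional


def hash128(value: str) -> str:
    """Stand-in 128-bit hash (32 hex chars); not necessarily RAGFlow's helper."""
    return hashlib.blake2b(value.encode("utf-8"), digest_size=16).hexdigest()


def to_payload(doc: dict[str, Any], source_name: str) -> dict[str, Any]:
    """Normalize one source record into the unified payload fields."""
    blob: bytes = doc["blob"]
    updated: Optional[datetime] = doc.get("updated_at")
    return {
        "id": hash128(doc["id"]),  # same source ID always yields the same hash
        "source": source_name,
        "semantic_identifier": doc["title"],
        "blob": blob,
        # default=str turns datetimes and other non-JSON values into strings
        "metadata": json.loads(json.dumps(doc.get("metadata", {}), default=str)),
        "extension": doc.get("extension", ""),
        "size_bytes": len(blob),
        "doc_updated_at": updated.isoformat() if updated else None,
    }
```

Because the ID is a deterministic hash of the source ID, re-syncing an unchanged document produces the same key rather than a new one.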


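# rag/svr/sync_data_source.py (simplified excerpt)

# GoogleDriveConnector stands in for whichever connector class the task's source_type selects
self.connector = GoogleDriveConnector(**self.conf)
self.connector.load_credentials(self.conf["credentials"])

# _generate() yields batches, i.e. lists of Document objects
document_batch_generator = await self._generate(task)

for batch in document_batch_generator:
    payloads = []
    for doc in batch:
        payloads.append({
            "id": hash128(doc.id),
            "source": self.SOURCE_NAME,
            "semantic_identifier": doc.semantic_identifier,
            # ... remaining fields (blob, metadata, extension, size_bytes, doc_updated_at)
        })
    # Index one batch per call
    KnowledgebaseService.index_documents(kb_id=task["kb_id"], docs=payloads)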
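The batch loop above can be exercised without RAGFlow using the sketch below. `batched()`, `sync_batches()`, and the `index_documents` callback are hypothetical stand-ins (the callback plays the role of `KnowledgebaseService.index_documents()`); the point is that a generator of batches needs a nested loop, with one index call per batch.

```python
from itertools import islice
from typing import Any, Callable, Iterable, Iterator


def batched(items: Iterable[Any], size: int) -> Iterator[list[Any]]:
    """Yield lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


def sync_batches(
    batches: Iterable[list[dict[str, Any]]],
    kb_id: str,
    index_documents: Callable[..., None],
) -> int:
    """Index each batch with one call; return the number of documents indexed."""
    total = 0
    for batch in batches:
        # Trimmed payload: a real one would carry every unified field
        payloads = [{"id": d["id"], "source": d["source"]} for d in batch]
        index_documents(kb_id=kb_id, docs=payloads)
        total += len(payloads)
    return total
```

Python 3.12 also provides `itertools.batched`, which yields tuples; the helper here yields lists and works on earlier versions.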

Unified Doc-Store Abstraction

Infinity and Elasticsearch Backends

RAGFlow supports two storage backends for the unified document index: Infinity and Elasticsearch. The InfinityConnection class in rag/utils/infinity_conn.py and ESConnection in rag/utils/es_conn.py both implement identical CRUD interfaces. This dual-backend support lets operators choose between Infinity's vector-native architecture and an existing Elasticsearch cluster without modifying application code.
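A minimal sketch of the dual-backend idea, using `typing.Protocol` for the shared contract. `DocStore`, the two toy stores, and `top_ids()` are hypothetical; they are not `InfinityConnection` or `ESConnection`, whose real methods take more parameters (see the `search()` call in the query example). The application function depends only on the contract, so either store can be passed in.

```python
from typing import Any, Protocol


class DocStore(Protocol):
    """Hypothetical minimal contract shared by every storage backend."""

    def insert(self, docs: list[dict[str, Any]]) -> None: ...
    def delete(self, doc_id: str) -> None: ...
    def search(self, term: str, limit: int) -> list[dict[str, Any]]: ...


class DictBackedStore:
    """Toy backend keyed by document ID (stands in for one engine)."""

    def __init__(self) -> None:
        self._docs: dict[str, dict[str, Any]] = {}

    def insert(self, docs: list[dict[str, Any]]) -> None:
        for doc in docs:
            self._docs[doc["id"]] = doc  # re-inserting an ID overwrites it

    def delete(self, doc_id: str) -> None:
        self._docs.pop(doc_id, None)

    def search(self, term: str, limit: int) -> list[dict[str, Any]]:
        hits = [d for d in self._docs.values() if term in d["content"]]
        return sorted(hits, key=lambda d: d["id"])[:limit]


class ListBackedStore:
    """Toy backend storing rows in a list (stands in for the other engine)."""

    def __init__(self) -> None:
        self._rows: list[dict[str, Any]] = []

    def insert(self, docs: list[dict[str, Any]]) -> None:
        new_ids = {d["id"] for d in docs}
        self._rows = [r for r in self._rows if r["id"] not in new_ids] + list(docs)

    def delete(self, doc_id: str) -> None:
        self._rows = [r for r in self._rows if r["id"] != doc_id]

    def search(self, term: str, limit: int) -> list[dict[str, Any]]:
        hits = [r for r in self._rows if term in r["content"]]
        return sorted(hits, key=lambda r: r["id"])[:limit]


def top_ids(store: DocStore, term: str) -> list[str]:
    """Application code: depends only on the DocStore contract."""
    return [d["id"] for d in store.search(term, limit=10)]
```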

Common CRUD Interface

Both connection classes expose standardized methods: search(), insert(), delete(), and update(). The search() method accepts match expressions (MatchTextExpr, MatchDenseExpr) and returns results as a pandas DataFrame with canonical columns: id, content, metadata, score, and source. This abstraction ensures that query logic remains agnostic to whether documents originated from object storage or a SaaS API.
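To make the canonical-column contract concrete, here is a minimal in-memory sketch using pandas. `search_to_frame()` and its term-overlap scoring are hypothetical and unrelated to Infinity's or Elasticsearch's ranking; only the output columns mirror the ones named above. The `condition` argument follows the `{"kb_id": [kb_id]}` shape used in the query example.

```python
import pandas as pd

CANONICAL_COLUMNS = ["id", "content", "metadata", "score", "source"]


def search_to_frame(rows, condition, query, limit=10) -> pd.DataFrame:
    """Filter rows by `condition`, score by query-term overlap, return canonical columns."""
    terms = set(query.lower().split())
    hits = []
    for row in rows:
        # condition maps a field to its accepted values, e.g. {"kb_id": ["kb1"]}
        if not all(row.get(k) in allowed for k, allowed in condition.items()):
            continue
        words = set(row["content"].lower().split())
        score = len(terms & words) / len(terms) if terms else 0.0
        if score > 0:
            hits.append({**row, "score": score})
    # Passing columns= keeps only the canonical fields (e.g. kb_id is dropped)
    df = pd.DataFrame(hits, columns=CANONICAL_COLUMNS)
    df = df.sort_values("score", ascending=False, kind="stable")
    return df.head(limit).reset_index(drop=True)
```

Whatever source a row came from, the caller receives the same five columns, which is what keeps downstream query logic source-agnostic.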

The Query Layer: Cross-Source Retrieval

QueryBase and Match Expressions

The query construction layer uses QueryBase, defined in common/query_base.py, as the contract for all query objects. Concrete implementations such as MsgTextQuery (used in chat interfaces) generate match expressions that the doc-store understands. The memory/services/query.py module translates natural-language questions into these structured expressions, then forwards them to the selected backend via the unified search method.
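The contract can be sketched as an abstract base class whose `question()` method returns a match expression together with extracted keywords, matching the two-value unpacking in the query example. Everything here is hypothetical: `TextMatch` is a stand-in for a full-text match expression such as `MatchTextExpr`, and `SimpleTextQuery`'s stop-word filtering is a toy, not MsgTextQuery's logic.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class TextMatch:
    """Stand-in for a backend-neutral full-text match expression."""
    fields: tuple[str, ...]
    matching_text: str
    topn: int


class QueryBase(ABC):
    """Hypothetical contract: turn a raw question into match expressions."""

    @abstractmethod
    def question(self, raw_query: str) -> tuple[TextMatch, list[str]]:
        """Return (match expression, extracted keywords)."""


class SimpleTextQuery(QueryBase):
    """Toy query builder: lowercases, drops stop words, ORs the remaining keywords."""

    STOP_WORDS = frozenset({"the", "a", "an", "of", "is", "what", "how", "in", "for", "to"})

    def __init__(self, fields: tuple[str, ...] = ("content",), topn: int = 100):
        self.fields = tuple(fields)
        self.topn = topn

    def question(self, raw_query: str) -> tuple[TextMatch, list[str]]:
        tokens = [t.strip("?.,!").lower() for t in raw_query.split()]
        keywords = [t for t in tokens if t and t not in self.STOP_WORDS]
        expr = TextMatch(self.fields, " OR ".join(keywords), self.topn)
        return expr, keywords
```

The backend only ever sees `TextMatch` values, so a new query style (for example, one that also emits a dense-vector expression) can be added without touching the storage layer.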

Result Unification

Search results are returned as standardized DataFrames in which the source column records each row's provenance (Google Drive, Confluence, or a custom database) while the schema stays the same. This lets the API layer in api/apps/knowledgebase/route.py render mixed-source results without source-specific parsing logic.


# memory/services/query.py

from rag.utils.infinity_conn import InfinityConnection

def search(kb_id: str, raw_query: str):
    # question() returns a tuple; only the match expression is needed here
    match_expr, _ = MsgTextQuery().question(raw_query)

    # Search the unified doc-store; ESConnection exposes the same search() method
    df, total = InfinityConnection().search(
        select_fields=["content", "source", "metadata"],
        highlight_fields=[],
        condition={"kb_id": [kb_id]},
        match_expressions=[match_expr],
        order_by=None,
        offset=0,
        limit=20,
        index_names=f"ragflow_doc_meta_{kb_id}",
        knowledgebase_ids=[kb_id],
    )
    return df
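The rendering side of result unification can be sketched on rows shaped like that DataFrame, converted with pandas' `df.to_dict("records")`. `render_results()` and `count_by_source()` are hypothetical helpers, not code from api/apps/knowledgebase/route.py; they show that one rendering path handles every source because only the `source` value varies between rows.

```python
from collections import Counter
from typing import Any


def render_results(records: list[dict[str, Any]]) -> str:
    """Render ranked rows with their provenance; no per-source branches needed."""
    lines = []
    for rank, row in enumerate(records, start=1):
        meta = row.get("metadata") or {}
        title = meta.get("title", row["id"])  # fall back to the ID if untitled
        lines.append(f"{rank}. [{row['source']}] {title}: {row['content'][:60]}")
    return "\n".join(lines)


def count_by_source(records: list[dict[str, Any]]) -> dict[str, int]:
    """Summarize how many hits came from each source."""
    return dict(Counter(row["source"] for row in records))
```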

Summary

  • Connector Architecture: All data sources implement ConnectorBase in common/data_source/, exposing list_documents() and returning standardized Document models defined in common/data_source/models.py.
  • Sync Pipeline: rag/svr/sync_data_source.py normalizes documents into unified payloads with hashed IDs and metadata, storing them via KnowledgebaseService.
  • Unified Storage: Both InfinityConnection and ESConnection provide identical CRUD interfaces in rag/utils/, abstracting backend differences.
  • Standardized Queries: The QueryBase interface and memory/services/query.py enable cross-source search with match expressions; results come back as pandas DataFrames with canonical columns.
  • Source-Agnostic Results: Query results include a source field for provenance tracking while keeping a consistent schema across all heterogeneous inputs.

Frequently Asked Questions

What types of data sources does RAGFlow support?

RAGFlow supports any source that implements the ConnectorBase interface, including Google Drive, GitHub, Notion, Confluence, Slack, databases, and object storage. Each connector resides in common/data_source/ and handles authentication and API specifics internally while exposing standardized list_documents() and fetch_document() methods to the sync engine.

How does RAGFlow handle document schema differences between sources?

The Document model in common/data_source/models.py enforces a normalized schema. During synchronization, rag/svr/sync_data_source.py maps source-specific fields to unified attributes like semantic_identifier, blob, and metadata, ensuring that MongoDB documents and Google Drive files share the same structure in the doc-store.
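Field mapping of this kind can be sketched as one small mapper per source that emits the same unified keys. The mappers, the input shapes (a MongoDB-style document with `_id`, and a Google Drive-style file record with `name`, `mimeType`, `modifiedTime`, plus a hypothetical `content` key holding downloaded bytes), and `normalize()` are illustrative assumptions, not RAGFlow's implementation.

```python
import json
from typing import Any, Callable


def from_mongodb(rec: dict[str, Any]) -> dict[str, Any]:
    """Map a MongoDB-style document: serialize its fields as the content blob."""
    body = {k: v for k, v in rec.items() if k != "_id"}
    return {
        "id": str(rec["_id"]),
        "semantic_identifier": str(rec.get("title", rec["_id"])),
        "blob": json.dumps(body, default=str, sort_keys=True).encode("utf-8"),
        "metadata": {"fields": sorted(body)},
    }


def from_google_drive(rec: dict[str, Any]) -> dict[str, Any]:
    """Map a Drive-style file record: the file name becomes the identifier."""
    return {
        "id": rec["id"],
        "semantic_identifier": rec["name"],
        "blob": rec["content"],
        "metadata": {"mime_type": rec["mimeType"], "modified_time": rec["modifiedTime"]},
    }


MAPPERS: dict[str, Callable[[dict[str, Any]], dict[str, Any]]] = {
    "mongodb": from_mongodb,
    "google_drive": from_google_drive,
}


def normalize(source: str, rec: dict[str, Any]) -> dict[str, Any]:
    """Apply the source's mapper and stamp the canonical source name."""
    unified = MAPPERS[source](rec)
    unified["source"] = source
    return unified
```

Both mappers produce identical key sets, so everything downstream of `normalize()` can ignore where a record came from.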

Can I query across multiple data sources simultaneously?

Yes. Once documents are synchronized, they reside in a unified index managed by InfinityConnection or ESConnection. The query layer in memory/services/query.py searches across all indexed documents using kb_id (knowledge base ID) filters, returning results from multiple sources in a single pandas DataFrame with the source column indicating origin.

How do I add a custom data source connector?

Create a new class inheriting from ConnectorBase in common/data_source/, implement list_documents() to return Document objects, and register the connector type in the database configuration. The sync engine in rag/svr/sync_data_source.py will then instantiate your connector when it processes tasks with the corresponding source_type; once the resulting documents are indexed, you can query your custom data source alongside existing integrations.
