How RAGFlow Supports Querying Heterogeneous Data Sources: Architecture and Implementation
RAGFlow unifies disparate data sources through a connector-based abstraction layer that normalizes documents into a standard format and stores them in a unified doc-store, enabling seamless cross-source queries without requiring source-specific logic.
Modern RAG applications must ingest content from disparate systems—from cloud storage and SaaS platforms to internal databases. The open-source RAGFlow framework (infiniflow/ragflow) solves this challenge by implementing a layered architecture specifically designed for querying heterogeneous data sources through normalized connectors and a unified storage backend.
The Connector Layer: Abstracting Source Diversity
ConnectorBase Interface and Standardized Document Model
Every external data source in RAGFlow implements the ConnectorBase interface defined in common/data_source/interfaces.py. Whether ingesting from Google Drive, GitHub, Notion, or Confluence, each connector exposes uniform methods: list_documents() and fetch_document(). These methods return instances of the standardized Document model defined in common/data_source/models.py, ensuring that metadata, content blobs, and semantic identifiers follow a consistent schema regardless of origin.
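The contract described above can be sketched with Python's `abc` module and a dataclass. This is a minimal, hypothetical model, not RAGFlow's actual source. The `Document` fields mirror the payload attributes listed later in this article. The methods are synchronous for brevity, and the `doc_id` parameter of `fetch_document()` is an assumption.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Iterator, Optional


@dataclass
class Document:
    """Hypothetical stand-in for the Document model in common/data_source/models.py."""
    id: str
    source: str
    semantic_identifier: str
    blob: bytes
    extension: str = ""
    size_bytes: int = 0
    doc_updated_at: Optional[datetime] = None
    metadata: dict[str, Any] = field(default_factory=dict)


class ConnectorBase(ABC):
    """Hypothetical sketch of the connector contract, not RAGFlow's actual class."""

    @abstractmethod
    def list_documents(self) -> Iterator[Document]:
        """Enumerate every document the source exposes."""

    @abstractmethod
    def fetch_document(self, doc_id: str) -> Document:
        """Fetch one document by its source-native ID (parameter name assumed)."""
```

Because both methods are abstract, Python refuses to instantiate any subclass that omits one of them. That is how an ABC turns "every connector exposes the same methods" into an enforced rule.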
Supported Data Sources and Implementation Examples
Source-specific implementations reside in per-source subdirectories of common/data_source/. For example, common/data_source/google_drive/connector.py and common/data_source/github/connector.py handle OAuth authentication and API pagination internally while exposing the same interface to the rest of the system. This polymorphic design allows the sync engine to treat Notion pages, Slack messages, and database records identically once they pass through the connector boundary.
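The polymorphism argument can be shown with two toy connectors. Both classes are hypothetical in-memory stand-ins for Notion and Slack integrations. They rely on duck typing instead of inheriting from `ConnectorBase` so that the block stays self-contained. The `collect()` consumer never checks which source it is talking to.

```python
from dataclasses import dataclass, field
from typing import Iterator


@dataclass
class Document:
    id: str
    source: str
    semantic_identifier: str
    blob: bytes
    metadata: dict = field(default_factory=dict)


class InMemoryNotionConnector:
    """Hypothetical toy connector standing in for a Notion integration."""

    def __init__(self, pages: dict[str, str]):
        self.pages = pages  # page title -> page text

    def list_documents(self) -> Iterator[Document]:
        for title, text in self.pages.items():
            yield Document(id=f"notion:{title}", source="notion",
                           semantic_identifier=title, blob=text.encode("utf-8"))


class InMemorySlackConnector:
    """Hypothetical toy connector standing in for a Slack integration."""

    def __init__(self, messages: list[tuple[str, str]]):
        self.messages = messages  # (channel, text) pairs

    def list_documents(self) -> Iterator[Document]:
        for i, (channel, text) in enumerate(self.messages):
            yield Document(id=f"slack:{channel}:{i}", source="slack",
                           semantic_identifier=f"#{channel} message {i}",
                           blob=text.encode("utf-8"), metadata={"channel": channel})


def collect(connectors) -> list[Document]:
    """Consume any connector the same way: no isinstance checks, no per-source branches."""
    docs: list[Document] = []
    for connector in connectors:
        docs.extend(connector.list_documents())
    return docs
```

Usage: `collect([InMemoryNotionConnector({"Roadmap": "Q3 goals"}), InMemorySlackConnector([("general", "deploy done")])])` returns one `Document` per page and per message, all with the same shape.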
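To add support for a new source, implement the base interface. The Airtable connector here is an illustrative skeleton, not an existing RAGFlow module:
```python
# common/data_source/airtable_connector.py (illustrative skeleton)
from .interfaces import ConnectorBase
from .models import Document


class AirtableConnector(ConnectorBase):
    def __init__(self, base_id: str, api_key: str):
        self.base_id = base_id
        self.api_key = api_key

    async def list_documents(self) -> list[Document]:
        # Fetch records via the Airtable REST API and convert each one to a Document
        ...

    async def fetch_document(self, doc_id: str) -> Document:
        # Signature assumed; match the one declared in interfaces.py
        ...
```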
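The "convert to Document objects" step in the skeleton can be sketched as a pure function. The record shape (`id`, `createdTime`, `fields`) follows Airtable's REST API list-records response. Everything else is an assumption: the `title_field` parameter, JSON-encoding the fields as the blob, storing the table name in metadata, and using `createdTime` for `doc_updated_at`.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class Document:
    id: str
    source: str
    semantic_identifier: str
    blob: bytes
    extension: str = ""
    size_bytes: int = 0
    doc_updated_at: Optional[datetime] = None
    metadata: dict = field(default_factory=dict)


def record_to_document(record: dict, table: str, title_field: str) -> Document:
    """Map one Airtable record to the normalized Document shape (hypothetical mapping)."""
    fields = record.get("fields", {})
    # Serialize the record's fields deterministically so identical records yield identical blobs.
    blob = json.dumps(fields, sort_keys=True).encode("utf-8")
    # Airtable timestamps end in "Z"; fromisoformat needs an explicit UTC offset.
    created = datetime.fromisoformat(record["createdTime"].replace("Z", "+00:00"))
    return Document(
        id=record["id"],
        source="airtable",
        semantic_identifier=str(fields.get(title_field, record["id"])),
        blob=blob,
        extension=".json",
        size_bytes=len(blob),
        doc_updated_at=created,
        metadata={"table": table},
    )
```

Keeping the mapping as a standalone function lets it be unit-tested without network access. `list_documents()` then only has to handle pagination and call it once per record.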
The Sync Engine: Ingesting and Normalizing Documents
The asynchronous ingestion pipeline in rag/svr/sync_data_source.py orchestrates data movement from connectors to storage. When a sync task executes, the worker instantiates the appropriate connector class based on the source_type field stored in the database configuration.
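One common way to dispatch on a stored `source_type` is a registry dictionary. The sketch below uses that pattern with hypothetical stub classes (`DriveConnectorStub`, `GitHubConnectorStub`). It shows the technique rather than RAGFlow's actual lookup code, and the config keys are assumptions.

```python
from typing import Any


class DriveConnectorStub:
    """Placeholder for a Google Drive connector class."""
    def __init__(self, **conf: Any):
        self.conf = conf


class GitHubConnectorStub:
    """Placeholder for a GitHub connector class."""
    def __init__(self, **conf: Any):
        self.conf = conf


# Hypothetical registry: maps the source_type string stored with each sync task
# to the connector class that handles it.
CONNECTOR_REGISTRY: dict[str, type] = {
    "google_drive": DriveConnectorStub,
    "github": GitHubConnectorStub,
}


def build_connector(task_config: dict[str, Any]):
    """Instantiate the connector named by task_config["source_type"]."""
    source_type = task_config["source_type"]
    try:
        connector_cls = CONNECTOR_REGISTRY[source_type]
    except KeyError:
        raise ValueError(f"unsupported source_type: {source_type!r}") from None
    # Forward everything except the discriminator as connector settings.
    conf = {k: v for k, v in task_config.items() if k != "source_type"}
    return connector_cls(**conf)
```

With this design, adding a source means adding one registry entry. The worker loop itself never changes.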
For each document retrieved, the engine constructs a unified payload containing:
- id: A hashed identifier derived from the source document ID
- source: The canonical source name (e.g., "google_drive", "github")
- semantic_identifier: A human-readable document reference
- blob: Raw content bytes
- metadata: Source-specific attributes normalized to JSON
- extension, size_bytes, doc_updated_at: File characteristics
This payload is passed to KnowledgebaseService.index_documents() in api/db/services/knowledgebase_service.py, which persists the document to the RAG doc-store. Once indexed, the original source system is never queried again during retrieval operations.
```python
# rag/svr/sync_data_source.py (excerpt)
self.connector = GoogleDriveConnector(**self.conf)  # any connector class
self.connector.load_credentials(self.conf["credentials"])
document_batch_generator = await self._generate(task)  # yields batches of Document
# Each item from the generator is a batch, so iterate over the batch to reach documents.
for document_batch in document_batch_generator:
    for doc in document_batch:
        payload = {
            "id": hash128(doc.id),
            "source": self.SOURCE_NAME,
            "semantic_identifier": doc.semantic_identifier,
            # ... (remaining payload fields elided)
        }
        KnowledgebaseService.index_documents(kb_id=task["kb_id"], docs=[payload])
```
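Payload construction can be sketched without RAGFlow dependencies as follows. `hash128_sketch` is a hypothetical stand-in for `hash128` that assumes a 128-bit MD5 digest rendered as 32 hex characters. RAGFlow's real helper may encode the value differently. MD5 is used here only for deterministic IDs, not for security.

```python
import hashlib
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Document:
    id: str
    semantic_identifier: str
    blob: bytes
    extension: str = ""


def hash128_sketch(value: str) -> str:
    # Assumption: a 128-bit digest as 32 hex characters; RAGFlow's hash128 may differ.
    return hashlib.md5(value.encode("utf-8")).hexdigest()


def build_payloads(batches: Iterable[list[Document]], source_name: str) -> Iterator[dict]:
    """Flatten batches of Documents into unified payload dicts."""
    for batch in batches:      # the generator yields batches...
        for doc in batch:      # ...so documents sit one level down
            yield {
                "id": hash128_sketch(doc.id),
                "source": source_name,
                "semantic_identifier": doc.semantic_identifier,
                "blob": doc.blob,
                "extension": doc.extension,
                "size_bytes": len(doc.blob),
            }
```

Because the ID is a pure function of the source document ID, syncing the same document twice yields the same payload ID.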
Unified Doc-Store Abstraction
Infinity and Elasticsearch Backends
RAGFlow supports two storage backends for the unified document index: Infinity and Elasticsearch. The InfinityConnection class in rag/utils/infinity_conn.py and ESConnection in rag/utils/es_conn.py both implement identical CRUD interfaces. This dual-backend support allows operators to choose between Infinity's vector-native architecture or existing Elasticsearch clusters without modifying application code.
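Choosing a backend without touching application code amounts to a factory keyed on configuration. In the sketch below, both store classes are hypothetical stubs with a deliberately simplified `search()` signature, and reading the engine name from a `DOC_ENGINE` environment variable is an assumption.

```python
import os
from typing import Optional, Protocol


class DocStore(Protocol):
    """The shared surface callers depend on (simplified, hypothetical signature)."""
    def search(self, query: str, kb_id: str) -> list[dict]: ...


class InfinityStoreStub:
    def search(self, query: str, kb_id: str) -> list[dict]:
        return [{"backend": "infinity", "kb_id": kb_id, "query": query}]


class ESStoreStub:
    def search(self, query: str, kb_id: str) -> list[dict]:
        return [{"backend": "elasticsearch", "kb_id": kb_id, "query": query}]


_BACKENDS = {"infinity": InfinityStoreStub, "elasticsearch": ESStoreStub}


def make_doc_store(engine: Optional[str] = None) -> DocStore:
    """Pick a backend from an explicit argument, else from the (assumed) DOC_ENGINE variable."""
    name = (engine or os.environ.get("DOC_ENGINE", "elasticsearch")).lower()
    return _BACKENDS[name]()
```

Callers type against `DocStore` only. Swapping Infinity for Elasticsearch is then a configuration change, not a code change.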
Common CRUD Interface
Both connection classes expose standardized methods: search(), insert(), delete(), and update(). The search() method accepts match expressions (MatchTextExpr, MatchDenseExpr) and returns results as a pandas DataFrame with canonical columns: id, content, metadata, score, and source. This abstraction ensures that query logic remains agnostic to whether documents originated from object storage or a SaaS API.
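The CRUD surface and canonical result columns can be illustrated with a toy in-memory store. Its scoring is naive term counting, not BM25 or vector similarity. Results are returned as a list of dicts to keep the block standard-library-only. Passing that list to `pandas.DataFrame(hits)` would produce a frame with the same five columns.

```python
from dataclasses import dataclass, field

CANONICAL_COLUMNS = ("id", "content", "metadata", "score", "source")


@dataclass
class InMemoryDocStore:
    """Toy doc-store with the four CRUD verbs; scoring is naive term frequency."""
    rows: dict[str, dict] = field(default_factory=dict)

    def insert(self, docs: list[dict]) -> None:
        for doc in docs:
            self.rows[doc["id"]] = dict(doc)

    def update(self, doc_id: str, new_values: dict) -> None:
        self.rows[doc_id].update(new_values)

    def delete(self, doc_id: str) -> None:
        self.rows.pop(doc_id, None)

    def search(self, terms: list[str], limit: int = 10) -> list[dict]:
        hits = []
        for row in self.rows.values():
            words = row["content"].lower().split()
            score = sum(words.count(t.lower()) for t in terms)
            if score > 0:
                # Every hit has the same columns, whatever system the row came from.
                hits.append({
                    "id": row["id"],
                    "content": row["content"],
                    "metadata": row.get("metadata", {}),
                    "score": float(score),
                    "source": row["source"],
                })
        hits.sort(key=lambda h: h["score"], reverse=True)
        return hits[:limit]
```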
The Query Layer: Cross-Source Retrieval
QueryBase and Match Expressions
The query construction layer uses QueryBase defined in common/query_base.py as the contract for all query objects. Concrete implementations like MsgTextQuery (utilized in chat interfaces) generate match expressions that the doc-store understands. The memory/services/query.py module translates natural language questions into these structured expressions, then forwards them to the selected backend via the unified search method.
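The question-to-expression step can be sketched as tokenizing the question, dropping stopwords, and wrapping the remaining keywords in a match-expression object. `TextMatch`, `SimpleQuery`, and the stopword list are hypothetical. They only mirror the `(expression, keywords)` tuple that `question()` returns in the query excerpt in this article. RAGFlow's real query builders do considerably more, such as term weighting and synonym expansion.

```python
import re
from dataclasses import dataclass

STOPWORDS = frozenset({"a", "an", "the", "is", "of", "what", "how", "to", "in"})


@dataclass(frozen=True)
class TextMatch:
    """Hypothetical analogue of a full-text match expression."""
    fields: tuple[str, ...]
    terms: tuple[str, ...]
    topn: int = 100


class SimpleQuery:
    """Hypothetical QueryBase-style object: turns a question into a match expression."""

    def __init__(self, fields=("content",)):
        self.fields = tuple(fields)

    def question(self, text: str) -> tuple[TextMatch, list[str]]:
        tokens = re.findall(r"\w+", text.lower())
        keywords = [t for t in tokens if t not in STOPWORDS]
        return TextMatch(self.fields, tuple(keywords)), keywords
```

Usage: `expr, keywords = SimpleQuery().question("What is the refund policy?")` yields an expression targeting the `content` field with the terms `refund` and `policy`.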
Result Unification
Search results return as standardized DataFrames where the source column indicates the original data provenance—Google Drive, Confluence, or custom databases—but the schema remains consistent. This allows the API layer in api/apps/knowledgebase/route.py to render mixed-source results without source-specific parsing logic.
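Because every row shares the same columns, a renderer can group mixed-source hits by the `source` column without any source-specific parsing. The function below is a hypothetical illustration, not code from the API layer.

```python
from collections import defaultdict


def render_results(rows: list[dict]) -> str:
    """Render hits from any mix of sources using only the shared columns."""
    by_source = defaultdict(list)
    for row in rows:
        by_source[row["source"]].append(row)
    lines = []
    for source in sorted(by_source):
        lines.append(f"[{source}]")
        # Highest-scoring hits first within each source group.
        for row in sorted(by_source[source], key=lambda r: r["score"], reverse=True):
            lines.append(f"  {row['score']:.2f}  {row['content']}")
    return "\n".join(lines)
```

Supporting a new source requires no change here. Its rows simply appear under a new `[source]` heading.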
```python
# memory/services/query.py
from rag.utils.infinity_conn import InfinityConnection


def search(kb_id: str, raw_query: str):
    # The second return value of question() is not needed here
    match_expr, _ = MsgTextQuery().question(raw_query)
    # Search the unified doc-store; ESConnection exposes the same search() interface
    df, total = InfinityConnection().search(
        select_fields=["content", "source", "metadata"],
        highlight_fields=[],
        condition={"kb_id": [kb_id]},
        match_expressions=[match_expr],
        order_by=None,
        offset=0,
        limit=20,
        index_names=f"ragflow_doc_meta_{kb_id}",
        knowledgebase_ids=[kb_id],
    )
    return df
```
Summary
- Connector Architecture: All data sources implement ConnectorBase in common/data_source/, exposing list_documents() and returning standardized Document models defined in common/data_source/models.py.
- Sync Pipeline: rag/svr/sync_data_source.py normalizes documents into unified payloads with hashed IDs and metadata, storing them via KnowledgebaseService.
- Unified Storage: Both InfinityConnection and ESConnection provide identical CRUD interfaces in rag/utils/, abstracting backend differences.
- Standardized Queries: The QueryBase interface and memory/services/query.py enable cross-source search using match expressions that return canonical pandas DataFrames.
- Source Agnostic Results: Query results include a source field for provenance tracking while maintaining a consistent schema across all heterogeneous inputs.
Frequently Asked Questions
What types of data sources does RAGFlow support?
RAGFlow supports any source that implements the ConnectorBase interface, including Google Drive, GitHub, Notion, Confluence, Slack, databases, and object storage. Each connector resides in common/data_source/ and handles authentication and API specifics internally while exposing standardized list_documents() and fetch_document() methods to the sync engine.
How does RAGFlow handle document schema differences between sources?
The Document model in common/data_source/models.py enforces a normalized schema. During synchronization, rag/svr/sync_data_source.py maps source-specific fields to unified attributes like semantic_identifier, blob, and metadata, ensuring that MongoDB documents and Google Drive files share the same structure in the doc-store.
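Field mapping for two of the sources named earlier can be sketched as one small function per source. The input keys follow the public GitHub issues API (`number`, `title`, `body`, `html_url`, `updated_at`) and Google Drive file metadata (`name`, `mimeType`, `modifiedTime`). The output layout and function names are hypothetical simplifications of the unified attributes.

```python
from datetime import datetime


def _parse_ts(value: str) -> datetime:
    # Both APIs return RFC 3339 timestamps ending in "Z".
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def from_github_issue(issue: dict) -> dict:
    """Map a GitHub issue to the unified attributes (hypothetical mapping)."""
    return {
        "source": "github",
        "semantic_identifier": f"#{issue['number']} {issue['title']}",
        "blob": (issue.get("body") or "").encode("utf-8"),  # body may be null
        "metadata": {"url": issue["html_url"]},
        "doc_updated_at": _parse_ts(issue["updated_at"]),
    }


def from_drive_file(meta: dict, content: bytes) -> dict:
    """Map Google Drive file metadata plus downloaded bytes (hypothetical mapping)."""
    return {
        "source": "google_drive",
        "semantic_identifier": meta["name"],
        "blob": content,
        "metadata": {"mime_type": meta["mimeType"]},
        "doc_updated_at": _parse_ts(meta["modifiedTime"]),
    }
```

The two inputs share almost no field names, yet both outputs have identical keys. That key uniformity is what lets downstream indexing and search stay source-agnostic.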
Can I query across multiple data sources simultaneously?
Yes. Once documents are synchronized, they reside in a unified index managed by InfinityConnection or ESConnection. The query layer in memory/services/query.py searches across all indexed documents using kb_id (knowledge base ID) filters, returning results from multiple sources in a single pandas DataFrame with the source column indicating origin.
How do I add a custom data source connector?
Create a new class inheriting from ConnectorBase in common/data_source/, implement list_documents() to return Document objects, and register the connector type in the database configuration. The sync engine in rag/svr/sync_data_source.py will then instantiate your connector when it processes tasks with the corresponding source_type. Once a sync task has run, your custom source can be queried alongside the existing integrations.