How RAGFlow Handles Data Synchronization from Confluence, S3, Notion, Discord, and Google Drive
RAGFlow uses a unified SyncBase abstraction to orchestrate data synchronization across heterogeneous sources, employing connector-specific implementations for Confluence, S3, Notion, Discord, and Google Drive while supporting both incremental polling and full re-indexing through a configurable batch processing pipeline.
RAGFlow, an open-source RAG engine developed by Infiniflow, ingests unstructured data from disparate cloud platforms and collaboration tools. The synchronization architecture centers on a generic task dispatcher that routes source-specific connectors through a standardized ingestion pipeline defined in rag/svr/sync_data_source.py.
The SyncBase Abstraction and Factory Pattern
At the core of RAGFlow’s data synchronization engine is the SyncBase class. This abstraction defines a common interface that every source connector must implement, ensuring consistent behavior regardless of whether the backend is a Confluence wiki or an S3 bucket.
Concrete implementations—Confluence, S3, Notion, Discord, GoogleDrive, and others—are registered in a func_factory mapping. When the dispatcher (dispatch_tasks) retrieves a pending job from SyncLogsService, it instantiates the appropriate class by looking up the FileSource enum value in this factory dictionary. Each subclass implements the _generate method, which returns a document batch generator specific to that source’s API.
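The minimal Python sketch below makes the pattern concrete: an abstract base class plus an enum-keyed factory dictionary. The names SyncBase, Confluence, S3, _generate, and func_factory come from the description above. The FileSource members, the run helper, the dispatch function, and the hard-coded batches are hypothetical stand-ins, not RAGFlow's actual code.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Iterator, List


# Hypothetical stand-in for RAGFlow's FileSource enum; real member names may differ.
class FileSource(str, Enum):
    CONFLUENCE = "confluence"
    S3 = "s3"


class SyncBase(ABC):
    """Common interface: subclasses only decide how to produce document batches."""

    def __init__(self, conf: dict):
        self.conf = conf

    @abstractmethod
    def _generate(self, task: dict) -> Iterator[List[dict]]:
        """Yield batches of documents from one source."""

    def run(self, task: dict) -> int:
        # Shared orchestration: identical for every source.
        total = 0
        for batch in self._generate(task):
            total += len(batch)  # a real engine would ingest the batch here
        return total


class Confluence(SyncBase):
    def _generate(self, task):
        yield [{"id": "page-1"}, {"id": "page-2"}]  # fake data for illustration


class S3(SyncBase):
    def _generate(self, task):
        yield [{"id": "docs/a.pdf"}, {"id": "docs/b.pdf"}]
        yield [{"id": "docs/c.pdf"}]


# Factory: source enum value -> connector class.
func_factory = {
    FileSource.CONFLUENCE: Confluence,
    FileSource.S3: S3,
}


def dispatch(task: dict) -> int:
    # FileSource(...) raises ValueError for an unknown source string.
    cls = func_factory[FileSource(task["source"])]
    return cls(task.get("config", {})).run(task)


print(dispatch({"source": "s3"}))  # 3
```

In this design, adding a source means writing one subclass and one dictionary entry. The orchestration loop in run never changes.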
Source-Specific Connector Implementations
Each external system is encapsulated by a dedicated connector class living in common/data_source/:
- ConfluenceConnector (common/data_source/confluence_connector.py): Queries Atlassian Confluence using CQL, handles pagination, and supports both cloud and server instances. Configuration includes wiki_base, space, and the index_recursively flag.
- BlobStorageConnector (common/data_source/blob_storage_connector.py): Manages S3-compatible object storage, including AWS S3, Cloudflare R2, and OCI. Accepts parameters such as bucket_type, bucket_name, and an optional prefix filter.
- NotionConnector (common/data_source/notion_connector.py): Traverses Notion workspaces using root_page_id as the entry point, respecting API pagination and converting block structures into documents.
- DiscordConnector (common/data_source/discord_connector.py): Connects via a bot token to the specified server_ids and channel_names, fetching message history and yielding structured documents.
- GoogleDriveConnector (common/data_source/google_drive/connector.py): Handles service account authentication, shared drive enumeration, and optional image ingestion via the allow_images flag.
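Several of these connectors walk paginated APIs. The sketch below shows a generic cursor-pagination loop. It assumes a response shaped like the Notion public API's list endpoints (results, has_more, next_cursor). The fetch function is a hypothetical in-memory fake standing in for an HTTP client, and RAGFlow's NotionConnector may structure this loop differently.

```python
from typing import Callable, Iterator, Optional

# Hypothetical fetcher: given a cursor (None for the first page), return a dict
# shaped like a Notion list response: {"results", "has_more", "next_cursor"}.
FetchPage = Callable[[Optional[str]], dict]


def iter_paginated(fetch_page: FetchPage) -> Iterator[dict]:
    """Yield every item, following next_cursor until has_more is False."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        yield from page["results"]
        if not page.get("has_more"):
            return
        cursor = page["next_cursor"]


# In-memory fake standing in for a real API client (illustration only).
_FAKE_PAGES = {
    None: {"results": [{"id": "b1"}, {"id": "b2"}], "has_more": True, "next_cursor": "c1"},
    "c1": {"results": [{"id": "b3"}], "has_more": False, "next_cursor": None},
}


def fake_fetch(cursor: Optional[str]) -> dict:
    return _FAKE_PAGES[cursor]


print([block["id"] for block in iter_paginated(fake_fetch)])  # ['b1', 'b2', 'b3']
```

Because iter_paginated is a generator, pages are fetched lazily. A consumer that stops early never requests the remaining pages.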
Incremental Polling vs. Full Re-indexing
RAGFlow supports two synchronization modes controlled by the reindex parameter and poll_range_start timestamp:
- Full re-index: When reindex == "1" or no poll_range_start exists, the connector starts from the epoch (0.0 or None) and fetches all available documents.
- Incremental sync: If a previous sync timestamp exists and reindex is unset, the connector passes poll_range_start to its load_from_checkpoint method and retrieves only content modified since the last successful run.
This logic appears in each concrete class. For example, in the Google Drive implementation at lines 96-106 of sync_data_source.py, the code checks reindex to determine whether to reset the time window before invoking the connector.
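The mode selection comes down to a small decision about where the polling window starts. The function below sketches that decision. resolve_poll_window is a hypothetical name, and the sketch assumes poll_range_start is stored as a timezone-aware datetime, which may not match RAGFlow's actual task schema.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def resolve_poll_window(task: dict, now: Optional[float] = None) -> Tuple[float, float]:
    """Return the (start, end) polling window in epoch seconds.

    Assumption: task["reindex"] == "1" forces a full re-index, and
    task["poll_range_start"] is a timezone-aware datetime or None.
    """
    end = now if now is not None else datetime.now(timezone.utc).timestamp()
    last_sync = task.get("poll_range_start")
    if task.get("reindex") == "1" or last_sync is None:
        return 0.0, end  # full re-index: start from the epoch
    return last_sync.timestamp(), end  # incremental: only newer content


last = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(resolve_poll_window({"poll_range_start": last}, now=1_800_000_000.0))
# (1704067200.0, 1800000000.0)
print(resolve_poll_window({"poll_range_start": last, "reindex": "1"}, now=1_800_000_000.0))
# (0.0, 1800000000.0)
```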
The Synchronization Execution Flow
The end-to-end pipeline follows these steps:
- Task dispatch: dispatch_tasks queries SyncLogsService for pending jobs and acquires a global semaphore (MAX_CONCURRENT_TASKS) to limit concurrent operations.
- Connector instantiation: The factory creates the appropriate connector (e.g., self.connector = ConfluenceConnector(...)) and loads encrypted credentials from the configuration store.
- Document batching: The _generate method yields document_batches, repeatedly calling load_from_checkpoint and grouping the results into batches of up to the configurable batch_size.
- Ingestion: For each batch, RAGFlow hashes document IDs and passes the payload to KnowledgebaseService for vectorization and storage.
- Cursor update: Upon completion, the system updates task["poll_range_start"] with the current timestamp, enabling incremental polling on subsequent runs.
- Logging: Each phase writes structured logs via SyncLogsService, recording the source type, document counts, and skipped items.
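The asyncio sketch below strings these steps together under simplifying assumptions. dispatch_pending, run_task, doc_key, and the in-memory store and log are hypothetical stand-ins for dispatch_tasks, KnowledgebaseService, and SyncLogsService. The SHA-256 ID hashing scheme is also an assumption, not RAGFlow's documented behavior.

```python
import asyncio
import hashlib
import itertools
from datetime import datetime, timezone
from typing import Dict, Iterable, Iterator, List, Tuple

MAX_CONCURRENT_TASKS = 2  # illustrative limit


def batched(docs: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Group a document stream into lists of at most batch_size items."""
    it = iter(docs)
    while True:
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            return
        yield batch


def doc_key(source: str, doc_id: str) -> str:
    # Assumed scheme: stable SHA-256 of the source name plus the source-native ID.
    return hashlib.sha256(f"{source}:{doc_id}".encode()).hexdigest()


async def run_task(task: dict, docs: Iterable[dict], sem: asyncio.Semaphore,
                   store: Dict[str, dict], log: List[dict], batch_size: int = 2) -> None:
    async with sem:  # at most MAX_CONCURRENT_TASKS syncs run at once
        count = 0
        for batch in batched(docs, batch_size):
            for doc in batch:
                store[doc_key(task["source"], doc["id"])] = doc  # stand-in for KB ingestion
            count += len(batch)
        task["poll_range_start"] = datetime.now(timezone.utc)  # advance the cursor
        log.append({"source": task["source"], "docs": count})  # stand-in for SyncLogsService


async def dispatch_pending(pending: List[Tuple[dict, List[dict]]]):
    sem = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
    store: Dict[str, dict] = {}
    log: List[dict] = []
    await asyncio.gather(*(run_task(t, d, sem, store, log) for t, d in pending))
    return store, log


tasks = [
    ({"source": "s3"}, [{"id": "a"}, {"id": "b"}, {"id": "c"}]),
    ({"source": "notion"}, [{"id": "p1"}]),
]
store, log = asyncio.run(dispatch_pending(tasks))
print(len(store), sorted(entry["docs"] for entry in log))  # 4 [1, 3]
```

When docs is a generator rather than a list, batched materializes only one batch at a time. This is what keeps memory bounded regardless of source size.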
Credential Rotation and Failure Handling
Connectors for OAuth-based sources like Gmail and Google Drive may return refreshed tokens during the sync process. RAGFlow detects these updates and persists them immediately via ConnectorService.update_by_id, ensuring uninterrupted access without manual re-authentication.
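The sketch below illustrates the refresh-and-persist idea. After each batch, the connector's credentials are compared against the last persisted copy, and any change is written back through a callback. That callback stands in for ConnectorService.update_by_id. FakeOAuthConnector, sync_with_credential_persistence, and the {"credentials": ...} payload shape are hypothetical.

```python
from typing import Callable, Dict, Iterator, List


class FakeOAuthConnector:
    """Illustrative connector whose access token is refreshed mid-sync."""

    def __init__(self, creds: Dict[str, str]):
        self.creds = dict(creds)

    def load(self) -> Iterator[List[str]]:
        yield ["doc-1"]
        self.creds["access_token"] = "refreshed"  # simulate an OAuth token refresh
        yield ["doc-2"]


def sync_with_credential_persistence(connector_id: str, connector: FakeOAuthConnector,
                                     update_by_id: Callable[[str, dict], None]) -> int:
    persisted = dict(connector.creds)
    total = 0
    for batch in connector.load():
        total += len(batch)
        if connector.creds != persisted:  # credentials changed while producing this batch
            update_by_id(connector_id, {"credentials": dict(connector.creds)})
            persisted = dict(connector.creds)
    return total


calls = []
n = sync_with_credential_persistence(
    "conn-1", FakeOAuthConnector({"access_token": "old"}),
    lambda cid, data: calls.append((cid, data)),
)
print(n, calls)  # 2 [('conn-1', {'credentials': {'access_token': 'refreshed'}})]
```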
Recoverable errors are surfaced as ConnectorFailure objects. When a connector encounters a recoverable error (e.g., a missing Confluence page or a temporary S3 timeout), it returns a failure record rather than raising an exception. The sync engine logs these failures in SyncLogsService and continues processing the remaining batches, so one corrupted document does not abort an entire synchronization job.
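The failure-as-value pattern can be sketched as follows. This ConnectorFailure dataclass is a simplified, hypothetical stand-in, since RAGFlow's real class likely carries more context. A standard Python logger plays the role of SyncLogsService.

```python
import logging
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Tuple, Union

logger = logging.getLogger("sync")


@dataclass
class ConnectorFailure:
    # Simplified, hypothetical stand-in for RAGFlow's failure record.
    document_id: str
    error: str


@dataclass
class Document:
    id: str
    text: str


def fake_confluence_stream() -> Iterator[Union[Document, ConnectorFailure]]:
    yield Document("p1", "Release notes")
    yield ConnectorFailure("p2", "404: page not found")  # reported as a value, not raised
    yield Document("p3", "Onboarding guide")


def consume(stream: Iterable[Union[Document, ConnectorFailure]]) -> Tuple[List[str], List[ConnectorFailure]]:
    ingested: List[str] = []
    failures: List[ConnectorFailure] = []
    for item in stream:
        if isinstance(item, ConnectorFailure):
            logger.warning("skipped %s: %s", item.document_id, item.error)
            failures.append(item)  # record and keep processing
            continue
        ingested.append(item.id)
    return ingested, failures


ingested, failures = consume(fake_confluence_stream())
print(ingested, [f.document_id for f in failures])  # ['p1', 'p3'] ['p2']
```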
Practical Example: Configuring a Confluence Sync
The following Python snippet sketches how a synchronization task for a Confluence space could be created programmatically through RAGFlow's HTTP API:
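```python
import os

import requests

# Illustrative only: the endpoint path, port, and payload fields below are
# assumptions; verify them against your RAGFlow version's API reference.
API_URL = "http://localhost:8000/api/v1/connector"
HEADERS = {
    "Authorization": f"Bearer {os.getenv('ADMIN_TOKEN')}",
    "Content-Type": "application/json",
}

payload = {
    "name": "Confluence Docs",
    "source": "confluence",
    "config": {
        "wiki_base": "https://mycompany.atlassian.net/wiki",
        "is_cloud": True,
        "index_mode": "space",
        "space": "DOCS",
    },
    "kb_id": "123e4567-e89b-12d3-a456-426614174000",  # target knowledge base ID
    "auto_parse": True,
    "reindex": "1",  # "1" forces a full re-index
}

# json= serializes the payload; the timeout keeps the call from hanging.
resp = requests.post(f"{API_URL}/sync", headers=HEADERS, json=payload, timeout=30)
resp.raise_for_status()  # fail loudly on 4xx/5xx instead of parsing an error page
print(resp.status_code, resp.json())
```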
Once submitted, the dispatch_tasks loop picks up the job, instantiates the Confluence class, and streams documents from the specified space into the target knowledge base using the batch processing logic described above.
Summary
- Unified abstraction: RAGFlow's SyncBase class and func_factory pattern let diverse sources share the same orchestration logic while encapsulating API-specific quirks in dedicated connectors.
- Flexible sync modes: The system supports both full re-indexing (reindex = "1") and incremental polling via poll_range_start timestamps, which minimizes API calls and processing time.
- Resilient processing: Connector-specific failures are logged without aborting the entire sync run, and refreshed credentials for OAuth sources such as Google Drive are persisted automatically.
- Batch architecture: Documents flow through checkpoint-aware generators (load_from_checkpoint) in configurable batches, bounding memory usage and improving throughput.
- Centralized logging: Every operation is recorded in SyncLogsService, providing audit trails for document ingestion and failure analysis.
Frequently Asked Questions
How does RAGFlow handle authentication token expiration during long-running syncs?
When connectors like GoogleDriveConnector or Gmail refresh their OAuth tokens during execution, RAGFlow captures the new credentials from the connector’s response and immediately persists them using ConnectorService.update_by_id. This ensures subsequent syncs use valid tokens without requiring manual reconfiguration.
Can RAGFlow sync only specific folders or channels rather than entire workspaces?
Yes. Connectors accept scope-limiting parameters in their configuration. For example, BlobStorageConnector supports a prefix parameter for S3 buckets, DiscordConnector accepts specific channel_names, and ConfluenceConnector can target individual space values or page_id entries rather than full wiki instances.
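The following is a minimal sketch of this kind of scope filtering. It assumes S3 object keys and Discord messages arrive as plain strings and dicts. in_scope_s3 and in_scope_discord are hypothetical helpers. In practice, an S3 client would push the prefix filter to the server rather than filter client-side (boto3's list_objects_v2 accepts a Prefix argument).

```python
from typing import Dict, Iterable, List, Optional


def in_scope_s3(keys: Iterable[str], prefix: Optional[str]) -> List[str]:
    """Keep object keys under prefix; None or "" means the whole bucket."""
    return [k for k in keys if not prefix or k.startswith(prefix)]


def in_scope_discord(messages: Iterable[Dict], channel_names: Optional[List[str]]) -> List[Dict]:
    """Keep messages from the listed channels; None or [] means all channels."""
    allowed = set(channel_names or [])
    return [m for m in messages if not allowed or m["channel"] in allowed]


print(in_scope_s3(["docs/a.md", "img/b.png", "docs/c.md"], "docs/"))  # ['docs/a.md', 'docs/c.md']
msgs = [{"channel": "general", "id": 1}, {"channel": "random", "id": 2}]
print(in_scope_discord(msgs, ["general"]))  # [{'channel': 'general', 'id': 1}]
```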
What happens if a document fails to process during synchronization?
The sync engine collects ConnectorFailure objects returned by the source connector and logs them via SyncLogsService without stopping the batch. Each failure is recorded with context (document ID, error type), so administrators can review skipped items while the remaining documents complete ingestion.
How does RAGFlow determine which documents to sync in incremental mode?
The system stores the last successful sync timestamp in poll_range_start. On the next run, this value is passed to the connector’s load_from_checkpoint method, which queries the source API for content modified after that time. If reindex is set to "1", this timestamp is ignored and the connector fetches all available documents from the beginning of the dataset.