# How RAGFlow Handles Data Synchronization from Confluence, S3, Notion, Discord, and Google Drive

> Discover how RAGFlow synchronizes data from Confluence, S3, Notion, Discord, and Google Drive using its unified SyncBase abstraction and configurable batch processing for seamless updates.

- Repository: [InfiniFlow/ragflow](https://github.com/infiniflow/ragflow)
- Tags: how-to-guide
- Published: 2026-02-23

---

**RAGFlow uses a unified `SyncBase` abstraction to orchestrate data synchronization across heterogeneous sources, employing connector-specific implementations for Confluence, S3, Notion, Discord, and Google Drive while supporting both incremental polling and full re-indexing through a configurable batch processing pipeline.**

RAGFlow, an open-source RAG engine developed by InfiniFlow, ingests unstructured data from disparate cloud platforms and collaboration tools. The synchronization architecture centers on a generic task dispatcher that routes source-specific connectors through a standardized ingestion pipeline defined in [`rag/svr/sync_data_source.py`](https://github.com/infiniflow/ragflow/blob/main/rag/svr/sync_data_source.py).

## The SyncBase Abstraction and Factory Pattern

At the core of RAGFlow’s data synchronization engine is the **`SyncBase`** class. This abstraction defines a common interface that every source connector must implement, ensuring consistent behavior regardless of whether the backend is a Confluence wiki or an S3 bucket.

Concrete implementations—`Confluence`, `S3`, `Notion`, `Discord`, `GoogleDrive`, and others—are registered in a **`func_factory`** mapping. When the dispatcher (`dispatch_tasks`) retrieves a pending job from `SyncLogsService`, it instantiates the appropriate class by looking up the `FileSource` enum value in this factory dictionary. Each subclass implements the `_generate` method, which returns a document batch generator specific to that source’s API.
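Below is a minimal sketch of this pattern. It is a simplified model, not RAGFlow's actual code. Only `SyncBase`, `_generate`, `func_factory`, `FileSource`, `Confluence`, and `S3` come from the description above. The `run` method, the `instantiate` helper, the enum values, and the stubbed batch contents are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Iterator


class FileSource(str, Enum):
    # Illustrative subset; the real enum has more members and may use other values.
    CONFLUENCE = "confluence"
    S3 = "s3"


class SyncBase(ABC):
    """Common interface: subclasses only describe how to produce document batches."""

    def __init__(self, conf: dict):
        self.conf = conf

    @abstractmethod
    def _generate(self, task: dict) -> Iterator[list[dict]]:
        """Return a generator of document batches for this source."""

    def run(self, task: dict) -> int:
        # Hypothetical shared orchestration: count what the source-specific generator yields.
        return sum(len(batch) for batch in self._generate(task))


class Confluence(SyncBase):
    def _generate(self, task):
        yield [{"id": "page-1"}, {"id": "page-2"}]  # stand-in for CQL query results


class S3(SyncBase):
    def _generate(self, task):
        yield [{"id": "bucket/report.pdf"}]  # stand-in for an object listing


# Registry mapping each source enum to its implementation class.
func_factory = {FileSource.CONFLUENCE: Confluence, FileSource.S3: S3}


def instantiate(task: dict) -> SyncBase:
    """Hypothetical dispatcher step: look up the class by source and build it."""
    return func_factory[FileSource(task["source"])](task.get("config", {}))
```

Keeping the registry as a plain dictionary means that supporting a new source requires only one new subclass and one new entry. The dispatcher itself does not change.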

## Source-Specific Connector Implementations

Each external system is encapsulated by a dedicated connector class living in `common/data_source/`:

- **`ConfluenceConnector`** ([`common/data_source/confluence_connector.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/confluence_connector.py)): Queries Atlassian Confluence using CQL, handles pagination, and supports both cloud and server instances. Configuration includes `wiki_base`, `space`, and `index_recursively` flags.

- **`BlobStorageConnector`** ([`common/data_source/blob_storage_connector.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/blob_storage_connector.py)): Manages S3-compatible object storage including AWS S3, Cloudflare R2, and OCI. Accepts parameters like `bucket_type`, `bucket_name`, and optional `prefix` filtering.

- **`NotionConnector`** ([`common/data_source/notion_connector.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/notion_connector.py)): Traverses Notion workspaces using the `root_page_id` as an entry point, respecting API pagination and converting block structures into documents.

- **`DiscordConnector`** ([`common/data_source/discord_connector.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/discord_connector.py)): Connects via bot token to specified `server_ids` and `channel_names`, fetching message history and yielding structured documents.

- **`GoogleDriveConnector`** ([`common/data_source/google_drive/connector.py`](https://github.com/infiniflow/ragflow/blob/main/common/data_source/google_drive/connector.py)): Handles service account authentication, shared drive enumeration, and optional image ingestion via `allow_images` flags.
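Several of these connectors, including Notion and Confluence, have to walk paginated APIs. The sketch below shows the general cursor-pagination loop. It assumes the list-response shape used by Notion's public API (`results`, `has_more`, `next_cursor`, with the cursor sent back as `start_cursor`). The `paginate` helper and the stubbed pages are hypothetical and do not come from `NotionConnector`.

```python
from typing import Callable, Iterator, Optional


def paginate(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield every item from a cursor-paginated endpoint.

    fetch_page(cursor) must return {"results": [...], "has_more": bool,
    "next_cursor": str or None}, mirroring Notion's list responses.
    """
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)  # a real client would send cursor as start_cursor
        yield from page["results"]
        if not page.get("has_more"):
            return
        cursor = page["next_cursor"]


# Stubbed two-page response standing in for real API calls.
PAGES = {
    None: {"results": [{"id": "p1"}, {"id": "p2"}], "has_more": True, "next_cursor": "c1"},
    "c1": {"results": [{"id": "p3"}], "has_more": False, "next_cursor": None},
}
print([item["id"] for item in paginate(PAGES.__getitem__)])  # ['p1', 'p2', 'p3']
```

Because `paginate` is a generator, a connector can start turning early pages into documents before later pages have been requested.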

## Incremental Polling vs. Full Re-indexing

RAGFlow supports two synchronization modes controlled by the **`reindex`** parameter and **`poll_range_start`** timestamp:

1. **Full Re-index**: When `reindex == "1"` or no `poll_range_start` exists, the connector starts from the Unix epoch (`0.0` or `None`) and fetches all available documents.

2. **Incremental Sync**: If a previous sync timestamp exists and `reindex` is unset, the connector passes `poll_range_start` to its `load_from_checkpoint` method, retrieving only content modified since the last successful run.

This logic is repeated in each concrete class. For example, the Google Drive implementation (lines 96-106 of [`rag/svr/sync_data_source.py`](https://github.com/infiniflow/ragflow/blob/main/rag/svr/sync_data_source.py)) checks `reindex` to decide whether to reset the time window before invoking the connector.
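The mode selection can be modeled in a few lines. This is a hedged sketch: `resolve_poll_start` and `docs_in_window` are hypothetical helpers, and `docs_in_window` is an in-memory stand-in for a connector's time-windowed fetch, not the real `load_from_checkpoint`.

```python
EPOCH = 0.0


def resolve_poll_start(task: dict) -> float:
    """Pick the lower bound of the poll window for a sync task."""
    if task.get("reindex") == "1" or task.get("poll_range_start") is None:
        return EPOCH  # full re-index: ignore any stored cursor
    return float(task["poll_range_start"])  # incremental: resume from the last run


def docs_in_window(docs: list[dict], start: float, end: float) -> list[str]:
    """Return IDs of documents modified within [start, end].

    The inclusive lower bound may re-fetch a document updated exactly at the
    cursor, which is harmless if ingestion is idempotent.
    """
    return [d["id"] for d in docs if start <= d["updated_at"] <= end]
```

With a stored cursor of `40.0`, an incremental run picks up only documents modified after that time. Setting `reindex` to `"1"` on the same task returns every document again.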

## The Synchronization Execution Flow

The end-to-end pipeline follows these steps:

1. **Task Dispatch**: `dispatch_tasks` queries `SyncLogsService` for pending jobs and acquires a global semaphore (`MAX_CONCURRENT_TASKS`) to limit concurrent operations.

2. **Connector Instantiation**: The factory creates the appropriate connector (e.g., `self.connector = ConfluenceConnector(...)`) and loads encrypted credentials from the configuration store.

3. **Document Batching**: The `_generate` method repeatedly calls `load_from_checkpoint` and yields **`document_batches`**, emitting a batch each time the configurable `batch_size` threshold is reached.

4. **Ingestion**: For each batch, RAGFlow hashes document IDs and passes the payload to `KnowledgebaseService` for vectorization and storage.

5. **Cursor Update**: Upon completion, the system updates `task["poll_range_start"]` with the current timestamp, enabling incremental polling for subsequent runs.

6. **Logging**: Each phase writes structured logs via `SyncLogsService`, recording source type, document counts, and skipped items.
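The flow above can be modeled as a small, self-contained Python sketch. Everything here is hypothetical and is not RAGFlow's implementation. The names `run_sync`, `batched`, and `hashed_id` are illustrative, SHA-256 is an assumed hash, and the `ingest` callback stands in for the hand-off to `KnowledgebaseService`. An `asyncio.Semaphore` plays the role of the `MAX_CONCURRENT_TASKS` limit.

```python
import asyncio
import hashlib
import logging
import time
from typing import Callable, Iterable, Iterator

MAX_CONCURRENT_TASKS = 2  # hypothetical value; RAGFlow configures its own limit


def batched(docs: Iterable[dict], batch_size: int) -> Iterator[list[dict]]:
    """Group a document stream into lists of at most batch_size items."""
    batch: list[dict] = []
    for doc in docs:
        batch.append(doc)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush the final, possibly smaller, batch


def hashed_id(source: str, raw_id: str) -> str:
    """Stable ID derived from the source type and the native ID (SHA-256 assumed)."""
    return hashlib.sha256(f"{source}:{raw_id}".encode()).hexdigest()


def run_sync(task: dict, docs: Iterable[dict],
             ingest: Callable[[list[dict]], None],
             batch_size: int = 2,
             now: Callable[[], float] = time.time) -> dict:
    """Steps 3-6 for one task: batch, hash IDs, ingest, move the cursor, log."""
    total = 0
    for batch in batched(docs, batch_size):
        ingest([{**d, "id": hashed_id(task["source"], d["id"])} for d in batch])
        total += len(batch)
    task["poll_range_start"] = now()  # cursor for the next incremental run
    logging.info("synced %d docs from %s", total, task["source"])
    return {"source": task["source"], "docs": total}


async def dispatch_tasks(jobs: list[tuple], limit: int = MAX_CONCURRENT_TASKS) -> list[dict]:
    """Steps 1-2: run many sync jobs, never more than `limit` at once."""
    sem = asyncio.Semaphore(limit)

    async def worker(job: tuple) -> dict:
        async with sem:
            # run_sync blocks, so execute it in a worker thread
            return await asyncio.to_thread(run_sync, *job)

    return await asyncio.gather(*(worker(job) for job in jobs))
```

Following the article, this sketch records the cursor at completion time. A more conservative variant would capture the timestamp before fetching, so that edits made while the run is in progress are still picked up on the next incremental pass.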

## Credential Rotation and Failure Handling

Connectors for OAuth-based sources like Gmail and Google Drive may return refreshed tokens during the sync process. RAGFlow detects these updates and persists them immediately via **`ConnectorService.update_by_id`**, ensuring uninterrupted access without manual re-authentication.
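The rotation check amounts to comparing credentials before and after a fetch and persisting any change right away. In the sketch below, `FakeOAuthConnector` and `sync_with_rotation` are hypothetical. The `update_by_id` callback stands in for `ConnectorService.update_by_id`, and its `{"credentials": ...}` payload shape is an assumption.

```python
import time
from typing import Callable, Optional


class FakeOAuthConnector:
    """Hypothetical stand-in for an OAuth-based connector such as Google Drive."""

    def __init__(self, credentials: dict):
        self.credentials = dict(credentials)  # copy so changes can be detected

    def fetch(self, now: float) -> list[str]:
        if self.credentials["expires_at"] <= now:
            # A real connector would call the provider's token endpoint here.
            self.credentials = {"token": "refreshed", "expires_at": now + 3600}
        return ["doc-1"]


def sync_with_rotation(connector_id: str, stored: dict,
                       update_by_id: Callable[[str, dict], object],
                       now: Optional[float] = None) -> list[str]:
    """Run one fetch and persist credentials immediately if they were rotated."""
    now = time.time() if now is None else now
    connector = FakeOAuthConnector(stored)
    docs = connector.fetch(now)
    if connector.credentials != stored:
        update_by_id(connector_id, {"credentials": connector.credentials})
    return docs
```

Persisting the refreshed credentials inside the sync run, rather than at the end, means a later crash cannot leave the stored credentials pointing at an already-invalidated token.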

Errors are handled gracefully through **`ConnectorFailure`** objects. When a connector encounters a recoverable error (e.g., a missing Confluence page or temporary S3 timeout), it returns a failure record rather than raising an exception. The sync engine logs these failures in `SyncLogsService` and continues processing remaining batches, ensuring that one corrupted document does not abort an entire synchronization job.
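The "record and continue" behavior can be sketched as a loop that separates failure records from documents. `ConnectorFailure` is the name used above, but its fields here (`document_id`, `error`) are assumptions. `Document` and `process_stream` are hypothetical.

```python
import logging
from dataclasses import dataclass
from typing import Callable, Iterable, Union


@dataclass
class Document:
    id: str
    text: str


@dataclass
class ConnectorFailure:
    # Field names are assumptions made for this sketch.
    document_id: str
    error: str


def process_stream(items: Iterable[Union[Document, ConnectorFailure]],
                   ingest: Callable[[Document], None]) -> tuple[int, list[ConnectorFailure]]:
    """Ingest documents; log and collect failures instead of aborting the run."""
    ingested, failures = 0, []
    for item in items:
        if isinstance(item, ConnectorFailure):
            logging.warning("skipped %s: %s", item.document_id, item.error)
            failures.append(item)
            continue
        ingest(item)
        ingested += 1
    return ingested, failures
```

Returning failures as values keeps the ingest loop simple. A single missing page becomes one logged entry, and the remaining documents still flow through.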

## Practical Example: Configuring a Confluence Sync

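The following Python snippet shows how you might programmatically create a synchronization task for a Confluence space through RAGFlow's HTTP API. The endpoint path, port, and payload field names are illustrative, so confirm them against the API reference for your RAGFlow version: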

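```python
import os

import requests

# Endpoint and field names follow this article's example; verify them for your deployment.
API_URL = "http://localhost:8000/api/v1/connector"
HEADERS = {
    # os.environ[...] fails fast if the token is missing instead of sending "Bearer None".
    "Authorization": f"Bearer {os.environ['ADMIN_TOKEN']}",
}

payload = {
    "name": "Confluence Docs",
    "source": "confluence",
    "config": {
        "wiki_base": "https://mycompany.atlassian.net/wiki",
        "is_cloud": True,
        "index_mode": "space",  # sync a whole space rather than a single page
        "space": "DOCS",
    },
    "kb_id": "123e4567-e89b-12d3-a456-426614174000",  # target knowledge base
    "auto_parse": True,
    "reindex": "1",  # first run: fetch everything; omit for incremental syncs
}

# json= serializes the body and sets Content-Type: application/json automatically.
resp = requests.post(f"{API_URL}/sync", headers=HEADERS, json=payload, timeout=30)
resp.raise_for_status()  # surface HTTP errors before trying to parse the body
print(resp.status_code, resp.json())
```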

Once submitted, the `dispatch_tasks` loop picks up the job, instantiates the `Confluence` class, and streams documents from the specified space into the target knowledge base using the batch processing logic described above.

## Summary

- **Unified Abstraction**: RAGFlow’s `SyncBase` class and `func_factory` pattern allow diverse sources to share the same orchestration logic while encapsulating API-specific quirks in dedicated connectors.

- **Flexible Sync Modes**: The system supports both full re-indexing (`reindex = "1"`) and incremental polling via `poll_range_start` timestamps to minimize API calls and processing time.

- **Resilient Processing**: Connector-specific failures are logged without aborting the entire sync run, and credentials are automatically refreshed for OAuth sources like Google Drive.

- **Batch Architecture**: Documents flow through checkpoint-aware generators (`load_from_checkpoint`) in configurable batches, optimizing memory usage and throughput.

- **Centralized Logging**: Every operation is recorded in `SyncLogsService`, providing audit trails for document ingestion and failure analysis.

## Frequently Asked Questions

### How does RAGFlow handle authentication token expiration during long-running syncs?

When OAuth-based connectors such as `GoogleDriveConnector` or the Gmail connector refresh their tokens during execution, RAGFlow captures the new credentials from the connector's response and immediately persists them using `ConnectorService.update_by_id`. This ensures subsequent syncs use valid tokens without requiring manual reconfiguration.

### Can RAGFlow sync only specific folders or channels rather than entire workspaces?

Yes. Connectors accept scope-limiting parameters in their configuration. For example, `BlobStorageConnector` supports a `prefix` parameter for S3 buckets, `DiscordConnector` accepts specific `channel_names`, and `ConfluenceConnector` can target individual `space` values or `page_id` entries rather than full wiki instances.
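To make the scoping idea concrete, here is a client-side sketch of prefix and channel filtering. The helper names are hypothetical. In practice, an S3 connector would more likely pass the prefix to the listing call itself (for example, boto3's `list_objects_v2(Bucket=..., Prefix=...)`) so that the filtering happens server-side. The sketch filters in memory only to stay self-contained.

```python
from typing import Iterable, Optional


def in_scope_keys(keys: Iterable[str], prefix: Optional[str] = None) -> list[str]:
    """Keep only object keys under the configured prefix (all keys if none is set)."""
    return [k for k in keys if not prefix or k.startswith(prefix)]


def in_scope_messages(messages: Iterable[dict],
                      channel_names: Optional[list[str]] = None) -> list[dict]:
    """Keep only messages from the listed channels (all channels if none are set)."""
    wanted = set(channel_names or [])
    return [m for m in messages if not wanted or m["channel"] in wanted]
```

Treating an empty or missing scope as "everything" matches the default behavior described above, where omitting the parameter syncs the full bucket or server.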

### What happens if a document fails to process during synchronization?

The sync engine catches `ConnectorFailure` objects returned by the source connector and logs them via `SyncLogsService` without stopping the batch. The failure is recorded with context (document ID, error type), allowing administrators to review skipped items while the remaining documents complete ingestion.

### How does RAGFlow determine which documents to sync in incremental mode?

The system stores the last successful sync timestamp in `poll_range_start`. On the next run, this value is passed to the connector’s `load_from_checkpoint` method, which queries the source API for content modified after that time. If `reindex` is set to `"1"`, this timestamp is ignored and the connector fetches all available documents from the beginning of the dataset.