How the Sync Importer Handles Bulk Memory Imports in MCP Memory Service
The sync importer (MemoryImporter) ingests large batches of memories from JSON exports into local MCP memory storage with built-in deduplication, source tracking, and dry-run capabilities.
The MCP Memory Service (doobidoo/mcp-memory-service) provides a robust synchronization layer for managing knowledge across devices. When migrating data from backups, external devices, or third-party tools, the bulk memory import pipeline ensures safe, efficient data consolidation without duplicates or blocking operations.
The 9-Step Bulk Import Pipeline
The import process is implemented in src/mcp_memory_service/sync/importer.py and follows a strict async workflow:
1. Initialization and Storage Binding
When instantiating MemoryImporter, you provide a concrete MemoryStorage implementation such as SqliteVecStorage or CloudflareStorage. The constructor (lines 39-46) stores this reference for all subsequent persistence operations.
2. Deduplication Preparation
If deduplicate=True (the default), the importer calls _get_existing_hashes (lines 24-31) to fetch all content hashes already present in storage. This creates an in-memory set for O(1) duplicate detection throughout the import run.
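The hash preloading described above can be sketched as follows. This is a minimal illustration rather than the project's code: StoredMemory, InMemoryStorage, and load_existing_hashes are hypothetical stand-ins, and the only behaviour taken from the article is that storage exposes an awaitable get_all_memories() whose items carry a content_hash.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StoredMemory:
    # Hypothetical stand-in for the real Memory model.
    content: str
    content_hash: str


class InMemoryStorage:
    # Hypothetical backend used only for this sketch.
    def __init__(self, memories):
        self._memories = list(memories)

    async def get_all_memories(self):
        return list(self._memories)


async def load_existing_hashes(storage) -> set[str]:
    """Collect every stored content hash into a set for O(1) membership tests."""
    memories = await storage.get_all_memories()
    return {m.content_hash for m in memories if m.content_hash}


storage = InMemoryStorage([StoredMemory("a", "h1"), StoredMemory("b", "h2")])
existing = asyncio.run(load_existing_hashes(storage))
print("h1" in existing, "h3" in existing)  # True False
```

Loading the set once trades memory for speed: every later duplicate check is a set lookup instead of a storage query.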
3. JSON File Processing Loop
The import_from_json method (lines 83-99) iterates over each supplied Path object, invoking _import_single_file for individual processing while aggregating statistics across the entire batch.
4. File Validation and Parsing
Each file must contain an export_metadata block and a memories list. The validation logic in _import_single_file (lines 28-35) raises ValueError for malformed exports, preventing partial or corrupted imports.
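A validation step of this kind might look like the sketch below. The function name load_export and the exact error messages are assumptions made for illustration; only the two required keys and the use of ValueError come from the description above.

```python
import json
import tempfile
from pathlib import Path

REQUIRED_KEYS = ("export_metadata", "memories")  # keys named in the article


def load_export(path: Path) -> dict:
    """Parse an export file and reject it if required sections are missing."""
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, dict):
        raise ValueError(f"{path.name}: top-level JSON value must be an object")
    missing = [key for key in REQUIRED_KEYS if key not in data]
    if missing:
        raise ValueError(f"{path.name}: missing required keys {missing}")
    if not isinstance(data["memories"], list):
        raise ValueError(f"{path.name}: 'memories' must be a list")
    return data


# Demonstration: a file without export_metadata is rejected before any import.
with tempfile.TemporaryDirectory() as tmp:
    bad = Path(tmp) / "bad.json"
    bad.write_text(json.dumps({"memories": []}), encoding="utf-8")
    try:
        load_export(bad)
    except ValueError as exc:
        print("Rejected:", exc)
```

Failing before the memory loop starts is what keeps a malformed file from being partially imported.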
5. Memory Entry Processing
For every memory dictionary in the file, the importer (lines 54-67) performs three critical checks:
- Skips entries missing content_hash with a warning
- Checks against existing_hashes to skip duplicates
- Builds a Memory object via _create_memory_from_dict for new entries
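The three checks can be illustrated with plain dictionaries. This is a sketch under stated assumptions: classify_entries and its result keys are hypothetical names, and the real importer builds Memory objects rather than returning the raw entries.

```python
import logging

logger = logging.getLogger("import_sketch")


def classify_entries(entries: list[dict], existing_hashes: set[str]) -> dict:
    """Split raw entries into new, duplicate, and invalid groups."""
    result = {"new": [], "duplicates": 0, "invalid": 0}
    for entry in entries:
        content_hash = entry.get("content_hash")
        if not content_hash:
            # Check 1: entries without a hash cannot be deduplicated, so skip them.
            logger.warning("Skipping entry without content_hash")
            result["invalid"] += 1
        elif content_hash in existing_hashes:
            # Check 2: the hash is already known to storage.
            result["duplicates"] += 1
        else:
            # Check 3: a new entry that would be turned into a Memory object.
            result["new"].append(entry)
    return result


entries = [
    {"content": "alpha", "content_hash": "h1"},
    {"content": "beta"},
    {"content": "gamma", "content_hash": "h3"},
]
summary = classify_entries(entries, existing_hashes={"h1"})
print(len(summary["new"]), summary["duplicates"], summary["invalid"])  # 1 1 1
```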
6. Metadata and Source Tag Injection
The _create_memory_from_dict method (lines 98-108) enriches each memory with:
- An import_info metadata block recording timestamp, source machine, source file, and importer version
- An optional source:<machine> tag when add_source_tags=True
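The enrichment step could be approximated as shown here. The function enrich_entry and the field names inside import_info (imported_at, source_machine, source_file, importer_version) are illustrative assumptions; the article confirms only which pieces of information are recorded, not how they are keyed.

```python
from datetime import datetime, timezone


def enrich_entry(entry: dict, source_machine: str, source_file: str,
                 add_source_tags: bool = True,
                 importer_version: str = "sketch") -> dict:
    """Return a copy of the entry with import provenance attached."""
    enriched = dict(entry)

    # Copy the metadata so the caller's dictionary is left untouched.
    metadata = dict(entry.get("metadata") or {})
    metadata["import_info"] = {
        "imported_at": datetime.now(timezone.utc).isoformat(),
        "source_machine": source_machine,
        "source_file": source_file,
        "importer_version": importer_version,
    }
    enriched["metadata"] = metadata

    # Append the source tag once, and only when requested.
    tags = list(entry.get("tags") or [])
    source_tag = f"source:{source_machine}"
    if add_source_tags and source_tag not in tags:
        tags.append(source_tag)
    enriched["tags"] = tags
    return enriched


tagged = enrich_entry({"content_hash": "h1", "tags": ["note"]}, "laptop", "backup.json")
print(tagged["tags"])  # ['note', 'source:laptop']
```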
7. Storage or Dry-Run Execution
Depending on the dry_run flag, the importer either:
- Persists the memory via await self.storage.store(memory) (lines 73-76)
- Counts the memory without writing to the backend
8. Hash Tracking and Statistics Update
After successful storage, the content hash is added to existing_hashes (lines 78-82), preventing re-insertion within the same import batch. Per-file and global counters are merged for the final report.
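Steps 7 and 8 can be sketched together. RecordingStorage and persist_new are hypothetical names, and one detail is an assumption rather than confirmed behaviour: this sketch tracks hashes in dry-run mode too, so that a preview reports the same counts a real run would.

```python
import asyncio


class RecordingStorage:
    # Hypothetical backend that just records what it was asked to store.
    def __init__(self):
        self.stored = []

    async def store(self, memory: dict) -> None:
        self.stored.append(memory)


async def persist_new(entries, existing_hashes: set, storage,
                      dry_run: bool = False) -> dict:
    """Store unseen entries, or only count them when dry_run is set."""
    stats = {"imported": 0, "duplicates_skipped": 0}
    for entry in entries:
        content_hash = entry["content_hash"]
        if content_hash in existing_hashes:
            stats["duplicates_skipped"] += 1
            continue
        if not dry_run:
            await storage.store(entry)
        # Assumption: track the hash in both modes so repeats inside the
        # same batch are counted as duplicates either way.
        existing_hashes.add(content_hash)
        stats["imported"] += 1
    return stats


batch = [{"content_hash": "h1"}, {"content_hash": "h2"}, {"content_hash": "h1"}]
storage = RecordingStorage()
preview = asyncio.run(persist_new(batch, set(), storage, dry_run=True))
print(preview, len(storage.stored))  # {'imported': 2, 'duplicates_skipped': 1} 0
```

The third entry repeats the first hash, so it is skipped even though nothing had been stored before the batch began.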
9. Aggregate Reporting
The import_from_json method returns a comprehensive dictionary (lines 103-116) containing totals, per-source breakdowns, dry-run status, timestamps, and any errors encountered during processing.
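One way to picture the aggregation is a global report that absorbs each file's counters. The keys imported, duplicates_skipped, errors, and total_processed mirror the report fields used in the examples in this article; files_processed, sources, start_time, the helper names, and the treatment of errors as a counter are assumptions made for this sketch.

```python
from datetime import datetime, timezone

COUNTERS = ("total_processed", "imported", "duplicates_skipped", "errors")


def new_report(dry_run: bool) -> dict:
    """Create an empty global report for one import run."""
    report = {name: 0 for name in COUNTERS}
    report.update({
        "files_processed": 0,
        "sources": {},
        "dry_run": dry_run,
        "start_time": datetime.now(timezone.utc).isoformat(),
    })
    return report


def merge_file_stats(report: dict, source: str, file_stats: dict) -> None:
    """Fold one file's counters into the global totals and its source bucket."""
    per_source = report["sources"].setdefault(source, {name: 0 for name in COUNTERS})
    for name in COUNTERS:
        count = file_stats.get(name, 0)
        report[name] += count
        per_source[name] += count
    report["files_processed"] += 1


report = new_report(dry_run=True)
merge_file_stats(report, "laptop", {"total_processed": 3, "imported": 2, "duplicates_skipped": 1})
merge_file_stats(report, "desktop", {"total_processed": 1, "imported": 1})
print(report["imported"], sorted(report["sources"]))  # 3 ['desktop', 'laptop']
```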
Key Design Features
Content-Hash Deduplication
By default, the importer skips any memory whose content_hash already exists in storage. The hash set is loaded once at the start of each import run and then extended as new memories are stored, which gives O(1) lookups and keeps imports idempotent even when overlapping backup files are processed.
Source Awareness and Traceability
The optional source:<machine> tag and detailed import_info metadata let downstream tools trace where each memory originated. This is critical for conflict resolution and sync-status dashboards when merging memories from multiple devices.
Dry-Run Validation Mode
Setting dry_run=True executes the entire pipeline (parsing, validation, and duplicate detection) without persisting any data. This enables safe previewing of large imports or CI validation of export file formats before touching production storage.
Async-First Architecture
All I/O operations (reading files, storing memories, fetching existing hashes) use await, allowing the importer to run in async command-line tools or background sync jobs without blocking the event loop. This is essential for handling bulk memory imports containing thousands of entries.
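To illustrate what non-blocking file reads can look like, the sketch below offloads JSON parsing to a worker thread with asyncio.to_thread. This is one common technique and an assumption on our part; the article does not say which mechanism the importer itself uses, and read_export and read_all are hypothetical helpers.

```python
import asyncio
import json
import tempfile
from pathlib import Path


def _read_json(path: Path) -> dict:
    # Blocking read and parse; runs in a worker thread via read_export.
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


async def read_export(path: Path) -> dict:
    """Read a JSON file without blocking the event loop."""
    return await asyncio.to_thread(_read_json, path)


async def read_all(paths: list[Path]) -> list[dict]:
    # Files are read concurrently; results keep the order of the input paths.
    return list(await asyncio.gather(*(read_export(p) for p in paths)))


with tempfile.TemporaryDirectory() as tmp:
    first = Path(tmp) / "first.json"
    second = Path(tmp) / "second.json"
    first.write_text(json.dumps({"memories": ["a"]}), encoding="utf-8")
    second.write_text(json.dumps({"memories": ["b", "c"]}), encoding="utf-8")
    exports = asyncio.run(read_all([first, second]))

print([len(e["memories"]) for e in exports])  # [1, 2]
```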
Implementation Details
The importer relies on three core components:
- src/mcp_memory_service/sync/importer.py: Contains MemoryImporter with methods import_from_json, _import_single_file, _create_memory_from_dict, and _get_existing_hashes
- src/mcp_memory_service/models/memory.py: Defines the Memory dataclass instantiated during import
- src/mcp_memory_service/storage/base.py: Abstract MemoryStorage API specifying store() and get_all_memories() methods implemented by concrete backends
The sync package exports MemoryImporter via src/mcp_memory_service/sync/__init__.py, making it available for CLI tools such as those in src/mcp_memory_service/cli/ingestion.py.
Code Examples
Simple Async Import (Production Run)
import asyncio
from pathlib import Path

from mcp_memory_service.storage.sqlite_vec import SqliteVecStorage
from mcp_memory_service.sync import MemoryImporter


async def bulk_import():
    # Create the backend first, then initialize it. Chaining the two calls
    # would bind `storage` to the return value of initialize(), not the backend.
    # Supply constructor arguments (e.g. a database path) if your version requires them.
    storage = SqliteVecStorage()
    await storage.initialize()
    importer = MemoryImporter(storage)

    json_files = [
        Path("backup_2024-02-01.json"),
        Path("backup_2024-03-15.json"),
    ]
    report = await importer.import_from_json(json_files)

    print(f"Imported: {report['imported']}")
    print(f"Duplicates skipped: {report['duplicates_skipped']}")


asyncio.run(bulk_import())
Dry-Run to Preview Imports
# Run inside an async function, with an importer created as in the previous example.
report = await importer.import_from_json(
    json_files=[Path("large_export.json")],
    dry_run=True,
)
print("Would import:", report["total_processed"])
print("Would skip:", report["duplicates_skipped"])
Analyze Without Database Writes
analysis = await importer.analyze_import([Path("export.json")])
print("New memories:", analysis["unique_memories"])
print("Potential duplicates:", analysis["potential_duplicates"])
print("Internal conflicts:", len(analysis["conflicts"]))
FastAPI Endpoint Integration
from pathlib import Path

from fastapi import APIRouter, Depends, UploadFile
from mcp_memory_service.sync import MemoryImporter

from .dependencies import get_storage

router = APIRouter()


@router.post("/sync/import")
async def import_memories(file: UploadFile, storage=Depends(get_storage)):
    # Keep only the final path component so a crafted filename cannot escape /tmp.
    tmp_path = Path("/tmp") / Path(file.filename or "upload.json").name
    with open(tmp_path, "wb") as f:
        f.write(await file.read())

    importer = MemoryImporter(storage)
    report = await importer.import_from_json([tmp_path])
    return {
        "imported": report["imported"],
        "duplicates_skipped": report["duplicates_skipped"],
        "errors": report["errors"],
    }
Summary
- The MemoryImporter class in src/mcp_memory_service/sync/importer.py orchestrates bulk memory imports through a 9-step async pipeline
- Deduplication uses content hashes fetched once at startup (_get_existing_hashes, lines 24-31) to provide O(1) duplicate detection across all files
- Each memory receives import_info metadata and optional source tags for complete provenance tracking
- Dry-run mode validates exports and counts duplicates without writing to storage
- The importer works with any MemoryStorage backend (SQLite, Cloudflare, hybrid) via the abstract interface in src/mcp_memory_service/storage/base.py
Frequently Asked Questions
How does the MCP Memory Service prevent duplicate memories during bulk imports?
The importer fetches all existing content hashes from storage before processing begins (_get_existing_hashes, lines 24-31) and maintains an in-memory set throughout the import. Each incoming memory's hash is checked against this set; duplicates are skipped and counted in the final report. Successfully stored hashes are added to the set immediately, preventing re-insertion even within the same batch.
What happens if a JSON export file is missing required metadata fields?
The _import_single_file validation logic (lines 28-35) requires every export file to contain an export_metadata block and a memories list. If either is missing, the method raises a ValueError immediately, ensuring no partial data is imported and providing clear feedback about malformed files.
Can I preview a bulk import without actually saving the memories?
Yes. Set dry_run=True when calling import_from_json. This executes the full pipeline, including parsing, validation, and duplicate detection, but skips the storage.store() call (lines 73-76) and only updates the counters. The returned report shows what would be imported, skipped, or rejected with an error, without modifying the database.
Which storage backends are compatible with the MemoryImporter?
Any class implementing the MemoryStorage abstract interface from src/mcp_memory_service/storage/base.py works with the importer. This includes SqliteVecStorage for local SQLite with vector extensions, CloudflareStorage for edge deployment, and hybrid implementations. The importer calls await self.storage.store(memory) (line 75) and await self.storage.get_all_memories() (line 27) through this abstraction, remaining backend-agnostic.