How SSE Events Provide Real-Time Notifications for Memory Operations in MCP Memory Service

The MCP Memory Service leverages a singleton SSEManager to push live updates via Server-Sent Events, broadcasting typed notifications for every memory store, delete, and search operation while maintaining persistent connections through automatic heartbeats.

The MCP Memory Service implements Server-Sent Events (SSE) to deliver immediate, push-based notifications whenever the memory graph changes. This architecture eliminates polling overhead and enables real-time synchronization across dashboards, CLI tools, and automated workflows reacting to memory operations.

SSE Architecture in MCP Memory Service

The notification system consists of four integrated components that manage connection state, application lifecycle, client subscriptions, and event broadcasting.

The SSE Manager Singleton

Located in src/mcp_memory_service/web/sse.py, the SSEManager class maintains global state for all active connections. For each client, it creates a dedicated asyncio.Queue to buffer events and stores connection metadata in an internal registry. The manager starts a background _heartbeat_loop task that emits periodic keep-alive messages based on the SSE_HEARTBEAT_INTERVAL setting defined in src/mcp_memory_service/config.py (lines 56-73 and 138-152). The broadcast_event method distributes SSEEvent objects to all connected clients or filtered subsets based on connection metadata.
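
The queue-per-client design described above can be sketched in a few lines. This is a minimal illustration, not the project's code: MiniSSEManager, add_connection, remove_connection, and the role filter are hypothetical names chosen for the example, and only the ideas (one asyncio.Queue per client, a metadata registry, and a filtered broadcast) come from the description.

```python
import asyncio
from typing import Any, Dict, Optional


class MiniSSEManager:
    """Illustrative stand-in for SSEManager; not the project's actual code."""

    def __init__(self) -> None:
        # connection_id -> queue of pending events for that client
        self.queues: Dict[str, asyncio.Queue] = {}
        # connection_id -> free-form metadata used for filtering
        self.metadata: Dict[str, Dict[str, Any]] = {}

    def add_connection(self, connection_id: str, **meta: Any) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.queues[connection_id] = queue
        self.metadata[connection_id] = meta
        return queue

    def remove_connection(self, connection_id: str) -> None:
        self.queues.pop(connection_id, None)
        self.metadata.pop(connection_id, None)

    async def broadcast_event(self, event: Dict[str, Any], role: Optional[str] = None) -> int:
        """Queue the event for every client, or only for clients whose metadata matches role."""
        delivered = 0
        for connection_id, queue in self.queues.items():
            if role is not None and self.metadata[connection_id].get("role") != role:
                continue
            await queue.put(event)
            delivered += 1
        return delivered


async def demo() -> tuple:
    manager = MiniSSEManager()
    dashboard = manager.add_connection("conn-1", role="dashboard")
    cli = manager.add_connection("conn-2", role="cli")
    sent_all = await manager.broadcast_event({"event_type": "memory_stored"})
    sent_cli = await manager.broadcast_event({"event_type": "heartbeat"}, role="cli")
    return sent_all, sent_cli, dashboard.qsize(), cli.qsize()
```

Because each client has its own queue, a slow consumer delays only its own stream rather than blocking delivery to the others.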

Application Lifecycle Integration

The FastAPI application integrates the manager through its lifespan context in src/mcp_memory_service/web/app.py. During startup, the code calls await sse_manager.start() to initialize the heartbeat task (lines 61-66). On shutdown, await sse_manager.stop() gracefully closes all connections and cancels background tasks (lines 54-57). This ensures no dangling connections remain when the service terminates.
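
The start/stop pairing can be illustrated with a plain async context manager. The sketch below assumes a stub manager (StubSSEManager, with a placeholder 30-second interval) in place of the real one; only the start() and stop() calls mirror the description.

```python
import asyncio
from contextlib import asynccontextmanager


class StubSSEManager:
    """Hypothetical stand-in exposing only start() and stop()."""

    def __init__(self) -> None:
        self.heartbeat_task = None

    async def _heartbeat_loop(self) -> None:
        while True:
            await asyncio.sleep(30)  # placeholder interval, not the real setting

    async def start(self) -> None:
        self.heartbeat_task = asyncio.create_task(self._heartbeat_loop())

    async def stop(self) -> None:
        if self.heartbeat_task is not None:
            self.heartbeat_task.cancel()
            try:
                await self.heartbeat_task
            except asyncio.CancelledError:
                pass  # expected: the task was cancelled on purpose
            self.heartbeat_task = None


sse_manager = StubSSEManager()


@asynccontextmanager
async def lifespan(app):
    await sse_manager.start()  # startup: begin heartbeats
    try:
        yield
    finally:
        await sse_manager.stop()  # shutdown: cancel background work


async def run_lifecycle() -> tuple:
    async with lifespan(app=None):
        task = sse_manager.heartbeat_task
        running = task is not None and not task.done()
    stopped = sse_manager.heartbeat_task is None
    return running, stopped
```

In a FastAPI application, a context manager of this shape is passed as FastAPI(lifespan=lifespan); the try/finally ensures stop() runs even if the application exits with an error.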

Event Stream Endpoint

Clients subscribe to updates via GET /api/events, implemented in src/mcp_memory_service/web/api/events.py. This endpoint generates a unique connection ID, registers the client with the SSEManager, and returns an EventSourceResponse. The underlying generator continuously yields queued events; if the queue remains empty for 60 seconds, it automatically emits a ping event to prevent HTTP connection timeouts (lines 26-33 and 30-45 in sse.py).
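
The idle-ping behaviour can be approximated with asyncio.wait_for around the queue read. This is a sketch under stated assumptions: the function name event_stream, the shape of the ping dictionary, and the None sentinel that ends the stream are all hypothetical, and the timeout is shortened in the demo so that it finishes quickly.

```python
import asyncio
from typing import Any, AsyncIterator, Dict


async def event_stream(
    queue: asyncio.Queue, idle_timeout: float = 60.0
) -> AsyncIterator[Dict[str, Any]]:
    """Yield queued events; emit a ping whenever the queue stays empty for idle_timeout seconds."""
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            yield {"event": "ping", "data": "{}"}
            continue
        if event is None:  # hypothetical sentinel used here to end the stream
            return
        yield event


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put({"event": "memory_stored", "data": "{}"})
    received = []
    async for event in event_stream(queue, idle_timeout=0.05):
        received.append(event["event"])
        if event["event"] == "ping":
            await queue.put(None)  # stop after the first idle ping
    return received
```

In the demo the queued event is delivered first, then the queue sits empty past the timeout and a single ping follows before the stream ends.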

Broadcasting Memory Operations

API handlers broadcast typed events immediately after successful operations:

  • Store operations: In src/mcp_memory_service/web/api/memories.py (lines 76-99), after persisting a memory, the code constructs a memory_stored event containing the content hash, tags, memory type, and metadata, then calls await sse_manager.broadcast_event(event).
  • Delete operations: Successful deletions trigger memory_deleted events containing the removed content hash (lines 108-114).
  • Search operations: Upon query completion, src/mcp_memory_service/web/api/search.py (lines 79-88) broadcasts search_completed events with the query string, result count, and latency metrics.
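
The broadcast-after-success pattern for searches might look roughly like the following. It is an illustrative sketch only: search_and_notify and the injected run_search and broadcast callables are hypothetical, and processing_time_ms is an assumed name for the latency field, which the description does not spell out.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List


async def search_and_notify(
    query: str,
    run_search: Callable[[str], Awaitable[List[Any]]],
    broadcast: Callable[[Dict[str, Any]], Awaitable[None]],
) -> List[Any]:
    """Run a search, then broadcast a search_completed event only if it succeeded."""
    started = time.perf_counter()
    results = await run_search(query)  # an exception here skips the broadcast
    elapsed_ms = (time.perf_counter() - started) * 1000
    await broadcast({
        "event_type": "search_completed",
        "data": {
            "query": query,
            "results_count": len(results),
            "processing_time_ms": round(elapsed_ms, 2),  # hypothetical field name
        },
    })
    return results


async def demo() -> Dict[str, Any]:
    sent: List[Dict[str, Any]] = []

    async def fake_search(query: str) -> List[str]:
        return ["hash-a", "hash-b"]

    async def fake_broadcast(event: Dict[str, Any]) -> None:
        sent.append(event)

    await search_and_notify("python tips", fake_search, fake_broadcast)
    return sent[0]
```

Placing the broadcast after the awaited operation means subscribers are never told about a search, store, or delete that actually failed.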

Event Payload Structure

Every SSE event transmitted by the MCP Memory Service contains a standardized payload:

  • event_type: String identifier such as "memory_stored", "memory_deleted", "search_completed", or "heartbeat".
  • data: Dictionary with operation-specific fields (e.g., content_hash, tags, query, results_count).
  • event_id: Auto-generated unique identifier for event tracking.
  • timestamp: ISO-formatted timestamp indicating when the operation completed.
  • retry: Optional retry interval hint for clients experiencing network interruptions.
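
To make the payload fields concrete, here is a minimal sketch of an event container and its text/event-stream serialization. SketchEvent and to_wire are hypothetical names, and merging the timestamp into the data line is an assumption made for the example; the service's own SSEEvent class may lay the fields out differently.

```python
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class SketchEvent:
    """Illustrative event container mirroring the fields listed above."""

    event_type: str
    data: Dict[str, Any]
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    retry: Optional[int] = None  # reconnection delay in milliseconds

    def to_wire(self) -> str:
        """Serialize to text/event-stream: field lines terminated by a blank line."""
        lines = [f"id: {self.event_id}", f"event: {self.event_type}"]
        if self.retry is not None:
            lines.append(f"retry: {self.retry}")
        # Assumption: the timestamp travels inside the JSON data payload
        payload = {**self.data, "timestamp": self.timestamp}
        lines.append(f"data: {json.dumps(payload)}")
        return "\n".join(lines) + "\n\n"


event = SketchEvent(
    event_type="memory_deleted",
    data={"content_hash": "abc123"},
    event_id="evt-1",
    timestamp="2024-01-01T00:00:00+00:00",
    retry=3000,
)
wire = event.to_wire()
```

The event field is what lets clients register per-type listeners, and the id field is what a reconnecting browser sends back in the Last-Event-ID header.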

Consuming Real-Time Notifications

Clients can connect to the SSE stream using standard HTTP clients that support text/event-stream parsing.

JavaScript Client Example

Connect from a browser, or from a Node.js runtime that provides an EventSource implementation (use an absolute URL outside the browser), to receive live updates:

// Subscribe to the SSE endpoint
const evtSource = new EventSource('/api/events');

// Handle initial connection handshake
evtSource.addEventListener('connection_established', (e) => {
  const info = JSON.parse(e.data);
  console.log('SSE connected (id:', info.connection_id, ')');
});

// Listen for new memories
evtSource.addEventListener('memory_stored', (e) => {
  const payload = JSON.parse(e.data);
  console.log('🧠 New memory stored:', payload.content_hash);
});

// Listen for deletions
evtSource.addEventListener('memory_deleted', (e) => {
  const payload = JSON.parse(e.data);
  console.log('❌ Memory deleted:', payload.content_hash);
});

// Listen for search completions
evtSource.addEventListener('search_completed', (e) => {
  const payload = JSON.parse(e.data);
  console.log(`🔎 Search "${payload.query}" returned ${payload.results_count} results`);
});

// Optional: monitor heartbeats
evtSource.addEventListener('heartbeat', (e) => console.debug('heartbeat', JSON.parse(e.data)));

Python Client Example

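Use httpx with sseclient-py to process the stream. The loop below is synchronous and blocks while waiting for events:

import json

import httpx
from sseclient import SSEClient

# timeout=None disables httpx's default read timeout, which would otherwise
# close the stream during quiet periods.
with httpx.Client(base_url="http://localhost:8000", timeout=None) as client:
    # httpx streams responses via client.stream(); get() has no stream argument
    with client.stream("GET", "/api/events") as resp:
        resp.raise_for_status()
        # sseclient-py parses an iterable of raw byte chunks
        for event in SSEClient(resp.iter_bytes()).events():
            if event.event == "memory_stored":
                data = json.loads(event.data)
                print(f"Stored: {data['content_hash']} ({data['tags']})")
            elif event.event == "memory_deleted":
                data = json.loads(event.data)
                print(f"Deleted: {data['content_hash']}")
            elif event.event == "search_completed":
                data = json.loads(event.data)
                print(f"Search '{data['query']}' finished – {data['results_count']} hits")
            elif event.event == "heartbeat":
                pass  # Connection health indicator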

Server-Side Broadcasting

Developers can emit custom events from within the service using helper factories available in the SSE module:

from mcp_memory_service.web.sse import sse_manager, create_memory_stored_event

async def broadcast_bulk_import(memories):
    for mem in memories:
        event = create_memory_stored_event({
            "content_hash": mem.content_hash,
            "content": mem.content,
            "tags": mem.tags,
            "memory_type": mem.memory_type,
        })
        await sse_manager.broadcast_event(event)

Frequently Asked Questions

What endpoint do clients use to subscribe to SSE events in MCP Memory Service?

Clients connect to GET /api/events as defined in src/mcp_memory_service/web/api/events.py. This endpoint returns an EventSourceResponse that maintains an open HTTP connection and streams events indefinitely until the client disconnects or the server shuts down.

How does the service keep SSE connections alive during idle periods?

The SSEManager runs a background _heartbeat_loop that emits periodic heartbeat events based on the SSE_HEARTBEAT_INTERVAL configuration value. Additionally, the event generator in src/mcp_memory_service/web/sse.py emits automatic ping events every 60 seconds if no other messages are queued, preventing HTTP timeouts.

Which memory operations trigger automatic SSE notifications?

According to the source code in doobidoo/mcp-memory-service, three primary operations trigger broadcasts: storing a memory emits memory_stored (lines 76-99 in memories.py), deleting a memory emits memory_deleted (lines 108-114), and completing a search emits search_completed (lines 79-88 in search.py). Each handler calls await sse_manager.broadcast_event() immediately after the operation succeeds.

Can external scripts broadcast custom events through the MCP Memory Service?

Yes, provided the code runs inside the service process. Such code can import the sse_manager singleton from mcp_memory_service.web.sse and call await sse_manager.broadcast_event(). Helper factory functions like create_memory_stored_event() generate properly structured payloads, allowing bulk imports or custom workflows to notify connected clients in real time. The singleton lives in the service's memory, so a script running in a separate process would not reach connected clients simply by importing it.
