How to Implement Time-Travel and Checkpoint Recovery in Agent Framework Workflows

Agent Framework workflows support time-travel and checkpoint recovery through the CheckpointStorage protocol, which persists workflow state snapshots and allows you to resume execution from any previous point by passing a checkpoint_id and optional responses to the run() method.

The microsoft/agent-framework repository provides built-in checkpointing that lets you pause workflows at pending requests, save their complete state, and resume later, even after an application restart. This article explains how to implement time-travel and checkpoint recovery using the FileCheckpointStorage class and the checkpoint_storage parameter available in workflow builders such as MagenticBuilder and WorkflowBuilder.

How Checkpointing Works

At the core of the system lies the CheckpointStorage protocol defined in python/packages/core/agent_framework/_workflows/_checkpoint.py. When you wire a storage implementation into a workflow via the checkpoint_storage argument, the runtime automatically creates checkpoints whenever the graph reaches a stable state, such as after emitting a pending request.

The Core Components

The checkpointing architecture relies on three primary abstractions:

  • WorkflowCheckpoint – A dataclass capturing the full execution state including workflow name, graph signature hash, messages, committed state, pending request events, iteration count, and metadata. According to the source code in python/packages/core/agent_framework/_workflows/_checkpoint.py, this structure ensures that every aspect of the workflow's memory is serialized.

  • CheckpointStorage – An abstract protocol that storage backends must implement. It defines methods save, load, list_checkpoints, delete, get_latest, and list_checkpoint_ids that the runtime calls automatically.

  • FileCheckpointStorage – A production-ready implementation that persists checkpoints to JSON files with base-64-encoded pickle payloads for complex objects. It guarantees atomic writes and validates that checkpoint IDs remain inside the designated storage directory to prevent path-traversal attacks.
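
To make the three abstractions above concrete, here is a minimal, framework-free sketch of how a checkpoint record and a storage protocol with the six listed methods could be shaped. The names SketchCheckpoint, SketchStorage, and DictStorage, the field types, and every method signature are assumptions made for illustration. The real definitions live in _checkpoint.py and may differ.

```python
from __future__ import annotations

import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, Protocol


@dataclass
class SketchCheckpoint:
    """Hypothetical stand-in for the fields attributed to WorkflowCheckpoint."""

    workflow_name: str
    graph_signature_hash: str
    checkpoint_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    messages: dict[str, list[Any]] = field(default_factory=dict)
    state: dict[str, Any] = field(default_factory=dict)
    pending_request_info_events: dict[str, Any] = field(default_factory=dict)
    iteration_count: int = 0
    metadata: dict[str, Any] = field(default_factory=dict)


class SketchStorage(Protocol):
    """Assumed signatures for the six methods named in the article."""

    async def save(self, checkpoint: SketchCheckpoint) -> str: ...
    async def load(self, checkpoint_id: str) -> SketchCheckpoint: ...
    async def list_checkpoints(self, *, workflow_name: str) -> list[SketchCheckpoint]: ...
    async def delete(self, checkpoint_id: str) -> bool: ...
    async def get_latest(self, *, workflow_name: str) -> SketchCheckpoint | None: ...
    async def list_checkpoint_ids(self, *, workflow_name: str) -> list[str]: ...


class DictStorage:
    """Toy backend that keeps checkpoints in a dict and satisfies SketchStorage."""

    def __init__(self) -> None:
        self._items: dict[str, SketchCheckpoint] = {}

    async def save(self, checkpoint: SketchCheckpoint) -> str:
        self._items[checkpoint.checkpoint_id] = checkpoint
        return checkpoint.checkpoint_id

    async def load(self, checkpoint_id: str) -> SketchCheckpoint:
        return self._items[checkpoint_id]

    async def list_checkpoints(self, *, workflow_name: str) -> list[SketchCheckpoint]:
        return [c for c in self._items.values() if c.workflow_name == workflow_name]

    async def delete(self, checkpoint_id: str) -> bool:
        return self._items.pop(checkpoint_id, None) is not None

    async def get_latest(self, *, workflow_name: str) -> SketchCheckpoint | None:
        # Assumption: "latest" means the highest iteration_count in this sketch.
        matches = await self.list_checkpoints(workflow_name=workflow_name)
        return max(matches, key=lambda c: c.iteration_count, default=None)

    async def list_checkpoint_ids(self, *, workflow_name: str) -> list[str]:
        matches = await self.list_checkpoints(workflow_name=workflow_name)
        return [c.checkpoint_id for c in matches]
```

Because the protocol is structural, any class with matching methods can stand in for another backend. This is the property that lets a file-based and an in-memory store be swapped without touching the workflow code.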

The Checkpoint Lifecycle

When running a workflow with checkpointing enabled, the framework follows this sequence:

  1. Initialization – You create a storage instance (e.g., FileCheckpointStorage(Path("tmp/checkpoints"))) and pass it to your workflow builder.

  2. Automatic Persistence – The runtime pauses when a request (such as MagenticPlanReviewRequest) is emitted. The pending request information is stored inside the checkpoint under pending_request_info_events, and the storage's save() method writes a JSON file (<id>.json).

  3. Inspection – You can retrieve the latest checkpoint via storage.get_latest(workflow_name=...) or list all historical checkpoints with storage.list_checkpoints() to enable time-travel scenarios.

  4. Resumption – You call workflow.run(checkpoint_id=..., responses={request_id: response}, stream=True). The runtime inside python/packages/core/agent_framework/_workflows/_runner.py loads the snapshot, re-hydrates the graph, and injects the supplied responses, allowing the workflow to continue from the saved iteration.
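
The four lifecycle steps can be sketched with a toy, framework-free model. Everything here (ToyWorkflow, ToySnapshot, the cp-N identifiers, and the req-1 request ID) is hypothetical. It only mimics the shape of run(checkpoint_id=..., responses=...) and is not how the runtime in _runner.py is implemented.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToySnapshot:
    checkpoint_id: str
    iteration_count: int
    pending_request_id: str | None
    log: list[str] = field(default_factory=list)


class ToyWorkflow:
    """Hypothetical three-step workflow that pauses for approval after step 1."""

    STEPS = ["plan", "execute", "report"]

    def __init__(self, store: dict[str, ToySnapshot]) -> None:
        self.store = store  # plays the role of the checkpoint storage

    def run(self, checkpoint_id: str | None = None,
            responses: dict[str, str] | None = None) -> ToySnapshot:
        if checkpoint_id is None:
            iteration, log = 0, []
        else:
            snap = self.store[checkpoint_id]  # load the saved snapshot
            pending = snap.pending_request_id
            if pending is not None and pending not in (responses or {}):
                return snap  # no answer supplied: still paused at the same point
            iteration, log = snap.iteration_count, list(snap.log)
            if pending is not None:
                log.append(f"response:{responses[pending]}")  # inject the answer

        while iteration < len(self.STEPS):
            log.append(self.STEPS[iteration])
            iteration += 1
            pending = "req-1" if iteration == 1 else None
            snap = ToySnapshot(f"cp-{iteration}", iteration, pending, list(log))
            self.store[snap.checkpoint_id] = snap  # automatic persistence
            if pending is not None:
                return snap  # pause until a response arrives
        return snap


store: dict[str, ToySnapshot] = {}
paused = ToyWorkflow(store).run()                      # runs "plan", then pauses
finished = ToyWorkflow(store).run(                     # a fresh instance resumes
    checkpoint_id=paused.checkpoint_id,
    responses={paused.pending_request_id: "approve"},
)
```

A new ToyWorkflow instance is used for the resume call to mirror the restart case: the only shared piece is the store, so nothing in memory from the first run is needed to continue.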

Implementing Checkpoint Recovery

To add fault tolerance to your workflows, you need to configure storage, run until a pause point, and then resume with user input.

Setting Up FileCheckpointStorage

First, instantiate the storage backend and ensure the directory exists:

from pathlib import Path
from agent_framework import FileCheckpointStorage

CHECKPOINT_DIR = Path(__file__).parent / "tmp" / "my_checkpoints"
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
checkpoint_storage = FileCheckpointStorage(CHECKPOINT_DIR)

Pass this instance to your workflow builder. The following example uses a generic MyWorkflowBuilder, which could be MagenticBuilder or any custom implementation:

def build_my_workflow():
    # … create your agents and tools …

    return MyWorkflowBuilder(
        participants=[agent_a, agent_b],
        checkpoint_storage=checkpoint_storage,  # Required for persistence
        max_round_count=10,
    ).build()

Capturing Checkpoints During Execution

Run the workflow and listen for pending requests. The checkpoint is created automatically when the workflow pauses:

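async def run_until_pause(task_input):
    workflow = build_my_workflow()
    plan_request = None

    async for ev in workflow.run(task_input, stream=True):
        if ev.type == "request_info":
            # Remember the pending request so it can be answered on resume
            plan_request = ev.data
        if ev.type == "status" and ev.state.is_idle_with_pending_requests:
            # The workflow has paused with a pending request; stop consuming events
            break

    return plan_request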

Resuming from a Checkpoint

To recover, retrieve the latest checkpoint and provide responses for any pending requests:

async def resume_workflow(plan_request):
    latest_cp = await checkpoint_storage.get_latest(workflow_name="my_workflow")
    if not latest_cp:
        raise RuntimeError("No checkpoint persisted")
    
    print(f"Resuming from checkpoint {latest_cp.checkpoint_id} "
          f"at iteration {latest_cp.iteration_count}")
    
    response = plan_request.approve()  # Or any user-provided answer

    resumed_wf = build_my_workflow()
    
    async for ev in resumed_wf.run(
        checkpoint_id=latest_cp.checkpoint_id,
        responses={plan_request.request_id: response},
        stream=True
    ):
        # Process continued output events …
        pass

Time-Travel to Previous States

True time-travel involves loading an earlier checkpoint rather than just the latest one. Use list_checkpoints() to browse historical states:

async def time_travel_to_earlier_point():
    all_cps = await checkpoint_storage.list_checkpoints(workflow_name="my_workflow")
    if not all_cps:
        raise RuntimeError("No checkpoints to travel back to")

    # Pick the checkpoint with the lowest iteration count, i.e. the earliest state
    target_cp = min(all_cps, key=lambda cp: cp.iteration_count)

    wf = build_my_workflow()
    async for ev in wf.run(checkpoint_id=target_cp.checkpoint_id, stream=True):
        # Execution resumes from the selected historical state
        pass

This technique enables debugging scenarios where you want to test alternative responses or retry failed branches without recomputing earlier steps.
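
In practice you often want a specific point in history rather than the very first checkpoint. The helper below is a small sketch of that selection step: it picks the most recent checkpoint at or before a chosen iteration. CheckpointInfo and pick_checkpoint are hypothetical names, and the only assumption about real checkpoints is that they expose checkpoint_id and iteration_count, as used in the examples above.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class CheckpointInfo:
    """Hypothetical stand-in exposing the two attributes the selection needs."""

    checkpoint_id: str
    iteration_count: int


def pick_checkpoint(checkpoints: Iterable[CheckpointInfo],
                    max_iteration: int) -> CheckpointInfo | None:
    """Return the checkpoint with the highest iteration_count <= max_iteration."""
    eligible = [cp for cp in checkpoints if cp.iteration_count <= max_iteration]
    # default=None covers the case where every checkpoint is newer than requested
    return max(eligible, key=lambda cp: cp.iteration_count, default=None)


history = [
    CheckpointInfo("cp-a", 1),
    CheckpointInfo("cp-b", 3),
    CheckpointInfo("cp-c", 5),
]
target = pick_checkpoint(history, max_iteration=4)  # selects "cp-b"
```

The returned checkpoint_id is what you would then pass to run(checkpoint_id=...). Returning None instead of raising leaves the decision about a missing checkpoint to the caller.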

Storage Backend Options

The framework provides two concrete implementations of the CheckpointStorage protocol for different use cases.

FileCheckpointStorage

FileCheckpointStorage persists data to disk using atomic write operations to prevent corruption during crashes. It stores metadata as JSON and serializes complex Python objects using base-64-encoded pickle. This backend is suitable for production deployments where workflows must survive process restarts.
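
The two techniques described here, atomic writes and base-64-encoded pickle inside JSON, can be illustrated with the standard library alone. This is a sketch of the general pattern, not the code in FileCheckpointStorage: the function names, the payload_b64 key, and the file layout are all assumptions.

```python
import base64
import json
import os
import pickle
import tempfile
from pathlib import Path
from typing import Any


def write_checkpoint_atomically(directory: Path, checkpoint_id: str,
                                metadata: dict, payload: Any) -> Path:
    """Write <checkpoint_id>.json so readers never observe a half-written file."""
    directory.mkdir(parents=True, exist_ok=True)
    document = {
        "checkpoint_id": checkpoint_id,
        "metadata": metadata,
        # Hypothetical key: arbitrary Python objects are pickled, then base-64 encoded
        "payload_b64": base64.b64encode(pickle.dumps(payload)).decode("ascii"),
    }
    target = directory / f"{checkpoint_id}.json"
    # Write to a temp file in the same directory so the final rename stays on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(document, handle)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, target)  # atomic swap into place
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return target


def read_checkpoint(path: Path) -> tuple[dict, Any]:
    """Load metadata and the decoded payload. Only use on files you trust."""
    document = json.loads(path.read_text(encoding="utf-8"))
    payload = pickle.loads(base64.b64decode(document["payload_b64"]))
    return document["metadata"], payload
```

Unpickling can execute arbitrary code embedded in the file, so this pattern is only appropriate for checkpoint directories that untrusted parties cannot write to.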

InMemoryCheckpointStorage

For unit tests or demonstrations, use InMemoryCheckpointStorage, which keeps all data in RAM. This avoids file I/O overhead and automatically cleans up when the process exits. It is defined in the same _checkpoint.py module and follows the identical protocol, making it a drop-in replacement for testing:

from agent_framework import InMemoryCheckpointStorage

test_storage = InMemoryCheckpointStorage()

Production Examples from the Repository

The microsoft/agent-framework repository includes several reference implementations demonstrating checkpoint patterns.

Best Practices for Checkpoint Recovery

When implementing time-travel and recovery in production systems, follow these guidelines derived from the source code:

  • Maintain Deterministic Executor IDs – Ensure that agent and tool executors have stable identifiers across runs. The checkpoint relies on the graph signature hash; changing IDs breaks state restoration.

  • Validate Checkpoint IDs – FileCheckpointStorage includes path-traversal protection. Never pass untrusted checkpoint IDs directly to the storage layer without validation.

  • Use Dedicated Storage Directories – Isolate checkpoint files in a specific tmp/ or checkpoint/ directory. Clean this folder between test runs to prevent stale data contamination.

  • Map Responses Correctly – When calling run(checkpoint_id=..., responses=...), the dictionary must map request_id strings to response objects. Missing entries will cause the workflow to pause again at the same point.

  • Handle Pending Requests – The runtime automatically captures request_info events in the checkpoint. If you define custom request types, ensure they inherit from WorkflowEvent so the checkpointing system recognizes them.
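
The "Map Responses Correctly" guideline lends itself to a pre-flight check before calling run(). The sketch below compares the request IDs you expect to answer with the keys of the responses dictionary. Both function names are hypothetical, and how you obtain the pending IDs from a checkpoint (for example, from pending_request_info_events) is an assumption to verify against the framework.

```python
from __future__ import annotations

from typing import Any, Iterable, Mapping


def find_unanswered(pending_request_ids: Iterable[str],
                    responses: Mapping[str, Any]) -> list[str]:
    """Return pending request IDs that have no entry in `responses`, sorted."""
    return sorted(set(pending_request_ids) - set(responses))


def find_unknown(pending_request_ids: Iterable[str],
                 responses: Mapping[str, Any]) -> list[str]:
    """Return response keys that do not match any pending request ID, sorted."""
    return sorted(set(responses) - set(pending_request_ids))


pending = ["req-1", "req-2"]
answers = {"req-1": "approve", "req-9": "stale answer"}

unanswered = find_unanswered(pending, answers)  # requests that would pause again
unknown = find_unknown(pending, answers)        # likely typos or stale IDs
```

Failing fast on a non-empty result from either function surfaces a mismatched ID before the workflow silently pauses again at the same point.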

Summary

  • Checkpoint Recovery relies on the CheckpointStorage protocol, typically implemented via FileCheckpointStorage, to persist workflow snapshots after pending requests.

  • Time-Travel is achieved by calling storage.list_checkpoints() or get_latest() to retrieve historical states, then passing a specific checkpoint_id to workflow.run().

  • Resumption requires the responses parameter, which injects answers for pending requests stored in the checkpoint's pending_request_info_events field.

  • Storage Options include FileCheckpointStorage for production persistence and InMemoryCheckpointStorage for testing.

  • Key Files referenced include python/packages/core/agent_framework/_workflows/_checkpoint.py (core logic) and python/packages/core/agent_framework/_workflows/_runner.py (runtime resumption).

Frequently Asked Questions

What data is stored inside a WorkflowCheckpoint?

The checkpoint captures the workflow name, graph signature hash, message history, committed agent state, pending request events (including request IDs awaiting responses), the current iteration count, and metadata. According to the implementation in python/packages/core/agent_framework/_workflows/_checkpoint.py, this comprehensive snapshot ensures that execution can resume identically from the point where it paused.

How do I prevent security issues with checkpoint IDs?

FileCheckpointStorage includes a _validate_file_path method that guards against path-traversal attacks by ensuring checkpoint IDs resolve inside the designated storage directory. You should never construct checkpoint IDs from untrusted user input without validation, and you should treat checkpoint files as sensitive data since they may contain conversation history and internal state.

Can I use time-travel for debugging "what-if" scenarios?

Yes. By listing all checkpoints with storage.list_checkpoints() and sorting by iteration_count, you can select any historical state to resume from. This allows you to test alternative responses to pending requests or retry failed branches without re-executing expensive earlier steps, making it ideal for debugging complex multi-agent workflows.

What is the difference between FileCheckpointStorage and InMemoryCheckpointStorage?

FileCheckpointStorage writes JSON and base-64-encoded pickle data to disk, surviving process restarts and enabling recovery after crashes. InMemoryCheckpointStorage stores data only in RAM, making it faster for unit tests but unsuitable for production recovery scenarios. Both implement the same CheckpointStorage protocol, so you can swap them by changing the initialization line in your builder setup.
