How to Implement Time-Travel and Checkpoint Recovery in Agent Framework Workflows
Agent Framework workflows support time-travel and checkpoint recovery through the CheckpointStorage protocol, which persists workflow state snapshots and allows you to resume execution from any previous point by passing a checkpoint_id and optional responses to the run() method.
The microsoft/agent-framework repository provides built-in checkpointing capabilities that enable you to pause workflows at pending requests, save their complete state, and resume later, even after application restarts. This article explains how to implement time-travel and checkpoint recovery using the FileCheckpointStorage class and the checkpoint_storage parameter available in workflow builders like MagenticBuilder and WorkflowBuilder.
How Checkpointing Works
At the core of the system lies the CheckpointStorage protocol defined in python/packages/core/agent_framework/_workflows/_checkpoint.py. When you wire a storage implementation into a workflow via the checkpoint_storage argument, the runtime automatically creates checkpoints whenever the graph reaches a stable state, such as after emitting a pending request.
The Core Components
The checkpointing architecture relies on three primary abstractions:
- WorkflowCheckpoint – A dataclass capturing the full execution state, including workflow name, graph signature hash, messages, committed state, pending request events, iteration count, and metadata. According to the source code in python/packages/core/agent_framework/_workflows/_checkpoint.py, this structure ensures that every aspect of the workflow's memory is serialized.
- CheckpointStorage – An abstract protocol that storage backends must implement. It defines the methods save, load, list_checkpoints, delete, get_latest, and list_checkpoint_ids, which the runtime calls automatically.
- FileCheckpointStorage – A production-ready implementation that persists checkpoints to JSON files with base-64-encoded pickle payloads for complex objects. It guarantees atomic writes and validates that checkpoint IDs remain inside the designated storage directory to prevent path-traversal attacks.
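To make the relationship between these three abstractions concrete, here is a minimal sketch using only the standard library. The names SketchCheckpoint, SketchStorage, and DictStorage are hypothetical and exist only for illustration. The field list follows the description above, but the method signatures are assumptions, and the sketch is synchronous for brevity even though the framework's storage methods are awaited in the examples later in this article.

```python
from dataclasses import dataclass, field
from typing import Any, Optional, Protocol
import uuid


@dataclass
class SketchCheckpoint:
    """Hypothetical stand-in for WorkflowCheckpoint (illustrative fields only)."""
    workflow_name: str
    graph_signature_hash: str
    iteration_count: int = 0
    state: dict[str, Any] = field(default_factory=dict)
    pending_request_info_events: dict[str, Any] = field(default_factory=dict)
    checkpoint_id: str = field(default_factory=lambda: uuid.uuid4().hex)


class SketchStorage(Protocol):
    """Assumed shape of a storage backend; the real protocol may differ."""
    def save(self, checkpoint: SketchCheckpoint) -> str: ...
    def load(self, checkpoint_id: str) -> SketchCheckpoint: ...
    def list_checkpoints(self, workflow_name: str) -> list[SketchCheckpoint]: ...
    def get_latest(self, workflow_name: str) -> Optional[SketchCheckpoint]: ...


class DictStorage:
    """Toy in-memory backend that satisfies SketchStorage structurally."""

    def __init__(self) -> None:
        self._items: dict[str, SketchCheckpoint] = {}

    def save(self, checkpoint: SketchCheckpoint) -> str:
        self._items[checkpoint.checkpoint_id] = checkpoint
        return checkpoint.checkpoint_id

    def load(self, checkpoint_id: str) -> SketchCheckpoint:
        return self._items[checkpoint_id]

    def list_checkpoints(self, workflow_name: str) -> list[SketchCheckpoint]:
        return [cp for cp in self._items.values() if cp.workflow_name == workflow_name]

    def get_latest(self, workflow_name: str) -> Optional[SketchCheckpoint]:
        # "Latest" here means the highest iteration count for that workflow.
        matches = self.list_checkpoints(workflow_name)
        return max(matches, key=lambda cp: cp.iteration_count, default=None)
```

Because the runtime only depends on the protocol, any backend with these methods can be swapped in without touching the workflow definition.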
The Checkpoint Lifecycle
When running a workflow with checkpointing enabled, the framework follows this sequence:
- Initialization – You create a storage instance (e.g., FileCheckpointStorage(Path("tmp/checkpoints"))) and pass it to your workflow builder.
- Automatic Persistence – The runtime pauses when a request (such as MagenticPlanReviewRequest) is emitted. The pending request information is stored inside the checkpoint under pending_request_info_events, and the storage's save() method writes a JSON file (<id>.json).
- Inspection – You can retrieve the latest checkpoint via storage.get_latest(workflow_name=...) or list all historical checkpoints with storage.list_checkpoints() to enable time-travel scenarios.
- Resumption – You call workflow.run(checkpoint_id=..., responses={request_id: response}, stream=True). The runtime inside python/packages/core/agent_framework/_workflows/_runner.py loads the snapshot, re-hydrates the graph, and injects the supplied responses, allowing the workflow to continue from the saved iteration.
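The lifecycle above can be sketched with a toy runner that has no dependency on the framework. Everything here is hypothetical: run_toy_workflow, the (name, needs_input) step tuples, and the plain dict used as storage are illustrative stand-ins, not the framework's runner. The sketch only shows the pattern of pausing at a pending request, saving a snapshot, and resuming with a responses mapping.

```python
import copy


def run_toy_workflow(steps, store, checkpoint_id=None, responses=None):
    """Run steps in order, pausing at any step that needs an answer.

    steps is a list of (name, needs_input) tuples; store is a dict acting as
    checkpoint storage. Returns (status, checkpoint_id, state).
    """
    if checkpoint_id is None:
        snap = {"iteration": 0, "results": [], "pending": None}
    else:
        # Re-hydrate from a copy so the stored checkpoint is never mutated.
        snap = copy.deepcopy(store[checkpoint_id])
    responses = responses or {}

    i = snap["iteration"]
    while i < len(steps):
        name, needs_input = steps[i]
        if needs_input:
            request_id = f"req-{i}"
            if request_id not in responses:
                # No answer yet: record the pending request and persist.
                snap["pending"] = request_id
                new_id = f"cp-{len(store)}"
                store[new_id] = copy.deepcopy(snap)
                return "paused", new_id, snap
            snap["results"].append(f"{name}:{responses[request_id]}")
            snap["pending"] = None
        else:
            snap["results"].append(name)
        i += 1
        snap["iteration"] = i
    return "done", None, snap


steps = [("plan", False), ("review", True), ("execute", False)]
store = {}
status, cp_id, _ = run_toy_workflow(steps, store)
status2, _, final = run_toy_workflow(
    steps, store, checkpoint_id=cp_id, responses={"req-1": "approved"}
)
```

Note that the earlier "plan" step is not re-executed on resume; its result is carried in the snapshot.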
Implementing Checkpoint Recovery
To add fault tolerance to your workflows, you need to configure storage, run until a pause point, and then resume with user input.
Setting Up FileCheckpointStorage
First, instantiate the storage backend and ensure the directory exists:
from pathlib import Path
from agent_framework import FileCheckpointStorage
CHECKPOINT_DIR = Path(__file__).parent / "tmp" / "my_checkpoints"
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
checkpoint_storage = FileCheckpointStorage(CHECKPOINT_DIR)
Pass this instance to your workflow builder. The following example uses a generic MyWorkflowBuilder, which could be MagenticBuilder or any custom implementation:
def build_my_workflow():
    # … create your agents and tools …
    return MyWorkflowBuilder(
        participants=[agent_a, agent_b],
        checkpoint_storage=checkpoint_storage,  # Required for persistence
        max_round_count=10,
    ).build()
Capturing Checkpoints During Execution
Run the workflow and listen for pending requests. The checkpoint is created automatically when the workflow pauses:
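async def run_until_pause(task_input):
    workflow = build_my_workflow()
    plan_request = None
    async for ev in workflow.run(task_input, stream=True):
        # Remember the most recent pending request so it can be answered on resume
        if ev.type == "request_info":
            plan_request = ev.data
        # Stop consuming events once the workflow is idle and waiting for input
        if ev.type == "status" and ev.state.is_idle_with_pending_requests:
            break
    return plan_request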
Resuming from a Checkpoint
To recover, retrieve the latest checkpoint and provide responses for any pending requests:
async def resume_workflow(plan_request):
    latest_cp = await checkpoint_storage.get_latest(workflow_name="my_workflow")
    if not latest_cp:
        raise RuntimeError("No checkpoint persisted")
    print(f"Resuming from checkpoint {latest_cp.checkpoint_id} "
          f"at iteration {latest_cp.iteration_count}")
    response = plan_request.approve()  # Or any user-provided answer
    resumed_wf = build_my_workflow()
    async for ev in resumed_wf.run(
        checkpoint_id=latest_cp.checkpoint_id,
        responses={plan_request.request_id: response},
        stream=True,
    ):
        # Process continued output events …
        pass
Time-Travel to Previous States
True time-travel involves loading an earlier checkpoint rather than just the latest one. Use list_checkpoints() to browse historical states:
async def time_travel_to_earlier_point():
    all_cps = await checkpoint_storage.list_checkpoints(workflow_name="my_workflow")
    # Sort by iteration count to find the earliest state
    target_cp = sorted(all_cps, key=lambda cp: cp.iteration_count)[0]
    wf = build_my_workflow()
    async for ev in wf.run(checkpoint_id=target_cp.checkpoint_id, stream=True):
        # Execution resumes from the selected historical state
        pass
This technique enables debugging scenarios where you want to test alternative responses or retry failed branches without recomputing earlier steps.
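When you want a specific point in history rather than the very first checkpoint, a small selection helper can make the intent explicit. The sketch below is an illustration only: CheckpointInfo and pick_checkpoint are hypothetical names, and the only assumption about real checkpoints is that they expose checkpoint_id and iteration_count as used in the examples above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CheckpointInfo:
    """Hypothetical minimal view of a checkpoint for selection purposes."""
    checkpoint_id: str
    iteration_count: int


def pick_checkpoint(checkpoints, at_or_before):
    """Return the newest checkpoint with iteration_count <= at_or_before.

    Returns None when no checkpoint is old enough.
    """
    candidates = [cp for cp in checkpoints if cp.iteration_count <= at_or_before]
    return max(candidates, key=lambda cp: cp.iteration_count, default=None)


history = [
    CheckpointInfo("c3", 3),
    CheckpointInfo("c1", 1),
    CheckpointInfo("c2", 2),
]
target = pick_checkpoint(history, at_or_before=2)
```

The returned checkpoint_id is what you would then pass to run(checkpoint_id=...), optionally with a different responses mapping to explore an alternative branch.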
Storage Backend Options
The framework provides two concrete implementations of the CheckpointStorage protocol for different use cases.
FileCheckpointStorage
FileCheckpointStorage persists data to disk using atomic write operations to prevent corruption during crashes. It stores metadata as JSON and serializes complex Python objects using base-64-encoded pickle. This backend is suitable for production deployments where workflows must survive process restarts.
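The two techniques named here, atomic writes and base-64-encoded pickle inside JSON, can be sketched with the standard library. This is not the framework's implementation; encode_payload, decode_payload, and atomic_write_json are hypothetical helpers, and the temp-file-then-rename approach is one common way to get atomic replacement, assumed here for illustration.

```python
import base64
import json
import os
import pickle
import tempfile
from pathlib import Path


def encode_payload(obj) -> str:
    """Pickle an object and wrap it in base64 so it fits in a JSON string."""
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def decode_payload(text: str):
    """Reverse encode_payload. Only unpickle data you wrote yourself."""
    return pickle.loads(base64.b64decode(text))


def atomic_write_json(path: Path, document: dict) -> None:
    """Write JSON to a temp file in the same directory, then rename over path."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(document, handle)
            handle.flush()
            os.fsync(handle.fileno())
        # os.replace swaps the file in one step, so readers never see a
        # half-written checkpoint.
        os.replace(tmp_name, path)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Writing the temp file in the same directory as the target matters, because a rename is only atomic within a single filesystem.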
InMemoryCheckpointStorage
For unit tests or demonstrations, use InMemoryCheckpointStorage, which keeps all data in RAM. This avoids file I/O overhead and automatically cleans up when the process exits. It is defined in the same _checkpoint.py module and follows the identical protocol, making it a drop-in replacement for testing:
from agent_framework import InMemoryCheckpointStorage
test_storage = InMemoryCheckpointStorage()
Production Examples from the Repository
The microsoft/agent-framework repository includes several reference implementations demonstrating checkpoint patterns:
- python/samples/03-workflows/orchestrations/magentic_checkpoint.py – Demonstrates end-to-end runs with pause points for plan review, resumption with user feedback, and jumping to later checkpoints.
- python/samples/03-workflows/orchestrations/handoff_with_tool_approval_checkpoint_resume.py – Shows checkpointing around tool-approval requests and feeding user responses on restore.
- python/samples/03-workflows/checkpoint/workflow_as_agent_checkpoint.py – Illustrates how to wrap a workflow inside an Agent while persisting checkpoints and preserving conversation thread history.
Best Practices for Checkpoint Recovery
When implementing time-travel and recovery in production systems, follow these guidelines, which are derived from an analysis of the source code:
- Maintain Deterministic Executor IDs – Ensure that agent and tool executors have stable identifiers across runs. The checkpoint relies on the graph signature hash; changing IDs breaks state restoration.
- Validate Checkpoint IDs – FileCheckpointStorage includes path-traversal protection. Never pass untrusted checkpoint IDs directly to the storage layer without validation.
- Use Dedicated Storage Directories – Isolate checkpoint files in a specific tmp/ or checkpoint/ directory. Clean this folder between test runs to prevent stale data contamination.
- Map Responses Correctly – When calling run(checkpoint_id=..., responses=...), the dictionary must map request_id strings to response objects. Missing entries will cause the workflow to pause again at the same point.
- Handle Pending Requests – The runtime automatically captures request_info events in the checkpoint. If you define custom request types, ensure they inherit from WorkflowEvent so the checkpointing system recognizes them.
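For the response-mapping guideline, a pre-flight check before calling run() can catch mismatches early. The helpers below are hypothetical and assume only that you can obtain the pending request IDs as strings (for example, from the keys of the checkpoint's pending request events); how you extract them from a real checkpoint is not shown here.

```python
def missing_responses(pending_request_ids, responses):
    """Return pending request IDs that have no entry in responses, sorted."""
    return sorted(set(pending_request_ids) - set(responses))


def unexpected_responses(pending_request_ids, responses):
    """Return response keys that do not match any pending request, sorted."""
    return sorted(set(responses) - set(pending_request_ids))


pending = ["req-plan-1", "req-tool-7"]
responses = {"req-plan-1": "approve", "req-typo": "approve"}

missing = missing_responses(pending, responses)
unexpected = unexpected_responses(pending, responses)
```

A non-empty missing list means the workflow would pause again at the same point, and a non-empty unexpected list usually points to a typo in a request ID.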
Summary
- Checkpoint Recovery relies on the CheckpointStorage protocol, typically implemented via FileCheckpointStorage, to persist workflow snapshots after pending requests.
- Time-Travel is achieved by calling storage.list_checkpoints() or get_latest() to retrieve historical states, then passing a specific checkpoint_id to workflow.run().
- Resumption requires the responses parameter, which injects answers for pending requests stored in the checkpoint's pending_request_info_events field.
- Storage Options include FileCheckpointStorage for production persistence and InMemoryCheckpointStorage for testing.
- Key Files referenced include python/packages/core/agent_framework/_workflows/_checkpoint.py (core logic) and python/packages/core/agent_framework/_workflows/_runner.py (runtime resumption).
Frequently Asked Questions
What data is stored inside a WorkflowCheckpoint?
The checkpoint captures the workflow name, graph signature hash, message history, committed agent state, pending request events (including request IDs awaiting responses), the current iteration count, and metadata. According to the implementation in python/packages/core/agent_framework/_workflows/_checkpoint.py, this comprehensive snapshot ensures that execution can resume identically from the point where it paused.
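To see why a graph signature hash ties a checkpoint to a particular workflow shape, consider the following sketch. It is an assumption about how such a hash could be computed, not the framework's algorithm: graph_signature is a hypothetical function that hashes a canonical, order-independent description of executor IDs and edges.

```python
import hashlib
import json


def graph_signature(executor_ids, edges) -> str:
    """Hash a canonical description of the graph (hypothetical scheme)."""
    canonical = json.dumps(
        {
            "executors": sorted(executor_ids),
            "edges": sorted([list(edge) for edge in edges]),
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


original = graph_signature(
    ["planner", "writer"], [("planner", "writer")]
)
reordered = graph_signature(
    ["writer", "planner"], [("planner", "writer")]
)
renamed = graph_signature(
    ["planner", "writer-v2"], [("planner", "writer-v2")]
)
```

Under this scheme, declaration order does not change the signature, but renaming an executor does, which is why stable executor IDs matter for restoring a checkpoint.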
How do I prevent security issues with checkpoint IDs?
FileCheckpointStorage includes a _validate_file_path method that guards against path-traversal attacks by ensuring checkpoint IDs resolve inside the designated storage directory. You should never construct checkpoint IDs from untrusted user input without validation, and you should treat checkpoint files as sensitive data since they may contain conversation history and internal state.
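The kind of check described here can be sketched as follows. This is not the body of _validate_file_path, which is not reproduced in this article; resolve_checkpoint_path is a hypothetical helper showing one common way to confirm that a checkpoint ID resolves inside the storage directory.

```python
from pathlib import Path


def resolve_checkpoint_path(storage_dir: Path, checkpoint_id: str) -> Path:
    """Return the file path for checkpoint_id, rejecting escapes from storage_dir."""
    base = storage_dir.resolve()
    candidate = (base / f"{checkpoint_id}.json").resolve()
    # is_relative_to requires Python 3.9 or newer.
    if not candidate.is_relative_to(base):
        raise ValueError(f"checkpoint id escapes storage directory: {checkpoint_id!r}")
    return candidate


base_dir = Path("checkpoints")
safe_path = resolve_checkpoint_path(base_dir, "abc123")

try:
    resolve_checkpoint_path(base_dir, "../../etc/passwd")
    traversal_blocked = False
except ValueError:
    traversal_blocked = True
```

Resolving both paths before comparing them is the important step, since a plain string prefix check can be fooled by ".." segments or symlinks.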
Can I use time-travel for debugging "what-if" scenarios?
Yes. By listing all checkpoints with storage.list_checkpoints() and sorting by iteration_count, you can select any historical state to resume from. This allows you to test alternative responses to pending requests or retry failed branches without re-executing expensive earlier steps, making it ideal for debugging complex multi-agent workflows.
What is the difference between FileCheckpointStorage and InMemoryCheckpointStorage?
FileCheckpointStorage writes JSON and base-64-encoded pickle data to disk, surviving process restarts and enabling recovery after crashes. InMemoryCheckpointStorage stores data only in RAM, making it faster for unit tests but unsuitable for production recovery scenarios. Both implement the same CheckpointStorage protocol, so you can swap them by changing the initialization line in your builder setup.