# How to Implement Durable Agents with Checkpoint and Resume in the Microsoft Agent Framework

> Learn to implement durable agents with checkpoint and resume in Microsoft Agent Framework. Configure storage, enable checkpointing, and manage state for resilient agent behavior. Maximize your agent's uptime.

- Repository: [Microsoft/agent-framework](https://github.com/microsoft/agent-framework)
- Tags: how-to-guide
- Published: 2026-04-05

---

**To implement durable agents with checkpoint and resume, configure a `CheckpointStorage` backend, enable checkpointing via the `WorkflowBuilder`, and implement the `on_checkpoint_save()` and `on_checkpoint_restore()` hooks in your executors to persist internal mutable state.**

The Microsoft Agent Framework executes long-running, multi-step workflows that may wait for external input or survive infrastructure interruptions. When you implement durable agents with checkpoint and resume capabilities, you create production-grade AI systems that withstand process crashes, scaling events, and human-in-the-loop delays without losing progress.

## Understanding the Checkpoint Architecture

The checkpointing system centers on the **`WorkflowCheckpoint`** dataclass defined in [`agent_framework/_workflows/_checkpoint.py`](https://github.com/microsoft/agent-framework/blob/main/agent_framework/_workflows/_checkpoint.py). This immutable structure captures the complete workflow snapshot, including exchanged messages, executor internal state, pending request-info events, and a monotonic iteration counter.

### The WorkflowCheckpoint Data Model

According to the source code in `microsoft/agent-framework`, a `WorkflowCheckpoint` serializes:

- The full message history between executors
- Pending `request_info` events (such as human approval requests)
- The current iteration count
- Executor-specific state dictionaries

The framework emits a **`superstep_completed`** event after each execution step. This event triggers automatic serialization of the current `WorkflowCheckpoint` to your configured storage backend.
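To make the snapshot contents concrete, here is a minimal sketch of a checkpoint-like record and a JSON round trip. `CheckpointSketch` and all of its field names are hypothetical stand-ins chosen for illustration; they are not the framework's actual `WorkflowCheckpoint` definition.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass(frozen=True)
class CheckpointSketch:
    """Hypothetical stand-in for the kinds of fields a workflow snapshot carries."""

    checkpoint_id: str
    workflow_name: str
    iteration_count: int
    messages: dict[str, list[Any]] = field(default_factory=dict)
    pending_requests: dict[str, Any] = field(default_factory=dict)
    executor_states: dict[str, dict[str, Any]] = field(default_factory=dict)

    def to_json(self) -> str:
        # asdict() converts the dataclass into plain dicts and lists.
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, raw: str) -> "CheckpointSketch":
        return cls(**json.loads(raw))


snapshot = CheckpointSketch(
    checkpoint_id="cp-0003",
    workflow_name="demo",
    iteration_count=3,
    messages={"counter": [None]},
    pending_requests={"req-1": {"kind": "plan_review"}},
    executor_states={"counter": {"counter": 3}},
)

# Serializing and deserializing yields an equal snapshot.
restored = CheckpointSketch.from_json(snapshot.to_json())
```

Marking the dataclass `frozen=True` mirrors the idea of an immutable snapshot: once written, a checkpoint is replaced by a newer one rather than edited in place.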

### Storage Backend Options

The framework provides two concrete implementations of the `CheckpointStorage` protocol:

- **`InMemoryCheckpointStorage`**: A volatile, dictionary-based store ideal for unit tests and ephemeral demonstrations. All checkpoints disappear when the process terminates.
- **`FileCheckpointStorage`**: A durable backend that persists checkpoints to the local filesystem, so they survive process restarts. Surviving the loss of the host itself additionally requires that the checkpoint directory live on durable or replicated storage.

For production deployments, `FileCheckpointStorage` is the recommended option of the two. It writes metadata as JSON and handles complex state objects via pickle, so load checkpoint files only from locations you trust.
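The idea behind a file-backed store can be sketched with the standard library alone. The class below is a hypothetical illustration, not the framework's `FileCheckpointStorage`: the class name, the method names (`save`, `load`, `latest`), and the one-JSON-file-per-checkpoint layout are all assumptions.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Optional


class JsonFileStoreSketch:
    """Hypothetical file-backed checkpoint store (illustration only)."""

    def __init__(self, directory: Path) -> None:
        self._dir = Path(directory)
        self._dir.mkdir(parents=True, exist_ok=True)

    def save(self, checkpoint: dict[str, Any]) -> str:
        target = self._dir / f"{checkpoint['checkpoint_id']}.json"
        # Write to a temporary file, then rename it into place, so a crash
        # mid-write never leaves a truncated checkpoint file behind.
        fd, tmp_name = tempfile.mkstemp(dir=self._dir, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(checkpoint, handle)
        os.replace(tmp_name, target)
        return checkpoint["checkpoint_id"]

    def load(self, checkpoint_id: str) -> dict[str, Any]:
        path = self._dir / f"{checkpoint_id}.json"
        return json.loads(path.read_text(encoding="utf-8"))

    def latest(self, workflow_name: str) -> Optional[dict[str, Any]]:
        # Pick the checkpoint with the highest iteration count for this workflow.
        candidates = [
            json.loads(p.read_text(encoding="utf-8")) for p in self._dir.glob("*.json")
        ]
        matching = [c for c in candidates if c["workflow_name"] == workflow_name]
        return max(matching, key=lambda c: c["iteration_count"], default=None)


with tempfile.TemporaryDirectory() as tmp:
    store = JsonFileStoreSketch(Path(tmp))
    store.save({"checkpoint_id": "cp-1", "workflow_name": "demo", "iteration_count": 1})
    store.save({"checkpoint_id": "cp-2", "workflow_name": "demo", "iteration_count": 2})
    first = store.load("cp-1")
    latest = store.latest("demo")
    missing = store.latest("other")
```

The write-then-rename step is a general durability technique rather than something this guide confirms the framework does; it is included because a partially written checkpoint is worse than a missing one.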

## Configuring Checkpointing in Your Workflow

You enable checkpointing by passing a storage instance to your `WorkflowBuilder`. The builder automatically wires the necessary runtime hooks that trigger `on_checkpoint_save` and `on_checkpoint_restore` calls on every executor.

```python
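from agent_framework import WorkflowBuilder, InMemoryCheckpointStorage

# Volatile storage: suitable for tests, lost when the process exits
storage = InMemoryCheckpointStorage()

# CounterExecutor is defined in the "Persisting Executor State with Hooks" section
builder = WorkflowBuilder(
    start_executor=CounterExecutor("counter"),
    checkpoint_storage=storage,
)
workflow = builder.build()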
```

When checkpointing is enabled, the runtime manages persistence automatically. You do not manually call save methods; the framework handles serialization after each superstep completion.
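The "save after every superstep" behavior can be pictured as a small driver loop. This is a simplified sketch, not the framework's runtime: `run_with_checkpoints`, `count_step`, and the shape of the saved snapshot are hypothetical names chosen for illustration.

```python
from typing import Any, Callable

State = dict[str, Any]


def run_with_checkpoints(
    step: Callable[[State], tuple[State, bool]],
    state: State,
    save: Callable[[dict[str, Any]], None],
    max_supersteps: int = 10,
) -> State:
    """Run `step` until it reports completion, saving a snapshot after each superstep."""
    for iteration in range(1, max_supersteps + 1):
        state, done = step(state)
        # The caller never saves explicitly; persistence happens here.
        save({"iteration_count": iteration, "state": dict(state)})
        if done:
            break
    return state


def count_step(state: State) -> tuple[State, bool]:
    """One superstep: increment the counter and stop at five."""
    new_state = {"counter": state["counter"] + 1}
    return new_state, new_state["counter"] >= 5


saved: list[dict[str, Any]] = []
final = run_with_checkpoints(count_step, {"counter": 0}, saved.append)
```

Because a snapshot is written after each step, the most recent entry in `saved` always reflects the last completed superstep, which is the point a resumed run would start from.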

## Persisting Executor State with Hooks

Executors that maintain internal mutable state must implement two asynchronous methods to survive restarts:

- **`on_checkpoint_save()`**: Returns a dictionary representing the executor's current state
- **`on_checkpoint_restore(state)`**: Receives the stored dictionary and restores internal variables

The following `CounterExecutor` demonstrates the pattern as implemented in [`samples/03-workflows/checkpoint/checkpoint_with_resume.py`](https://github.com/microsoft/agent-framework/blob/main/samples/03-workflows/checkpoint/checkpoint_with_resume.py):

```python
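from agent_framework import Executor, WorkflowContext, handler


class CounterExecutor(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        self._counter = 0  # mutable state that must survive a restart

    @handler
    async def count(self, _: None, ctx: WorkflowContext[None]) -> None:
        self._counter += 1
        print(f"counter = {self._counter}")
        if self._counter < 5:
            # Send a message to trigger another iteration
            await ctx.send_message(None)
        else:
            await ctx.yield_output(self._counter)

    async def on_checkpoint_save(self) -> dict:
        # Return everything needed to rebuild this executor's state
        return {"counter": self._counter}

    async def on_checkpoint_restore(self, state: dict) -> None:
        # Fall back to 0 if the checkpoint has no saved counter
        self._counter = state.get("counter", 0)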
```

Without these hooks, `_counter` would reset to zero after every resume, so the workflow would repeat iterations it had already completed and lose the progress recorded before the interruption.
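The effect of the hooks can be checked without the framework at all. The sketch below uses a plain class, `CounterSketch` (a hypothetical name), that mirrors only the two hook signatures shown above and compares a restarted instance with and without a restore call.

```python
import asyncio
from typing import Any


class CounterSketch:
    """Plain class mirroring the save/restore hook signatures (no framework needed)."""

    def __init__(self) -> None:
        self._counter = 0

    def tick(self) -> int:
        self._counter += 1
        return self._counter

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"counter": self._counter}

    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self._counter = state.get("counter", 0)


async def simulate_restart() -> tuple[int, int]:
    original = CounterSketch()
    for _ in range(3):
        original.tick()
    saved = await original.on_checkpoint_save()

    # Fresh instances stand in for the process that starts after a crash.
    without_restore = CounterSketch()
    with_restore = CounterSketch()
    await with_restore.on_checkpoint_restore(saved)
    return without_restore.tick(), with_restore.tick()


print(asyncio.run(simulate_restart()))  # prints (1, 4)
```

The instance that skipped the restore starts counting from one again, while the restored instance continues from the saved value.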

## Production Pattern: File-Backed Storage and Human-in-the-Loop

For real-world scenarios requiring durability and human approval, combine `FileCheckpointStorage` with the Magentic orchestration framework. This pattern handles plan-review requests that pause execution until manual approval arrives.

### Setting Up Durable Storage

First, instantiate `FileCheckpointStorage` with a persistent directory:

```python
from pathlib import Path
from agent_framework import FileCheckpointStorage

CHECKPOINT_DIR = Path("./checkpoints")
storage = FileCheckpointStorage(CHECKPOINT_DIR)

```

### Handling Plan Review Requests

The Magentic orchestrator automatically stores pending `MagenticPlanReviewRequest` objects in the checkpoint when `enable_plan_review=True`. The workflow enters an `IDLE_WITH_PENDING_REQUESTS` state, allowing you to capture the checkpoint ID:

```python
from agent_framework import WorkflowRunState
from agent_framework.orchestrations import MagenticBuilder, MagenticPlanReviewRequest

# `researcher`, `writer`, and `manager` are agents defined elsewhere
def build_workflow(storage):
    return MagenticBuilder(
        participants=[researcher, writer],
        enable_plan_review=True,
        checkpoint_storage=storage,
        manager_agent=manager,
    ).build()

workflow = build_workflow(storage)

# Run until human review is needed (this loop must execute inside an async function)
plan_request = None
async for ev in workflow.run("Draft a brief...", stream=True):
    if ev.type == "request_info" and isinstance(ev.data, MagenticPlanReviewRequest):
        plan_request = ev.data
    if ev.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS:
        break

# Retrieve the checkpoint
checkpoint = await storage.get_latest(workflow_name=workflow.name)
```

### Resuming with External Responses

To resume, reconstruct the workflow with the same storage configuration and pass the `checkpoint_id` and a `responses` dictionary mapping the request ID to the approval:

```python
# Rebuild the workflow against the same storage before resuming
workflow = build_workflow(storage)
approval = plan_request.approve()

async for ev in workflow.run(
    checkpoint_id=checkpoint.checkpoint_id,
    stream=True,
    responses={plan_request.request_id: approval},
):
    if ev.type == "output":
        print("Final result:", ev.data)
```

This pattern appears in full in [`samples/03-workflows/orchestrations/magentic_checkpoint.py`](https://github.com/microsoft/agent-framework/blob/main/samples/03-workflows/orchestrations/magentic_checkpoint.py), demonstrating end-to-end durability across process restarts.
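Before resuming, it can help to verify that every pending request has a matching response. The helper below is a hypothetical convenience, not part of the framework; it assumes you have collected the pending request IDs yourself and simply builds the `responses` mapping or fails early.

```python
from typing import Any, Iterable


def build_responses(pending_ids: Iterable[str], decisions: dict[str, Any]) -> dict[str, Any]:
    """Return a request-ID-to-response mapping, failing if any pending ID is unanswered."""
    pending = list(pending_ids)
    missing = [request_id for request_id in pending if request_id not in decisions]
    if missing:
        raise KeyError(f"No response supplied for request(s): {missing}")
    # Ignore decisions for requests that are not actually pending.
    return {request_id: decisions[request_id] for request_id in pending}


responses = build_responses(["req-1"], {"req-1": "approve", "req-9": "stale"})
```

Failing before the resume call keeps a half-answered workflow from going straight back into a waiting state.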

## Advanced: Custom Executor State Management

Complex executors may cache intermediate computations or maintain lookup tables. The `WorkerExecutor` from [`samples/03-workflows/checkpoint/checkpoint_with_resume.py`](https://github.com/microsoft/agent-framework/blob/main/samples/03-workflows/checkpoint/checkpoint_with_resume.py) shows how to persist a dictionary of computed factor pairs:

```python
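from dataclasses import dataclass, field

from agent_framework import Executor, WorkflowContext, handler


# Assumed shape of the task message; the sample defines its own ComputeTask
@dataclass
class ComputeTask:
    remaining_numbers: list[int] = field(default_factory=list)


class WorkerExecutor(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        # Maps each processed number to its factor pairs
        self._pairs: dict[int, list[tuple[int, int]]] = {}

    @handler
    async def compute(self, task: ComputeTask, ctx: WorkflowContext[ComputeTask]) -> None:
        n = task.remaining_numbers.pop(0)
        self._pairs[n] = [(i, n // i) for i in range(1, n) if n % i == 0]

        if task.remaining_numbers:
            # More numbers to process: pass the task on for another superstep
            await ctx.send_message(task)
        else:
            await ctx.yield_output(self._pairs)

    async def on_checkpoint_save(self) -> dict:
        return {"pairs": self._pairs}

    async def on_checkpoint_restore(self, state: dict) -> None:
        self._pairs = state.get("pairs", {})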
```

The framework calls these hooks as part of checkpoint write and restore operations, so results computed before the last checkpoint survive a crash and do not need to be recomputed.
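One detail is worth checking when state like `_pairs` passes through plain JSON at any point: JSON has no integer keys or tuples, so a round trip changes the types. The sketch below shows the effect with the standard `json` module and a hypothetical `normalize_pairs` helper that a restore hook could apply. Whether you need it depends on how your storage backend serializes executor state.

```python
import json

# Same shape as WorkerExecutor._pairs for n = 6.
pairs = {6: [(1, 6), (2, 3), (3, 2)]}

# After a JSON round trip, keys are strings and tuples are lists.
round_tripped = json.loads(json.dumps({"pairs": pairs}))["pairs"]


def normalize_pairs(raw: dict) -> dict[int, list[tuple[int, int]]]:
    """Convert JSON-decoded pairs back to int keys and tuple values."""
    return {int(key): [tuple(pair) for pair in value] for key, value in raw.items()}


recovered = normalize_pairs(round_tripped)
```

Applying a normalization step like this inside `on_checkpoint_restore` keeps later lookups such as `self._pairs[6]` working regardless of the serialization format.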

## Summary

Implementing durable agents in the Microsoft Agent Framework requires four core components:

- **Immutable snapshots**: The `WorkflowCheckpoint` dataclass in [`agent_framework/_workflows/_checkpoint.py`](https://github.com/microsoft/agent-framework/blob/main/agent_framework/_workflows/_checkpoint.py) captures complete workflow state, including messages and pending requests.
- **Persistent storage**: Use `FileCheckpointStorage` for production durability or `InMemoryCheckpointStorage` for testing.
- **State hooks**: Implement `on_checkpoint_save()` and `on_checkpoint_restore()` in every executor that maintains internal counters, caches, or accumulators.
- **Resume capability**: Restart workflows using `workflow.run(checkpoint_id="...", responses={...})` to inject external inputs like human approvals.

## Frequently Asked Questions

### What data is included in a WorkflowCheckpoint?

A `WorkflowCheckpoint` contains the entire message history between executors, the current iteration count, executor-specific state dictionaries returned by `on_checkpoint_save()`, and any pending `request_info` events such as `MagenticPlanReviewRequest` objects awaiting human input. According to [`agent_framework/_workflows/_checkpoint.py`](https://github.com/microsoft/agent-framework/blob/main/agent_framework/_workflows/_checkpoint.py), this snapshot is sufficient to reconstruct the exact runtime state at the moment of the last superstep completion.

### How do I resume a workflow after a crash or intentional pause?

Retrieve the latest checkpoint using `checkpoint = await storage.get_latest(workflow_name=workflow.name)`, then call `workflow.run(checkpoint_id=checkpoint.checkpoint_id, stream=True)`. If resuming a human-in-the-loop flow, pass a `responses` dictionary mapping request IDs to approval objects. The framework restores executor state via `on_checkpoint_restore()` and continues execution from the last completed superstep, which is the point the checkpoint captured.

### Which storage backend should I use for production workloads?

Use **`FileCheckpointStorage`** for production. Defined in [`agent_framework/_workflows/_checkpoint.py`](https://github.com/microsoft/agent-framework/blob/main/agent_framework/_workflows/_checkpoint.py), this backend writes JSON metadata and pickled state to the local filesystem, so checkpoints survive process restarts. To survive the loss of the host as well, place the checkpoint directory on durable or replicated storage. `InMemoryCheckpointStorage` is suitable only for unit tests or ephemeral demonstrations where persistence is not required.

### Do executor IDs need to be deterministic for checkpointing to work?

Yes. Executors must use stable, deterministic IDs (such as `"counter"` or `"researcher"`) rather than randomly generated UUIDs. The restore mechanism reinstantiates executors by ID; if IDs change between runs, the framework cannot match saved state to the correct executor instance, causing `on_checkpoint_restore()` to fail or target the wrong component.
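The matching problem can be illustrated with plain dictionaries. In this sketch, `restore_by_id` is a hypothetical helper that applies saved state to executors keyed by ID and reports any saved entry that has no matching executor; it models the idea only and is not the framework's restore logic.

```python
import uuid
from typing import Any


def restore_by_id(
    executors: dict[str, dict[str, Any]],
    saved_states: dict[str, dict[str, Any]],
) -> list[str]:
    """Apply saved state to executors with matching IDs; return the unmatched saved IDs."""
    unmatched = []
    for executor_id, state in saved_states.items():
        if executor_id in executors:
            executors[executor_id].update(state)
        else:
            unmatched.append(executor_id)
    return unmatched


saved_states = {"counter": {"counter": 3}}

# Stable ID: the saved state finds its executor.
stable = {"counter": {}}
stable_unmatched = restore_by_id(stable, saved_states)

# Random ID generated per run: nothing matches the saved "counter" entry.
random_id = f"counter-{uuid.uuid4()}"
randomized = {random_id: {}}
random_unmatched = restore_by_id(randomized, saved_states)
```

With the stable ID the state lands on the right executor; with the per-run random ID the saved entry is orphaned and the new executor starts empty.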