# How to Implement Custom Middleware in Agent Framework for Request/Response Interception

> Learn to implement custom middleware in Agent Framework for request/response interception. Subclass base classes, process context, and register your middleware to enhance your agents.

- Repository: [Microsoft/agent-framework](https://github.com/microsoft/agent-framework)
- Tags: how-to-guide
- Published: 2026-04-05

---

**To implement custom middleware in Agent Framework, subclass the appropriate abstract base class (`AgentMiddleware`, `FunctionMiddleware`, or `ChatMiddleware`) from [`agent_framework/_middleware.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/core/agent_framework/_middleware.py), implement the async `process(context, call_next)` method to mutate the context before and after `await call_next()`, and register the instance in the `middleware=` list when constructing an Agent or ChatClient.**

Agent Framework (`microsoft/agent-framework`) provides three extensible middleware pipelines that let you intercept and modify requests and responses at different architectural layers. By implementing custom middleware, you can inject logging, caching, policy enforcement, and message transformations without altering the core agent logic. All middleware types follow one convention: they communicate by mutating a shared `context` object rather than by returning values.

## Understanding the Three Middleware Layers

Agent Framework defines three distinct middleware abstractions in [`python/packages/core/agent_framework/_middleware.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/core/agent_framework/_middleware.py) (lines 357-425):

- **`AgentMiddleware`** – Intercepts agent invocations using `AgentContext`. Use this to manipulate the agent’s context, inject system prompts, implement retry logic, or override results before the LLM call executes.

- **`FunctionMiddleware`** – Intercepts tool or function executions using `FunctionInvocationContext`. Use this to cache function results, validate arguments, or enforce policies before a tool runs.

- **`ChatMiddleware`** – Intercepts chat client requests using `ChatContext`. Use this to adjust message lists, add headers, log requests and responses, or short-circuit chat calls entirely.

Each layer operates on a specific context type, but all share the same implementation pattern.
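Because every layer shares that shape, a function-level middleware looks much like an agent-level one. The sketch below is framework-free so it runs on its own: `FakeFunctionContext`, its `name`/`arguments`/`result` fields, and the `run_function` driver are hypothetical stand-ins, not the real `FunctionInvocationContext` API. It validates arguments before the tool runs and caches tool outputs afterwards. In this stand-in, not awaiting `call_next()` is enough to skip the tool; Agent Framework documents raising `MiddlewareTermination` for early exits.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

CallNext = Callable[[], Awaitable[None]]


@dataclass
class FakeFunctionContext:
    """Hypothetical stand-in for FunctionInvocationContext."""
    name: str
    arguments: dict[str, Any]
    result: Any = None


class CachingValidationMiddleware:
    """Rejects negative numeric arguments and caches results per (name, arguments)."""

    def __init__(self) -> None:
        self.cache: dict[tuple, Any] = {}
        self.executions = 0  # how many times the underlying tool actually ran

    async def process(self, context: FakeFunctionContext, call_next: CallNext) -> None:
        # Pre-processing: validate arguments before the tool runs
        if any(isinstance(v, (int, float)) and v < 0 for v in context.arguments.values()):
            context.result = "error: negative arguments are not allowed"
            return  # in this stand-in, skipping call_next() means the tool never runs
        key = (context.name, tuple(sorted(context.arguments.items())))
        if key in self.cache:
            context.result = self.cache[key]
            return
        await call_next()
        # Post-processing: remember the tool output for identical future calls
        self.executions += 1
        self.cache[key] = context.result


async def run_function(middleware: CachingValidationMiddleware,
                       context: FakeFunctionContext,
                       func: Callable[..., Any]) -> Any:
    # Hypothetical driver: the final "call_next" invokes the tool itself
    async def final() -> None:
        context.result = func(**context.arguments)

    await middleware.process(context, final)
    return context.result


def square_root(x: float) -> float:
    return x ** 0.5


async def main() -> tuple[Any, Any, Any, int]:
    mw = CachingValidationMiddleware()
    first = await run_function(mw, FakeFunctionContext("sqrt", {"x": 16}), square_root)
    second = await run_function(mw, FakeFunctionContext("sqrt", {"x": 16}), square_root)
    bad = await run_function(mw, FakeFunctionContext("sqrt", {"x": -1}), square_root)
    return first, second, bad, mw.executions


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The second identical call is answered from the cache, so the tool body runs only once.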

## Implementing the process Method

Create a subclass and implement the asynchronous `process` method with the signature `async def process(self, context, call_next)`. The `call_next` parameter is a callable that returns an awaitable (`Callable[[], Awaitable[None]]`), and `context` is a mutable object specific to the middleware layer (e.g., `AgentContext`).
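The signature can be mirrored with the standard library alone. In this sketch, `StubContext` and `StubMiddleware` are hypothetical stand-ins for `AgentContext` and the framework's abstract base classes; what matters is the shape: an abstract async `process`, a zero-argument `call_next`, and a result written into the context rather than returned.

```python
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

# A zero-argument callable that returns an awaitable, matching the signature above
CallNext = Callable[[], Awaitable[None]]


@dataclass
class StubContext:
    """Hypothetical stand-in for a layer-specific context such as AgentContext."""
    messages: list[str]
    metadata: dict[str, Any] = field(default_factory=dict)
    result: Any = None


class StubMiddleware(ABC):
    """Hypothetical mirror of the shape of the framework's abstract base classes."""

    @abstractmethod
    async def process(self, context: StubContext, call_next: CallNext) -> None: ...


class TimingMiddleware(StubMiddleware):
    async def process(self, context: StubContext, call_next: CallNext) -> None:
        start = time.perf_counter()
        await call_next()
        # Communicate through the context instead of returning a value
        context.metadata["elapsed_s"] = time.perf_counter() - start


async def demo() -> tuple[StubContext, Any]:
    ctx = StubContext(messages=["hi"])

    async def final() -> None:
        # Plays the role of the downstream executor
        ctx.result = f"echo: {ctx.messages[-1]}"

    returned = await TimingMiddleware().process(ctx, final)
    return ctx, returned


if __name__ == "__main__":
    print(asyncio.run(demo()))
```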

### Pre-processing and Post-processing Pattern

The standard implementation follows a four-step onion pattern:

1. **Pre-processing** – Read or mutate `context` attributes (e.g., `context.messages`, `context.metadata`).
2. **Await next** – Call `await call_next()` to hand execution to the next middleware or final executor.
3. **Post-processing** – Read or mutate `context.result` after the underlying call completes.
4. **Silent return** – Return nothing; all communication persists in the mutated `context`.

```python
from typing import Awaitable, Callable

from agent_framework import AgentContext, AgentMiddleware


class LoggingMiddleware(AgentMiddleware):
    async def process(self, context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None:
        # Pre-processing: inspect the incoming request
        print(f"[Middleware] Processing {len(context.messages)} messages")

        # Forward to the next middleware (or the agent itself)
        await call_next()

        # Post-processing: inspect the result produced downstream
        print(f"[Middleware] Result type: {type(context.result)}")
```

### Early Termination with MiddlewareTermination

To short-circuit the pipeline and prevent further execution, raise `MiddlewareTermination()` after setting `context.result` to your desired value:

```python
from agent_framework import AgentMiddleware, AgentResponse, MiddlewareTermination


class BlocklistMiddleware(AgentMiddleware):
    async def process(self, context, call_next):
        # Search each message's text for the blocked keyword
        if any("blocked" in (msg.text or "") for msg in context.messages):
            context.result = AgentResponse(error="Message blocked by policy")
            raise MiddlewareTermination()  # Stops the pipeline immediately

        await call_next()
```
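To see why termination skips the rest of the pipeline, here is a minimal framework-free model. It assumes a runner that catches the termination exception and hands back whatever is in `context.result`; `StopPipeline`, `StubContext`, and `run_pipeline` are hypothetical names, not Agent Framework internals. Messages are plain strings here, so the blocklist check is a simple substring test.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Sequence

CallNext = Callable[[], Awaitable[None]]


class StopPipeline(Exception):
    """Hypothetical stand-in for MiddlewareTermination."""


@dataclass
class StubContext:
    messages: list[str]
    result: Any = None


class Blocklist:
    async def process(self, context: StubContext, call_next: CallNext) -> None:
        if any("blocked" in m for m in context.messages):  # substring test on plain text
            context.result = "Message blocked by policy"
            raise StopPipeline()
        await call_next()


async def run_pipeline(middleware: Sequence[Any], context: StubContext,
                       executor: Callable[[StubContext], Awaitable[None]]) -> Any:
    async def invoke(index: int) -> None:
        if index == len(middleware):
            await executor(context)  # end of the chain: the "LLM call"
            return
        await middleware[index].process(context, lambda: invoke(index + 1))

    try:
        await invoke(0)
    except StopPipeline:
        pass  # keep the result the terminating middleware set
    return context.result


async def demo() -> tuple[Any, Any, int]:
    llm_calls = 0

    async def fake_llm(context: StubContext) -> None:
        nonlocal llm_calls
        llm_calls += 1
        context.result = "LLM answer"

    ok = await run_pipeline([Blocklist()], StubContext(["hello"]), fake_llm)
    blocked = await run_pipeline([Blocklist()], StubContext(["this is blocked"]), fake_llm)
    return ok, blocked, llm_calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only the unblocked request reaches `fake_llm`; the blocked one returns the policy message without touching the executor.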

## Registering Middleware with Agents

Attach middleware instances via the `middleware` parameter when constructing an `Agent`, `ChatClient`, or tool wrapper. The framework stores these in a pipeline implemented in [`python/packages/core/agent_framework/_middleware.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/core/agent_framework/_middleware.py) (lines 735-756).

```python
from agent_framework import Agent, OpenAIClient

client = OpenAIClient(model="gpt-4o-mini")
agent = Agent(
    client=client,
    name="assistant",
    middleware=[LoggingMiddleware(), BlocklistMiddleware()],
)
```

**Execution order matters.** Pre-processing runs in the order provided in the list, while post-processing runs in reverse order. If you register `[A, B]`, the execution flow is: A’s pre-processing → B’s pre-processing → agent execution → B’s post-processing → A’s post-processing.
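The ordering follows directly from each middleware awaiting the next one. This sketch models that nesting with a hypothetical recursive `run_pipeline` (not the framework's actual implementation) and records when each phase runs for `[A, B]`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence

CallNext = Callable[[], Awaitable[None]]


class Recorder:
    """Hypothetical middleware that logs when its pre/post phases run."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name, self.log = name, log

    async def process(self, context: dict, call_next: CallNext) -> None:
        self.log.append(f"{self.name}:pre")
        await call_next()  # everything registered after this one runs here
        self.log.append(f"{self.name}:post")


async def run_pipeline(middleware: Sequence[Any], context: dict,
                       executor: Callable[[dict], Awaitable[None]]) -> None:
    async def invoke(index: int) -> None:
        if index == len(middleware):
            await executor(context)
        else:
            await middleware[index].process(context, lambda: invoke(index + 1))

    await invoke(0)


async def demo() -> list[str]:
    log: list[str] = []

    async def agent(context: dict) -> None:
        log.append("agent")

    await run_pipeline([Recorder("A", log), Recorder("B", log)], {}, agent)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['A:pre', 'B:pre', 'agent', 'B:post', 'A:post']
```

Because `B` is awaited from inside `A`, `B` must finish its post-processing before control returns to `A`.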

## Handling Streaming Responses

When `context.stream` is `True`, `context.result` becomes an async iterable. To transform streaming chunks in real time, append async functions to `context.stream_transform_hooks`, as defined in the `AgentContext` documentation (lines 46-55 in [`_middleware.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/core/agent_framework/_middleware.py)):

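```python
from agent_framework import AgentMiddleware


class UpperCaseStreamMiddleware(AgentMiddleware):
    async def process(self, context, call_next):
        if context.stream:
            # Each hook receives one streaming chunk and returns it (possibly modified)
            async def upper(chunk):
                if chunk.content:
                    chunk.content = chunk.content.upper()
                return chunk

            # Register the hook before call_next() so it applies to the stream
            context.stream_transform_hooks.append(upper)
        await call_next()
```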
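How the framework applies `stream_transform_hooks` internally isn't covered here, so the following is only a model under an assumption: each async hook receives one chunk and returns a chunk, and hooks run in registration order on every chunk. `Chunk`, `fake_stream`, and `apply_hooks` are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Awaitable, Callable


@dataclass
class Chunk:
    """Hypothetical stand-in for a streaming update."""
    content: str


Hook = Callable[[Chunk], Awaitable[Chunk]]


async def fake_stream(parts: list[str]) -> AsyncIterator[Chunk]:
    for part in parts:
        await asyncio.sleep(0)  # yield control, as a real network stream would
        yield Chunk(part)


async def apply_hooks(stream: AsyncIterator[Chunk], hooks: list[Hook]) -> AsyncIterator[Chunk]:
    async for chunk in stream:
        for hook in hooks:  # assumed: hooks run in registration order
            chunk = await hook(chunk)
        yield chunk


async def upper(chunk: Chunk) -> Chunk:
    chunk.content = chunk.content.upper()
    return chunk


async def add_marker(chunk: Chunk) -> Chunk:
    return Chunk(chunk.content + "|")


async def demo() -> list[str]:
    hooks: list[Hook] = [upper, add_marker]
    return [c.content async for c in apply_hooks(fake_stream(["hel", "lo"]), hooks)]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['HEL|', 'LO|']
```

Each chunk is transformed as it arrives, so the consumer never has to wait for the full response.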

For non-streaming modifications, simply mutate `context.result` in the post-processing phase after `await call_next()` completes.
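As a small illustration, this hypothetical middleware redacts email addresses from a finished result. `StubContext` stands in for the real context type, and treating `context.result` as a plain string is a simplification; a real response object would likely need its text contents rewritten instead.

```python
import asyncio
import re
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

CallNext = Callable[[], Awaitable[None]]
EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")


@dataclass
class StubContext:
    """Hypothetical stand-in for a middleware context."""
    stream: bool = False
    result: Any = None


class RedactEmailMiddleware:
    async def process(self, context: StubContext, call_next: CallNext) -> None:
        await call_next()
        # Post-processing: only rewrite complete (non-streaming) string results
        if not context.stream and isinstance(context.result, str):
            context.result = EMAIL.sub("[redacted]", context.result)


async def demo() -> str:
    ctx = StubContext()

    async def final() -> None:
        ctx.result = "Contact alice@example.com for access."

    await RedactEmailMiddleware().process(ctx, final)
    return ctx.result


if __name__ == "__main__":
    print(asyncio.run(demo()))  # Contact [redacted] for access.
```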

## Complete Working Example

Below is a runnable example demonstrating a caching middleware that injects system prompts and short-circuits duplicate requests. This pattern is adapted from the shared-state sample in [`python/samples/02-agents/middleware/shared_state_middleware.py`](https://github.com/microsoft/agent-framework/blob/main/python/samples/02-agents/middleware/shared_state_middleware.py).

```python
import asyncio
import hashlib
from typing import Awaitable, Callable

from agent_framework import (
    Agent,
    AgentContext,
    AgentMiddleware,
    AgentResponse,
    Message,
    MiddlewareTermination,
    OpenAIClient,
)


class CacheMiddleware(AgentMiddleware):
    def __init__(self) -> None:
        self._cache: dict[str, str] = {}

    async def process(self, context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None:
        # Streaming results are async iterables, so this simple cache skips them
        if context.stream:
            await call_next()
            return

        # Build the cache key from every message's role and contents
        content_str = "".join(f"{m.role}:{c}" for m in context.messages for c in m.contents)
        key = hashlib.sha256(content_str.encode()).hexdigest()

        # Pre-processing: serve a cached answer and skip the LLM call
        if key in self._cache:
            context.result = AgentResponse(contents=[self._cache[key]])
            raise MiddlewareTermination()

        # Inject a system prompt if none is present
        if not any(m.role == "system" for m in context.messages):
            context.messages.insert(0, Message(role="system", contents=["You are a helpful assistant."]))

        await call_next()

        # Post-processing: store the first content item of the result
        if context.result is not None and getattr(context.result, "contents", None):
            self._cache[key] = str(context.result.contents[0])


async def main() -> None:
    client = OpenAIClient(model="gpt-4o-mini")
    agent = Agent(
        client=client,
        name="cached_assistant",
        middleware=[CacheMiddleware()],
    )

    # The first call hits the LLM; the identical second call is served from the cache
    for _ in range(2):
        messages = [Message(role="user", contents=["What is the capital of France?"])]
        response = await agent.run(messages=messages)
        print(response)


if __name__ == "__main__":
    asyncio.run(main())
```

## Summary

- **Subclass the appropriate ABC** – Choose `AgentMiddleware`, `FunctionMiddleware`, or `ChatMiddleware` from [`python/packages/core/agent_framework/_middleware.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/core/agent_framework/_middleware.py) based on which layer you need to intercept.

- **Implement `process(context, call_next)`** – Perform pre-processing mutations, await `call_next()`, then perform post-processing mutations on `context.result`.

- **Communicate via context** – Modify the mutable `context` object directly; the `process` method returns no values.

- **Register declaratively** – Pass instances to the `middleware=` parameter when constructing Agents or ChatClients; order determines execution sequence.

- **Handle advanced scenarios** – Raise `MiddlewareTermination` for early exits, and use `context.stream_transform_hooks` to manipulate streaming responses chunk by chunk.

For implementation details, examine the abstract definitions in [`python/packages/core/agent_framework/_middleware.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/core/agent_framework/_middleware.py) and the unit tests in [`python/packages/core/tests/core/test_middleware.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/core/tests/core/test_middleware.py) which illustrate ordering, termination, and result-override behaviors.

## Frequently Asked Questions

### What is the difference between AgentMiddleware and FunctionMiddleware?

**`AgentMiddleware`** operates on `AgentContext` and intercepts the entire agent invocation, allowing you to modify messages before they reach the LLM and transform the final response. **`FunctionMiddleware`** operates on `FunctionInvocationContext` and wraps individual tool or function executions, enabling you to validate arguments, cache tool outputs, or enforce access policies at the tool level.

### How do I stop request processing early from within middleware?

Raise **`MiddlewareTermination()`** after setting `context.result` to your desired output value. This immediately halts the pipeline, skips all remaining middleware and the underlying executor, and returns the result you placed in `context.result` to the caller.

### Can I modify streaming responses in real-time?

Yes. When `context.stream` is `True`, append async transformation functions to **`context.stream_transform_hooks`**. Each hook receives and returns a chunk, allowing you to mutate content, add metadata, or filter tokens as the stream flows through the pipeline. For post-stream processing, inspect `context.result` after `await call_next()` completes.

### In what order do multiple middleware instances execute?

Pre-processing stages execute in the order you provide in the `middleware=` list. Post-processing stages execute in **reverse order**, creating an onion-like pattern where the first middleware to see the request is the last to see the response. This ensures symmetric resource management (e.g., logging, timing) when wrapping calls.