How to Implement Custom Middleware in Agent Framework for Request/Response Interception
To implement custom middleware in Agent Framework, subclass the appropriate abstract base class (AgentMiddleware, FunctionMiddleware, or ChatMiddleware) from agent_framework/_middleware.py, implement the async process(context, call_next) method to mutate the context before and after await call_next(), and register the instance in the middleware= list when constructing an Agent or ChatClient.
The microsoft/agent-framework provides three extensible middleware pipelines that let you intercept and modify requests and responses at different architectural layers. By implementing custom middleware, you can inject logging, caching, policy enforcement, and message transformations without altering the core agent logic. All middleware types share one convention: the process method returns nothing, and all communication happens through a mutable context object.
Understanding the Three Middleware Layers
Agent Framework defines three distinct middleware abstractions in python/packages/core/agent_framework/_middleware.py (lines 357-425):
- AgentMiddleware – Intercepts agent invocations using AgentContext. Use this to manipulate the agent’s context, inject system prompts, implement retry logic, or override results before the LLM call executes.
- FunctionMiddleware – Intercepts tool or function executions using FunctionInvocationContext. Use this to cache function results, validate arguments, or enforce policies before a tool runs.
- ChatMiddleware – Intercepts chat client requests using ChatContext. Use this to adjust message lists, add headers, log requests and responses, or short-circuit chat calls entirely.
Each layer operates on a specific context type, but all share the same implementation pattern.
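Because the layers share one shape, a tool-level guard reads much like an agent-level one. The following is a minimal sketch that runs without the framework installed: FakeFunctionContext, its function_name and arguments attributes, the invoke helper, and the transfer tool are all hypothetical stand-ins, and the real FunctionInvocationContext may expose different attributes.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeFunctionContext:
    """Hypothetical stand-in for FunctionInvocationContext (not the real class)."""
    function_name: str
    arguments: dict[str, Any]
    result: Any = None


class ArgumentGuardMiddleware:
    """Illustrative policy: reject calls whose 'amount' argument is negative."""

    async def process(self, context, call_next) -> None:
        # Pre-processing: validate arguments before the tool runs
        amount = context.arguments.get("amount", 0)
        if amount < 0:
            raise ValueError(f"{context.function_name}: amount must be >= 0")
        await call_next()
        # Post-processing: wrap the tool's result with the tool name
        context.result = {"tool": context.function_name, "value": context.result}


async def invoke(middleware, context, tool):
    """Stand-in executor: the final call_next simply runs the tool."""
    async def call_next() -> None:
        context.result = await tool(**context.arguments)

    await middleware.process(context, call_next)
    return context.result


async def transfer(amount: int) -> str:
    return f"transferred {amount}"


ok = asyncio.run(
    invoke(ArgumentGuardMiddleware(), FakeFunctionContext("transfer", {"amount": 5}), transfer)
)
print(ok)
```

The same class body would apply to the other layers with only the context type changed, which is the point of the shared pattern.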
Implementing the process Method
Create a subclass and implement the asynchronous process method with the signature async def process(self, context, call_next). The call_next parameter is a callable that returns an awaitable (Callable[[], Awaitable[None]]), and context is a mutable object specific to the middleware layer (e.g., AgentContext).
Pre-processing and Post-processing Pattern
The standard implementation follows a four-step onion pattern:
- Pre-processing – Read or mutate context attributes (e.g., context.messages, context.metadata).
- Await next – Call await call_next() to hand execution to the next middleware or final executor.
- Post-processing – Read or mutate context.result after the underlying call completes.
- Silent return – Return nothing; all communication persists in the mutated context.
from typing import Awaitable, Callable

from agent_framework import AgentMiddleware, AgentContext


class LoggingMiddleware(AgentMiddleware):
    async def process(self, context: AgentContext, call_next: Callable[[], Awaitable[None]]):
        # Pre-processing: inspect incoming request
        print(f"[Middleware] Processing {len(context.messages)} messages")

        # Forward to next layer/agent
        await call_next()

        # Post-processing: inspect result
        print(f"[Middleware] Result type: {type(context.result)}")
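State captured during pre-processing stays available in local variables after await call_next(), which makes the pattern a natural fit for timing. The sketch below assumes that context.metadata behaves like a plain dict. FakeAgentContext and fake_agent_run are hypothetical stand-ins so the example runs on its own, and elapsed_s is an illustrative key, not a framework convention.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeAgentContext:
    """Hypothetical stand-in for AgentContext."""
    messages: list[str]
    metadata: dict[str, Any] = field(default_factory=dict)
    result: Any = None


class TimingMiddleware:
    async def process(self, context, call_next) -> None:
        started = time.perf_counter()   # 1. pre-processing
        await call_next()               # 2. hand off to the next layer
        # 3. post-processing: record how long the inner call took
        context.metadata["elapsed_s"] = time.perf_counter() - started
        # 4. nothing is returned; the context carries the outcome


async def fake_agent_run(context: FakeAgentContext) -> FakeAgentContext:
    """Stand-in for the agent: sleeps briefly, then echoes the last message."""
    async def call_next() -> None:
        await asyncio.sleep(0.01)
        context.result = f"echo: {context.messages[-1]}"

    await TimingMiddleware().process(context, call_next)
    return context


ctx = asyncio.run(fake_agent_run(FakeAgentContext(messages=["hi"])))
print(ctx.result, ctx.metadata["elapsed_s"])
```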
Early Termination with MiddlewareTermination
To short-circuit the pipeline and prevent further execution, raise MiddlewareTermination() after setting context.result to your desired value:
from agent_framework import AgentMiddleware, MiddlewareTermination, AgentResponse


class BlocklistMiddleware(AgentMiddleware):
    async def process(self, context, call_next):
        if any("blocked" in msg.contents for msg in context.messages):
            context.result = AgentResponse(error="Message blocked by policy")
            raise MiddlewareTermination()  # Stops pipeline immediately

        await call_next()
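The control flow of early termination can be traced with a toy runner. This sketch makes an assumption about the runner: it catches the termination signal and returns whatever is already in context.result. FakeTermination, FakeContext, and run are hypothetical stand-ins, not the framework's classes, and messages are plain strings here for simplicity.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


class FakeTermination(Exception):
    """Hypothetical stand-in for MiddlewareTermination."""


@dataclass
class FakeContext:
    messages: list[str]
    result: Any = None


class BlockMiddleware:
    async def process(self, context, call_next) -> None:
        if any("blocked" in m for m in context.messages):
            context.result = "Message blocked by policy"  # set the result first
            raise FakeTermination()                       # then stop the pipeline
        await call_next()


async def run(context: FakeContext, middleware, calls: list[str]):
    async def call_next() -> None:
        calls.append("executor")        # records that the executor actually ran
        context.result = "model reply"

    try:
        await middleware.process(context, call_next)
    except FakeTermination:
        pass  # assumed runner behaviour: keep the result the middleware set
    return context.result


calls: list[str] = []
blocked = asyncio.run(run(FakeContext(["this is blocked"]), BlockMiddleware(), calls))
allowed = asyncio.run(run(FakeContext(["hello"]), BlockMiddleware(), calls))
print(blocked, allowed, calls)
```

Setting context.result before raising matters in this sketch: the runner has nothing else to return once the executor is skipped.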
Registering Middleware with Agents
Attach middleware instances via the middleware parameter when constructing an Agent, ChatClient, or tool wrapper. The framework stores these in a pipeline implemented in python/packages/core/agent_framework/_middleware.py (lines 735-756).
from agent_framework import Agent, OpenAIClient

client = OpenAIClient(model="gpt-4o-mini")
agent = Agent(
    client=client,
    name="assistant",
    middleware=[LoggingMiddleware(), BlocklistMiddleware()],
)
Execution order matters. Pre-processing runs in the order provided in the list, while post-processing runs in reverse order. If you register [A, B], the execution flow is: A’s pre-processing → B’s pre-processing → agent execution → B’s post-processing → A’s post-processing.
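The [A, B] ordering can be reproduced with a toy pipeline built from nested call_next closures. This is only a sketch of the onion pattern. Tracer, run_pipeline, and executor are hypothetical names, and the framework's own pipeline may be implemented differently.

```python
import asyncio


class Tracer:
    """Middleware-shaped object that logs when its pre and post phases run."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def process(self, context, call_next) -> None:
        self.log.append(f"{self.name} pre")
        await call_next()
        self.log.append(f"{self.name} post")


async def run_pipeline(middlewares, context, executor) -> None:
    """Toy pipeline: each middleware's call_next dispatches to the next index."""
    async def dispatch(index: int) -> None:
        if index == len(middlewares):
            await executor(context)  # innermost layer
            return
        await middlewares[index].process(context, lambda: dispatch(index + 1))

    await dispatch(0)


log: list[str] = []


async def executor(context) -> None:
    log.append("agent")


asyncio.run(run_pipeline([Tracer("A", log), Tracer("B", log)], {}, executor))
print(log)  # ['A pre', 'B pre', 'agent', 'B post', 'A post']
```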
Handling Streaming Responses
When context.stream is True, context.result becomes an async iterable. To transform streaming chunks in real-time, append async functions to context.stream_transform_hooks as defined in the AgentContext documentation (lines 46-55 in _middleware.py):
from agent_framework import AgentMiddleware


class UpperCaseStreamMiddleware(AgentMiddleware):
    async def process(self, context, call_next):
        if context.stream:
            # Hook applied to each streamed chunk
            async def upper(chunk):
                chunk.content = chunk.content.upper()
                return chunk

            context.stream_transform_hooks.append(upper)

        await call_next()
For non-streaming modifications, simply mutate context.result in the post-processing phase after await call_next() completes.
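To see how per-chunk hooks might compose, the sketch below applies a list of async hooks to an async stream. It assumes each chunk passes through every hook in registration order, which the source does not confirm. FakeChunk, fake_stream, apply_hooks, and bracket are hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class FakeChunk:
    """Hypothetical stand-in for a streamed response chunk."""
    content: str


async def fake_stream() -> AsyncIterator[FakeChunk]:
    for piece in ("hel", "lo ", "world"):
        yield FakeChunk(piece)


async def apply_hooks(stream, hooks):
    """Assumed semantics: run every hook, in order, on each chunk as it arrives."""
    async for chunk in stream:
        for hook in hooks:
            chunk = await hook(chunk)
        yield chunk


async def upper(chunk: FakeChunk) -> FakeChunk:
    chunk.content = chunk.content.upper()
    return chunk


async def bracket(chunk: FakeChunk) -> FakeChunk:
    chunk.content = f"[{chunk.content}]"
    return chunk


async def collect() -> list[str]:
    return [c.content async for c in apply_hooks(fake_stream(), [upper, bracket])]


chunks = asyncio.run(collect())
print(chunks)  # ['[HEL]', '[LO ]', '[WORLD]']
```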
Complete Working Example
Below is a runnable example demonstrating a caching middleware that injects system prompts and short-circuits duplicate requests. This pattern is adapted from the shared-state sample in python/samples/02-agents/middleware/shared_state_middleware.py.
import asyncio
import hashlib
from typing import Awaitable, Callable

from agent_framework import (
    Agent,
    AgentMiddleware,
    AgentContext,
    AgentResponse,
    Message,
    MiddlewareTermination,
    OpenAIClient,
)


class CacheMiddleware(AgentMiddleware):
    def __init__(self):
        self._cache: dict[str, str] = {}

    async def process(self, context: AgentContext, call_next: Callable[[], Awaitable[None]]):
        # Generate cache key from message contents
        content_str = "".join(str(m.contents[0]) for m in context.messages)
        key = hashlib.sha256(content_str.encode()).hexdigest()

        # Pre-processing: check cache
        if key in self._cache:
            context.result = AgentResponse(contents=[self._cache[key]])
            raise MiddlewareTermination()

        # Inject system prompt if missing
        if not any(m.role == "system" for m in context.messages):
            context.messages.insert(0, Message(role="system", contents=["You are a helpful assistant."]))

        await call_next()

        # Post-processing: store result
        if context.result and hasattr(context.result, "contents"):
            self._cache[key] = str(context.result.contents[0])


# Usage
async def main():
    client = OpenAIClient(model="gpt-4o-mini")
    agent = Agent(
        client=client,
        name="cached_assistant",
        middleware=[CacheMiddleware()],
    )

    # First call hits the LLM; subsequent identical calls return cached results
    messages = [Message(role="user", contents=["What is the capital of France?"])]
    response = await agent.run(messages=messages)
    print(response)


# await is only valid inside a coroutine, so drive main() with asyncio.run
asyncio.run(main())
Summary
- Subclass the appropriate ABC – Choose AgentMiddleware, FunctionMiddleware, or ChatMiddleware from python/packages/core/agent_framework/_middleware.py based on which layer you need to intercept.
- Implement process(context, call_next) – Perform pre-processing mutations, await call_next(), then perform post-processing mutations on context.result.
- Communicate via context – Modify the mutable context object directly; the process method returns no values.
- Register declaratively – Pass instances to the middleware= parameter when constructing Agents or ChatClients; order determines execution sequence.
- Handle advanced scenarios – Raise MiddlewareTermination for early exits, and use context.stream_transform_hooks to manipulate streaming responses chunk by chunk.
For implementation details, examine the abstract definitions in python/packages/core/agent_framework/_middleware.py and the unit tests in python/packages/core/tests/core/test_middleware.py which illustrate ordering, termination, and result-override behaviors.
Frequently Asked Questions
What is the difference between AgentMiddleware and FunctionMiddleware?
AgentMiddleware operates on AgentContext and intercepts the entire agent invocation, allowing you to modify messages before they reach the LLM and transform the final response. FunctionMiddleware operates on FunctionInvocationContext and wraps individual tool or function executions, enabling you to validate arguments, cache tool outputs, or enforce access policies at the tool level.
How do I stop request processing early from within middleware?
Raise MiddlewareTermination() after setting context.result to your desired output value. This immediately halts the pipeline, skips all remaining middleware and the underlying executor, and returns the result you placed in context.result to the caller.
Can I modify streaming responses in real-time?
Yes. When context.stream is True, append async transformation functions to context.stream_transform_hooks. Each hook receives and returns a chunk, allowing you to mutate content, add metadata, or filter tokens as the stream flows through the pipeline. For post-stream processing, inspect context.result after await call_next() completes.
In what order does multiple middleware execute?
Pre-processing stages execute in the order you provide in the middleware= list. Post-processing stages execute in reverse order, creating an onion-like pattern where the first middleware to see the request is the last to see the response. This ensures symmetric resource management (e.g., logging, timing) when wrapping calls.