Implementing Message Filtering with MessageFilterAgent and PerSourceFilter in AutoGen

Use MessageFilterAgent to wrap any BaseChatAgent and apply PerSourceFilter rules that expose only specific subsets of messages—such as the last N messages from a particular source—before forwarding them to the inner agent.

Implementing message filtering with MessageFilterAgent and PerSourceFilter gives you surgical control over conversational context in the microsoft/autogen framework. This lightweight wrapper intercepts incoming chat history, applies configurable per-source rules, and passes only the filtered message slice to the wrapped agent. It is essential for managing context windows in looping multi-agent graphs and hiding irrelevant traffic from downstream participants.

Understanding the MessageFilterAgent Architecture

The filtering system consists of two primary layers: the data models that define what to filter, and the wrapper agent that executes the filtering logic.

Core Data Models: PerSourceFilter and MessageFilterConfig

At the heart of the implementation are Pydantic models defined in python/packages/autogen-agentchat/src/autogen_agentchat/agents/_message_filter_agent.py.

PerSourceFilter defines a single rule:

class PerSourceFilter(BaseModel):
    source: str                              # e.g., "user", "system", "AgentA"

    position: Optional[Literal["first", "last"]] = None
    count: Optional[int] = None
  • source: The message origin to match.
  • position: Slice from the "first" or "last" of the matched set; None keeps all.
  • count: How many messages to include when position is specified.

MessageFilterConfig aggregates rules:

class MessageFilterConfig(BaseModel):
    per_source: List[PerSourceFilter]       # Empty list blocks all messages

An empty per_source list results in all messages being blocked, which is useful for creating silent agents that only respond to specific triggers.

The Filtering Algorithm

The _apply_filter method implements the selection logic:

def _apply_filter(self, messages: Sequence[BaseChatMessage]) -> Sequence[BaseChatMessage]:
    result: List[BaseChatMessage] = []
    for source_filter in self._filter.per_source:
        msgs = [m for m in messages if m.source == source_filter.source]

        if source_filter.position == "first" and source_filter.count:
            msgs = msgs[: source_filter.count]
        elif source_filter.position == "last" and source_filter.count:
            msgs = msgs[-source_filter.count :]

        result.extend(msgs)
    return result

The algorithm preserves the original message order as it iterates through the configured rules, appending matched subsets to the result list. This ensures that downstream agents receive a coherent, chronologically consistent view of the conversation even when filtering is aggressive.

Implementing Message Filtering in Practice

Basic Usage with a Wrapped Agent

To implement message filtering, wrap any BaseChatAgent with MessageFilterAgent and supply a MessageFilterConfig. The following example, based on the test suite in python/packages/autogen-agentchat/tests/test_group_chat_graph.py, demonstrates filtering for the last user message and the first system message:

from autogen_agentchat.agents import MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.messages import TextMessage
from autogen_core import CancellationToken

# Minimal inner agent that records received messages

class EchoAgent(BaseChatAgent):
    def __init__(self, name: str):
        super().__init__(name=name, description="Echo")
        self.received_messages = []

    async def on_messages(self, messages, token: CancellationToken):
        self.received_messages.extend(messages)
        return Response()  # dummy response

inner = EchoAgent("inner")

# Wrap with filtering rules

wrapper = MessageFilterAgent(
    name="filtered",
    wrapped_agent=inner,
    filter=MessageFilterConfig(
        per_source=[
            PerSourceFilter(source="user", position="last", count=1),
            PerSourceFilter(source="system", position="first", count=1)
        ]
    ),
)

# Simulate incoming messages

msgs = [
    TextMessage(source="user", content="first user"),
    TextMessage(source="user", content="second user"),
    TextMessage(source="system", content="system init"),
    TextMessage(source="system", content="system follow-up"),
]

await wrapper.on_messages(msgs, CancellationToken())
print([m.content for m in inner.received_messages])

# Output: ['second user', 'system init']

The EchoAgent only receives 'second user' (the last user message) and 'system init' (the first system message), demonstrating how MessageFilterAgent shields the inner agent from full context.

Integrating Filters in Graph-Based Workflows

In complex multi-agent graphs, MessageFilterAgent prevents context explosion in loops. When agents pass messages cyclically, unfiltered history can grow indefinitely. By wrapping each node with specific PerSourceFilter rules, you ensure each agent sees only the messages necessary for its decision.

The following pattern, derived from test_group_chat_graph.py, shows how to configure filters for a three-agent graph where A and B loop, and B eventually exits to C:

from autogen_agentchat.teams import DiGraph, DiGraphNode, DiGraphEdge

# Agent A: sees only the first user message and the last message from B

agent_a = MessageFilterAgent(
    name="A",
    wrapped_agent=A_inner,
    filter=MessageFilterConfig(
        per_source=[
            PerSourceFilter(source="user", position="first", count=1),
            PerSourceFilter(source="B", position="last", count=1),
        ]
    ),
)

# Agent B: sees first user, last from A, and last 10 from itself (for context)

agent_b = MessageFilterAgent(
    name="B",
    wrapped_agent=B_inner,
    filter=MessageFilterConfig(
        per_source=[
            PerSourceFilter(source="user", position="first", count=1),
            PerSourceFilter(source="A", position="last", count=1),
            PerSourceFilter(source="B", position="last", count=10),
        ]
    ),
)

# Agent C: sees first user and last from B

agent_c = MessageFilterAgent(
    name="C",
    wrapped_agent=C_inner,
    filter=MessageFilterConfig(
        per_source=[
            PerSourceFilter(source="user", position="first", count=1),
            PerSourceFilter(source="B", position="last", count=1),
        ]
    ),
)

graph = DiGraph(
    nodes={
        "A": DiGraphNode(name="A", edges=[DiGraphEdge(target="B")]),
        "B": DiGraphNode(
            name="B",
            edges=[
                DiGraphEdge(target="C", condition="exit"),
                DiGraphEdge(target="A", condition="loop"),
            ],
        ),
        "C": DiGraphNode(name="C", edges=[]),
    },
    default_start_node="A",
)

This configuration ensures that even if agents A and B loop dozens of times, each invocation receives a bounded, relevant context rather than the full, ever-growing message history.

Serialization and Component Reuse

MessageFilterAgent fully supports AutoGen’s component model for persistence and graph reconstruction. You can serialize the wrapper and its inner agent, then reload them later without losing configuration.


# Dump to a component model (e.g., for persistence or graph reconstruction)

config = wrapper.dump_component()

# Later… reload the exact same wrapper (including the inner agent)

restored = MessageFilterAgent.load_component(config)

assert restored.name == wrapper.name
assert restored._filter == wrapper._filter

The dump_component() method returns a MessageFilterAgentConfig containing the wrapper’s name, the wrapped agent’s serialized component model, and the filter configuration. This enables version-controlled agent definitions and reproducible multi-agent deployments.

Key Source Files and Implementation Details

The implementation resides in the autogen-agentchat package. Refer to these files for the exact method signatures and internal logic:

File Contents
python/packages/autogen-agentchat/src/autogen_agentchat/agents/_message_filter_agent.py Defines PerSourceFilter, MessageFilterConfig, MessageFilterAgentConfig, and the MessageFilterAgent class with _apply_filter, on_messages, and component serialization methods.
python/packages/autogen-agentchat/src/autogen_agentchat/agents/__init__.py Public API exports for MessageFilterAgent, MessageFilterConfig, and PerSourceFilter.
python/packages/autogen-agentchat/tests/test_group_chat_graph.py Unit tests verifying empty filter behavior, position=None handling, and integration within DiGraph workflows.

Summary

  • MessageFilterAgent wraps any BaseChatAgent to intercept and prune message history before invocation.
  • PerSourceFilter rules specify which sources to include, with optional position (first/last) and count constraints.
  • An empty MessageFilterConfig blocks all messages, useful for creating conditional or silent agents.
  • The wrapper supports full component serialization via dump_component() and load_component(), enabling reusable, version-controlled agent graphs.
  • Integration with DiGraph workflows prevents context explosion in cyclic multi-agent topologies by ensuring each node receives only relevant, bounded history.

Frequently Asked Questions

How does MessageFilterAgent handle messages when position is set to None?

When position is None in a PerSourceFilter, the filter includes all messages from the specified source regardless of their position in the history. The count parameter is ignored in this case because the filter does not slice the result. This is useful when you want an agent to see the complete conversation from a specific participant while still filtering out noise from others.

Can I use MessageFilterAgent to block all incoming messages?

Yes. To block all messages, initialize MessageFilterAgent with a MessageFilterConfig that has an empty per_source list (per_source=[]). When the filter configuration contains no rules, the _apply_filter method returns an empty list, ensuring the wrapped agent receives no context. This pattern is useful for creating agents that respond only to internal state or specific external triggers rather than conversational history.

Does MessageFilterAgent support streaming responses?

Yes. MessageFilterAgent implements both on_messages and on_messages_stream. In the streaming case, it first applies the filter to the incoming message history using _apply_filter, then delegates to the wrapped agent's on_messages_stream method with the pruned context. The streaming iterator yields tokens from the inner agent while maintaining the filtered view of history, ensuring consistent context management regardless of whether you use synchronous or asynchronous streaming patterns.

How do I persist and reload a MessageFilterAgent configuration?

Use the component model serialization methods. Call dump_component() on a MessageFilterAgent instance to obtain a MessageFilterAgentConfig containing the wrapper's name, the serialized inner agent, and the filter rules. Later, pass this config to MessageFilterAgent.load_component() to reconstruct the exact same wrapper with its inner agent and filtering rules intact. This enables version-controlled agent definitions and reproducible deployment in production graphs.

Have a question about this repo?

These articles cover the highlights, but your codebase questions are specific. Give your agent direct access to the source. Share this with your agent to get started:

Share the following with your agent to get started:
curl -s "https://instagit.com/install.md"

Works with
Claude Codex Cursor VS Code OpenClaw Any MCP Client

Maintain an open-source project? Get it listed too →