How to Build Agent-to-Agent (A2A) Communication Protocols in the Microsoft Agent Framework
The Microsoft Agent Framework enables seamless agent-to-agent communication by wrapping any A2A-compatible service as a standard BaseAgent using the A2AAgent class, which automatically handles JSON-RPC/SSE protocol translation, bidirectional message conversion, and long-running task management via continuation tokens.
The framework provides a first-class A2A integration layer that abstracts the underlying wire protocol while preserving full access to streaming and asynchronous capabilities. By implementing the A2AAgent class located in python/packages/a2a/agent_framework_a2a/_agent.py, you can integrate remote A2A services into your agentic workflows without managing raw HTTP connections or JSON-RPC payloads manually.
Architecture of the A2A Integration Layer
The A2A implementation centers on the A2AAgent class, which acts as a bidirectional adapter between the framework's native Message/Content model and the A2A JSON-RPC / SSE protocol. Internally, A2AAgent instantiates an a2a.client.Client to manage transport connections and delegates message serialization to two critical private methods:
_prepare_message_for_a2a– Converts frameworkMessageobjects intoA2AMessageinstances, handling text content, URI references, data blobs, and hosted file attachments._parse_contents_from_a2a– Transforms A2A Parts back into frameworkContentobjects, ensuring type-safe deserialization of remote responses.
For long-running operations, the framework utilizes the A2AContinuationToken dataclass (defined in the same _agent.py file) to persist task_id and context_id state. This enables durable task execution across network interruptions or process restarts.
Implementing Agent-to-Agent Communication
Discovering Remote Agents with A2ACardResolver
Before establishing communication, you must resolve the remote agent's capabilities via the AgentCard endpoint (/.well-known/agent.json). The A2ACardResolver utility fetches this metadata to configure transport and authentication parameters:
import os
import asyncio
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
async def discover_and_connect():
async with httpx.AsyncClient() as http:
resolver = A2ACardResolver(
httpx_client=http,
base_url=os.getenv("A2A_AGENT_HOST")
)
card = await resolver.get_agent_card()
async with A2AAgent(
name=card.name,
description=card.description,
agent_card=card,
url=os.getenv("A2A_AGENT_HOST"),
) as agent:
response = await agent.run("What are your capabilities?")
for msg in response.messages:
print(msg.text)
asyncio.run(discover_and_connect())
The A2AAgent constructor requires the resolved AgentCard and target URL. When used as an async context manager, it automatically manages the underlying httpx.AsyncClient lifecycle and connection pooling.
Basic Synchronous Calls
By default, the run() method blocks until the remote A2A server reaches a terminal task state (completed, failed, or canceled). This mode is optimal for short-lived requests where immediate results are required:
async with A2AAgent(...) as agent:
response = await agent.run("Generate a summary of the quarterly report")
# Blocks until task completion
print(f"Received {len(response.messages)} messages")
Under the hood, A2AAgent.run invokes _map_a2a_stream to process the JSON-RPC response, parsing final artifacts via _parse_messages_from_task and assembling a complete AgentResponse object.
Real-Time Streaming with Server-Sent Events
For interactive applications requiring progress visibility, enable streaming mode by passing stream=True. This opens an SSE connection that yields AgentResponseUpdate objects as the remote agent generates intermediate outputs:
async with A2AAgent(...) as agent:
stream = agent.run(
"Explain the A2A protocol step-by-step",
stream=True
)
async for update in stream:
for content in update.contents:
if content.text:
print("⏱️", content.text) # Incremental output
final = await stream.get_final_response()
print(f"✅ Final message count: {len(final.messages)}")
The _map_a2a_stream method consumes raw SSE events (either A2AMessage or (Task, Event) tuples) and maps status updates such as working or input_required into framework-native update objects.
Handling Long-Running Tasks with Background Mode
When dealing with extended computations that risk connection timeouts, use background mode by setting background=True alongside stream=True. This configuration yields a continuation token immediately while offloading task execution to the remote server:
async with A2AAgent(...) as agent:
stream = agent.run(
"Generate a comprehensive 50-page technical analysis",
stream=True,
background=True # Emit token, don't block
)
async for update in stream:
if update.continuation_token:
token = update.continuation_token
print(f"🔄 Task in progress – token: {token}")
break
# Later: Poll for status or resume waiting
response = await agent.poll_task(token)
print(f"Current status: {response.task_status}")
# Or resume the stream
final_response = await agent.run(continuation_token=token)
The continuation_token encapsulates the A2AContinuationToken data required to reconstruct session state, allowing you to persist tokens in databases or distributed caches for durable orchestration.
Hosting Your Own A2A Service
While A2AAgent focuses on consumption, the framework supports hosting via the agent_definitions.py pattern found in python/samples/04-hosting/a2a/. Implement a concrete Agent subclass and register it to expose your agent through the standard A2A JSON-RPC endpoint:
# python/samples/04-hosting/a2a/agent_definitions.py
from agent_framework import Agent, Content
class PolicyAgent(Agent):
"""Handles compliance and policy queries via A2A protocol."""
async def _run(self, messages):
query = messages[-1].text
return [Content.from_text(f"Policy analysis for: {query}")]
The companion a2a_server.py script loads these definitions and serves the /.well-known/agent.json discovery document, enabling other A2AAgent instances to resolve and invoke your service.
Summary
- Use
A2AAgentinpython/packages/a2a/agent_framework_a2a/_agent.pyto wrap remote A2A services as standard framework agents, handling all JSON-RPC/SSE protocol details automatically. - Discover capabilities via
A2ACardResolverand the/.well-known/agent.jsonendpoint before establishing connections. - Enable streaming with
stream=Trueto receive real-timeAgentResponseUpdateobjects via Server-Sent Events for interactive user experiences. - Manage durability using
background=Trueto obtainA2AContinuationTokeninstances that supportpoll_task()andrun(continuation_token=...)for long-running workflows. - Host custom agents by implementing the
Agentbase class and registering implementations inagent_definitions.pyfor the sample A2A server.
Frequently Asked Questions
What is the difference between streaming mode and background mode in A2AAgent?
Streaming mode (stream=True) maintains an open SSE connection and yields intermediate updates as the remote agent works, while background mode (background=True) immediately returns a continuation token and releases the connection, allowing you to poll status asynchronously without holding network resources. You can combine both flags to stream updates from a backgrounded task initially, then resume later using the token.
How does A2AAgent convert between framework types and A2A protocol formats?
The _prepare_message_for_a2a method serializes framework Message objects into A2A JSON-RPC-compliant messages, handling text, URI, data, and hosted file content types. Conversely, _parse_contents_from_a2a deserializes A2A Part objects back into framework Content instances, with strict validation that raises ValueError for malformed data URIs or unknown part kinds.
Where are the core A2A protocol implementations located in the repository?
The primary implementation resides in python/packages/a2a/agent_framework_a2a/_agent.py, which contains the A2AAgent class, message conversion utilities, and streaming handlers. Reference implementations and integration samples are available in python/samples/04-hosting/a2a/, including agent_with_a2a.py for consumption patterns and a2a_server.py for hosting.
How do I resume an A2A task after a network disconnection or process restart?
Store the continuation_token object emitted when using background=True, which encapsulates the remote task_id and context_id. Later, invoke await agent.poll_task(token) to check current status without blocking, or await agent.run(continuation_token=token) to reattach the stream and wait for completion. This mechanism leverages the A2AContinuationToken dataclass to maintain session continuity across distributed systems.
Have a question about this repo?
These articles cover the highlights, but your codebase questions are specific. Give your agent direct access to the source. Share this with your agent to get started:
curl -s "https://instagit.com/install.md" Maintain an open-source project? Get it listed too →