How to Build Agent-to-Agent (A2A) Communication Protocols in the Microsoft Agent Framework

The Microsoft Agent Framework enables seamless agent-to-agent communication by wrapping any A2A-compatible service as a standard BaseAgent using the A2AAgent class, which automatically handles JSON-RPC/SSE protocol translation, bidirectional message conversion, and long-running task management via continuation tokens.

The framework provides a first-class A2A integration layer that abstracts the underlying wire protocol while preserving full access to streaming and asynchronous capabilities. By implementing the A2AAgent class located in python/packages/a2a/agent_framework_a2a/_agent.py, you can integrate remote A2A services into your agentic workflows without managing raw HTTP connections or JSON-RPC payloads manually.

Architecture of the A2A Integration Layer

The A2A implementation centers on the A2AAgent class, which acts as a bidirectional adapter between the framework's native Message/Content model and the A2A JSON-RPC / SSE protocol. Internally, A2AAgent instantiates an a2a.client.Client to manage transport connections and delegates message serialization to two critical private methods:

  • _prepare_message_for_a2a – Converts framework Message objects into A2AMessage instances, handling text content, URI references, data blobs, and hosted file attachments.
  • _parse_contents_from_a2a – Transforms A2A Parts back into framework Content objects, ensuring type-safe deserialization of remote responses.

For long-running operations, the framework utilizes the A2AContinuationToken dataclass (defined in the same _agent.py file) to persist task_id and context_id state. This enables durable task execution across network interruptions or process restarts.

Implementing Agent-to-Agent Communication

Discovering Remote Agents with A2ACardResolver

Before establishing communication, you must resolve the remote agent's capabilities via the AgentCard endpoint (/.well-known/agent.json). The A2ACardResolver utility fetches this metadata to configure transport and authentication parameters:

import os
import asyncio
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent

async def discover_and_connect():
    async with httpx.AsyncClient() as http:
        resolver = A2ACardResolver(
            httpx_client=http, 
            base_url=os.getenv("A2A_AGENT_HOST")
        )
        card = await resolver.get_agent_card()

    async with A2AAgent(
        name=card.name,
        description=card.description,
        agent_card=card,
        url=os.getenv("A2A_AGENT_HOST"),
    ) as agent:
        response = await agent.run("What are your capabilities?")
        for msg in response.messages:
            print(msg.text)

asyncio.run(discover_and_connect())

The A2AAgent constructor requires the resolved AgentCard and target URL. When used as an async context manager, it automatically manages the underlying httpx.AsyncClient lifecycle and connection pooling.

Basic Synchronous Calls

By default, the run() method blocks until the remote A2A server reaches a terminal task state (completed, failed, or canceled). This mode is optimal for short-lived requests where immediate results are required:

async with A2AAgent(...) as agent:
    response = await agent.run("Generate a summary of the quarterly report")
    # Blocks until task completion

    print(f"Received {len(response.messages)} messages")

Under the hood, A2AAgent.run invokes _map_a2a_stream to process the JSON-RPC response, parsing final artifacts via _parse_messages_from_task and assembling a complete AgentResponse object.

Real-Time Streaming with Server-Sent Events

For interactive applications requiring progress visibility, enable streaming mode by passing stream=True. This opens an SSE connection that yields AgentResponseUpdate objects as the remote agent generates intermediate outputs:

async with A2AAgent(...) as agent:
    stream = agent.run(
        "Explain the A2A protocol step-by-step", 
        stream=True
    )
    
    async for update in stream:
        for content in update.contents:
            if content.text:
                print("⏱️", content.text)  # Incremental output

    
    final = await stream.get_final_response()
    print(f"✅ Final message count: {len(final.messages)}")

The _map_a2a_stream method consumes raw SSE events (either A2AMessage or (Task, Event) tuples) and maps status updates such as working or input_required into framework-native update objects.

Handling Long-Running Tasks with Background Mode

When dealing with extended computations that risk connection timeouts, use background mode by setting background=True alongside stream=True. This configuration yields a continuation token immediately while offloading task execution to the remote server:

async with A2AAgent(...) as agent:
    stream = agent.run(
        "Generate a comprehensive 50-page technical analysis",
        stream=True,
        background=True  # Emit token, don't block

    )
    
    async for update in stream:
        if update.continuation_token:
            token = update.continuation_token
            print(f"🔄 Task in progress – token: {token}")
            break

# Later: Poll for status or resume waiting

response = await agent.poll_task(token)
print(f"Current status: {response.task_status}")

# Or resume the stream

final_response = await agent.run(continuation_token=token)

The continuation_token encapsulates the A2AContinuationToken data required to reconstruct session state, allowing you to persist tokens in databases or distributed caches for durable orchestration.

Hosting Your Own A2A Service

While A2AAgent focuses on consumption, the framework supports hosting via the agent_definitions.py pattern found in python/samples/04-hosting/a2a/. Implement a concrete Agent subclass and register it to expose your agent through the standard A2A JSON-RPC endpoint:


# python/samples/04-hosting/a2a/agent_definitions.py

from agent_framework import Agent, Content

class PolicyAgent(Agent):
    """Handles compliance and policy queries via A2A protocol."""
    
    async def _run(self, messages):
        query = messages[-1].text
        return [Content.from_text(f"Policy analysis for: {query}")]

The companion a2a_server.py script loads these definitions and serves the /.well-known/agent.json discovery document, enabling other A2AAgent instances to resolve and invoke your service.

Summary

  • Use A2AAgent in python/packages/a2a/agent_framework_a2a/_agent.py to wrap remote A2A services as standard framework agents, handling all JSON-RPC/SSE protocol details automatically.
  • Discover capabilities via A2ACardResolver and the /.well-known/agent.json endpoint before establishing connections.
  • Enable streaming with stream=True to receive real-time AgentResponseUpdate objects via Server-Sent Events for interactive user experiences.
  • Manage durability using background=True to obtain A2AContinuationToken instances that support poll_task() and run(continuation_token=...) for long-running workflows.
  • Host custom agents by implementing the Agent base class and registering implementations in agent_definitions.py for the sample A2A server.

Frequently Asked Questions

What is the difference between streaming mode and background mode in A2AAgent?

Streaming mode (stream=True) maintains an open SSE connection and yields intermediate updates as the remote agent works, while background mode (background=True) immediately returns a continuation token and releases the connection, allowing you to poll status asynchronously without holding network resources. You can combine both flags to stream updates from a backgrounded task initially, then resume later using the token.

How does A2AAgent convert between framework types and A2A protocol formats?

The _prepare_message_for_a2a method serializes framework Message objects into A2A JSON-RPC-compliant messages, handling text, URI, data, and hosted file content types. Conversely, _parse_contents_from_a2a deserializes A2A Part objects back into framework Content instances, with strict validation that raises ValueError for malformed data URIs or unknown part kinds.

Where are the core A2A protocol implementations located in the repository?

The primary implementation resides in python/packages/a2a/agent_framework_a2a/_agent.py, which contains the A2AAgent class, message conversion utilities, and streaming handlers. Reference implementations and integration samples are available in python/samples/04-hosting/a2a/, including agent_with_a2a.py for consumption patterns and a2a_server.py for hosting.

How do I resume an A2A task after a network disconnection or process restart?

Store the continuation_token object emitted when using background=True, which encapsulates the remote task_id and context_id. Later, invoke await agent.poll_task(token) to check current status without blocking, or await agent.run(continuation_token=token) to reattach the stream and wait for completion. This mechanism leverages the A2AContinuationToken dataclass to maintain session continuity across distributed systems.

Have a question about this repo?

These articles cover the highlights, but your codebase questions are specific. Give your agent direct access to the source. Share this with your agent to get started:

Share the following with your agent to get started:
curl -s "https://instagit.com/install.md"

Works with
Claude Codex Cursor VS Code OpenClaw Any MCP Client

Maintain an open-source project? Get it listed too →