# How to Build Agent-to-Agent (A2A) Communication Protocols in the Microsoft Agent Framework

> Learn to build agent-to-agent A2A communication protocols in the Microsoft Agent Framework using the A2AAgent class for seamless JSON-RPC/SSE translation and bidirectional messaging.

- Repository: [Microsoft/agent-framework](https://github.com/microsoft/agent-framework)
- Tags: how-to-guide
- Published: 2026-04-05

---

**The Microsoft Agent Framework enables seamless agent-to-agent communication by wrapping any A2A-compatible service as a standard `BaseAgent` using the `A2AAgent` class, which automatically handles JSON-RPC/SSE protocol translation, bidirectional message conversion, and long-running task management via continuation tokens.**

The framework provides a first-class **A2A integration layer** that abstracts the underlying wire protocol while preserving full access to streaming and asynchronous capabilities. By using the `A2AAgent` class located in [`python/packages/a2a/agent_framework_a2a/_agent.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/a2a/agent_framework_a2a/_agent.py), you can integrate remote A2A services into your agentic workflows without managing raw HTTP connections or JSON-RPC payloads yourself.

## Architecture of the A2A Integration Layer

The A2A implementation centers on the `A2AAgent` class, which acts as a bidirectional adapter between the framework's native `Message`/`Content` model and the **A2A JSON-RPC / SSE protocol**. Internally, `A2AAgent` instantiates an `a2a.client.Client` to manage transport connections and delegates message serialization to two critical private methods:

- **`_prepare_message_for_a2a`** – Converts framework `Message` objects into `A2AMessage` instances, handling text content, URI references, data blobs, and hosted file attachments.
- **`_parse_contents_from_a2a`** – Transforms A2A **Parts** back into framework `Content` objects, ensuring type-safe deserialization of remote responses.
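
The two-way conversion described above can be illustrated without the framework. In the sketch below, `SketchContent`, `SketchPart`, `to_part`, and `from_part` are hypothetical stand-ins invented for this example; they are not the types or private methods in `_agent.py`, and they cover only text and URI content. The point is the dispatch-on-kind pattern and the requirement that a round trip preserves the original value.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchContent:
    """Hypothetical stand-in for a framework content item."""
    type: str                      # "text" or "uri" in this sketch
    text: Optional[str] = None
    uri: Optional[str] = None


@dataclass(frozen=True)
class SketchPart:
    """Hypothetical stand-in for an A2A message part."""
    kind: str                      # "text" or "file" in this sketch
    text: Optional[str] = None
    file_uri: Optional[str] = None


def to_part(content: SketchContent) -> SketchPart:
    """Outbound direction: content item -> wire part."""
    if content.type == "text":
        return SketchPart(kind="text", text=content.text)
    if content.type == "uri":
        return SketchPart(kind="file", file_uri=content.uri)
    raise ValueError(f"unsupported content type: {content.type!r}")


def from_part(part: SketchPart) -> SketchContent:
    """Inbound direction: wire part -> content item."""
    if part.kind == "text":
        return SketchContent(type="text", text=part.text)
    if part.kind == "file":
        return SketchContent(type="uri", uri=part.file_uri)
    raise ValueError(f"unknown part kind: {part.kind!r}")
```

Rejecting unknown kinds with an exception, rather than silently dropping them, keeps a protocol mismatch visible at the adapter boundary.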

For long-running operations, the framework uses the **`A2AContinuationToken`** dataclass (defined in the same [`_agent.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/a2a/agent_framework_a2a/_agent.py) file) to hold the `task_id` and `context_id` of a remote task. Persisting these two values lets a client reattach to the task after a network interruption or process restart.
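
To make the persistence idea concrete, here is a minimal sketch of serializing a token-like object to JSON and restoring it. `TokenSketch`, `dump_token`, and `load_token` are hypothetical names for this example; the real `A2AContinuationToken` may carry other fields or offer its own serialization, so treat this only as an illustration of storing `task_id` and `context_id`.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class TokenSketch:
    """Hypothetical stand-in holding the two fields named in the article."""
    task_id: str
    context_id: str


def dump_token(token: TokenSketch) -> str:
    """Serialize the token to a JSON string suitable for a database column."""
    return json.dumps(asdict(token), sort_keys=True)


def load_token(raw: str) -> TokenSketch:
    """Rebuild the token; a missing field raises KeyError instead of guessing."""
    data = json.loads(raw)
    return TokenSketch(task_id=data["task_id"], context_id=data["context_id"])
```

A restarted process could read the stored string, call `load_token`, and hand the resulting identifiers back to whatever resume mechanism the client exposes.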

## Implementing Agent-to-Agent Communication

### Discovering Remote Agents with A2ACardResolver

Before establishing communication, resolve the remote agent's capabilities from its **AgentCard** endpoint (`/.well-known/agent.json`). The `A2ACardResolver` utility fetches this metadata to configure transport and authentication parameters:

```python
import os
import asyncio
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent

async def discover_and_connect():
    async with httpx.AsyncClient() as http:
        resolver = A2ACardResolver(
            httpx_client=http, 
            base_url=os.getenv("A2A_AGENT_HOST")
        )
        card = await resolver.get_agent_card()

    async with A2AAgent(
        name=card.name,
        description=card.description,
        agent_card=card,
        url=os.getenv("A2A_AGENT_HOST"),
    ) as agent:
        response = await agent.run("What are your capabilities?")
        for msg in response.messages:
            print(msg.text)

asyncio.run(discover_and_connect())
```

The `A2AAgent` constructor requires the resolved `AgentCard` and target URL. When used as an async context manager, it automatically manages the underlying `httpx.AsyncClient` lifecycle and connection pooling.
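
For orientation, the discovery URL can be derived from the agent's base URL with the standard library alone. This sketch assumes the card path is simply appended to the base URL; `agent_card_url` and `WELL_KNOWN_CARD_PATH` are hypothetical names for this example, and the resolver may apply its own rules.

```python
from urllib.parse import urljoin

# Relative path of the discovery document, as named in this article.
WELL_KNOWN_CARD_PATH = ".well-known/agent.json"


def agent_card_url(base_url: str) -> str:
    """Build the AgentCard URL for a given agent base URL."""
    if not base_url:
        raise ValueError("base_url must be a non-empty string")
    # A trailing slash makes urljoin append to the path instead of
    # replacing its last segment.
    base = base_url if base_url.endswith("/") else base_url + "/"
    return urljoin(base, WELL_KNOWN_CARD_PATH)
```

The empty-string check matters in the discovery example because `os.getenv("A2A_AGENT_HOST")` returns `None` when the variable is unset, so validating the host before use gives a clearer error than a failed request.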

### Basic Synchronous Calls

By default, awaiting `run()` does not return until the remote A2A server reaches a terminal task state (`completed`, `failed`, or `canceled`). This mode suits short-lived requests where the caller needs the full result before continuing:

```python
async with A2AAgent(...) as agent:
    # Awaiting run() returns only after the remote task finishes
    response = await agent.run("Generate a summary of the quarterly report")
    print(f"Received {len(response.messages)} messages")
```

Under the hood, `A2AAgent.run` invokes `_map_a2a_stream` to process the JSON-RPC response, parsing final artifacts via `_parse_messages_from_task` and assembling a complete `AgentResponse` object.

### Real-Time Streaming with Server-Sent Events

For interactive applications requiring progress visibility, enable **streaming mode** by passing `stream=True`. This opens an SSE connection that yields `AgentResponseUpdate` objects as the remote agent generates intermediate outputs:

```python
async with A2AAgent(...) as agent:
    stream = agent.run(
        "Explain the A2A protocol step-by-step",
        stream=True,
    )

    # Print incremental output as each update arrives
    async for update in stream:
        for content in update.contents:
            if content.text:
                print("⏱️", content.text)

    final = await stream.get_final_response()
    print(f"✅ Final message count: {len(final.messages)}")
```

The `_map_a2a_stream` method consumes raw SSE events (either `A2AMessage` or `(Task, Event)` tuples) and maps status updates such as `working` or `input_required` into framework-native update objects.
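
The event-to-update mapping can be pictured as one async generator wrapping another. The sketch below is an illustration only: `UpdateSketch`, `fake_events`, and `map_events` are hypothetical, the events are plain dictionaries rather than real SSE payloads, and nothing here reflects the actual internals of `_map_a2a_stream`.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class UpdateSketch:
    """Hypothetical stand-in for a framework-native update object."""
    status: str
    text: Optional[str] = None


async def fake_events() -> AsyncIterator[dict]:
    """Simulated event source; a real client would read these from SSE."""
    for event in (
        {"state": "working", "text": "Step 1"},
        {"state": "working", "text": "Step 2"},
        {"state": "completed"},
    ):
        await asyncio.sleep(0)  # yield control, as a network read would
        yield event


async def map_events(events: AsyncIterator[dict]) -> AsyncIterator[UpdateSketch]:
    """Translate each raw event into an update as soon as it arrives."""
    async for event in events:
        yield UpdateSketch(status=event["state"], text=event.get("text"))


async def collect() -> list:
    """Drain the mapped stream into a list for inspection."""
    return [update async for update in map_events(fake_events())]
```

Because `map_events` yields one update per incoming event, a consumer sees progress incrementally instead of waiting for the whole task to finish.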

### Handling Long-Running Tasks with Background Mode

When dealing with extended computations that risk connection timeouts, use **background mode** by setting `background=True` alongside `stream=True`. This configuration yields a continuation token immediately while offloading task execution to the remote server:

```python
async with A2AAgent(...) as agent:
    stream = agent.run(
        "Generate a comprehensive 50-page technical analysis",
        stream=True,
        background=True,  # Emit a token instead of blocking
    )

    token = None
    async for update in stream:
        if update.continuation_token:
            token = update.continuation_token
            print(f"🔄 Task in progress – token: {token}")
            break

    # Later: poll for status without blocking (the agent must still be open)
    response = await agent.poll_task(token)
    print(f"Current status: {response.task_status}")

    # Or resume and wait for completion
    final_response = await agent.run(continuation_token=token)
```

The `continuation_token` encapsulates the `A2AContinuationToken` data required to reconstruct session state, allowing you to persist tokens in databases or distributed caches for durable orchestration.
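
When polling a backgrounded task, a loop with exponential backoff avoids hammering the remote server. The sketch below assumes a hypothetical `poll` coroutine that returns the task state as a string; `wait_for_terminal` and its parameters are invented for this example and do not call the framework. In practice `poll` might wrap a status check such as `poll_task()`.

```python
import asyncio
from typing import Awaitable, Callable

# Terminal task states as listed earlier in this article.
TERMINAL_STATES = frozenset({"completed", "failed", "canceled"})


async def wait_for_terminal(
    poll: Callable[[], Awaitable[str]],
    *,
    initial_delay: float = 0.5,
    max_delay: float = 30.0,
    max_attempts: int = 10,
) -> str:
    """Poll until a terminal state is seen, doubling the delay each time."""
    delay = initial_delay
    for _ in range(max_attempts):
        state = await poll()
        if state in TERMINAL_STATES:
            return state
        await asyncio.sleep(delay)
        delay = min(delay * 2, max_delay)  # cap the backoff
    raise TimeoutError(f"task not finished after {max_attempts} polls")
```

Capping both the delay and the number of attempts keeps the caller from waiting indefinitely on a task that never reaches a terminal state.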

## Hosting Your Own A2A Service

While `A2AAgent` focuses on consumption, the framework also supports hosting via the [`agent_definitions.py`](https://github.com/microsoft/agent-framework/blob/main/python/samples/04-hosting/a2a/agent_definitions.py) pattern found in `python/samples/04-hosting/a2a/`. Implement a concrete `Agent` subclass and register it to expose your agent through the standard A2A JSON-RPC endpoint:

```python
# python/samples/04-hosting/a2a/agent_definitions.py
from agent_framework import Agent, Content


class PolicyAgent(Agent):
    """Handles compliance and policy queries via A2A protocol."""

    async def _run(self, messages):
        # Answer based on the most recent message in the conversation
        query = messages[-1].text
        return [Content.from_text(f"Policy analysis for: {query}")]
```

The companion [`a2a_server.py`](https://github.com/microsoft/agent-framework/blob/main/python/samples/04-hosting/a2a/a2a_server.py) script loads these definitions and serves the `/.well-known/agent.json` discovery document, enabling other `A2AAgent` instances to resolve and invoke your service.

## Summary

- **Use `A2AAgent`** in [`python/packages/a2a/agent_framework_a2a/_agent.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/a2a/agent_framework_a2a/_agent.py) to wrap remote A2A services as standard framework agents, handling all JSON-RPC/SSE protocol details automatically.
- **Discover capabilities** via `A2ACardResolver` and the `/.well-known/agent.json` endpoint before establishing connections.
- **Enable streaming** with `stream=True` to receive real-time `AgentResponseUpdate` objects via Server-Sent Events for interactive user experiences.
- **Manage durability** using `background=True` to obtain `A2AContinuationToken` instances that support `poll_task()` and `run(continuation_token=...)` for long-running workflows.
- **Host custom agents** by implementing the `Agent` base class and registering implementations in [`agent_definitions.py`](https://github.com/microsoft/agent-framework/blob/main/python/samples/04-hosting/a2a/agent_definitions.py) for the sample A2A server.

## Frequently Asked Questions

### What is the difference between streaming mode and background mode in A2AAgent?

Streaming mode (`stream=True`) maintains an open SSE connection and yields intermediate updates as the remote agent works, while background mode (`background=True`) immediately returns a continuation token and releases the connection, allowing you to poll status asynchronously without holding network resources. You can combine both flags to stream updates from a backgrounded task initially, then resume later using the token.

### How does A2AAgent convert between framework types and A2A protocol formats?

The `_prepare_message_for_a2a` method serializes framework `Message` objects into A2A JSON-RPC-compliant messages, handling text, URI, data, and hosted file content types. Conversely, `_parse_contents_from_a2a` deserializes A2A Part objects back into framework `Content` instances, with strict validation that raises `ValueError` for malformed data URIs or unknown part kinds.
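
As an illustration of the strict validation mentioned above, the following sketch parses a base64 data URI and raises `ValueError` on malformed input. `parse_data_uri` is a hypothetical helper written for this example using only the standard library; it is not the framework's parser and handles only the `;base64` form.

```python
import base64
import binascii
from typing import Tuple


def parse_data_uri(uri: str) -> Tuple[str, bytes]:
    """Split a base64 data URI into (media_type, decoded_bytes)."""
    if not uri.startswith("data:"):
        raise ValueError("not a data URI")
    header, sep, payload = uri[len("data:"):].partition(",")
    if not sep:
        raise ValueError("data URI is missing the ',' separator")
    if not header.endswith(";base64"):
        raise ValueError("only base64 data URIs are handled in this sketch")
    # RFC 2397 treats an omitted media type as text/plain (charset US-ASCII).
    media_type = header[: -len(";base64")] or "text/plain"
    try:
        data = base64.b64decode(payload, validate=True)
    except binascii.Error as exc:
        raise ValueError("invalid base64 payload") from exc
    return media_type, data
```

Converting the low-level `binascii.Error` into `ValueError` gives callers a single exception type to handle for every kind of malformed input.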

### Where are the core A2A protocol implementations located in the repository?

The primary implementation resides in [`python/packages/a2a/agent_framework_a2a/_agent.py`](https://github.com/microsoft/agent-framework/blob/main/python/packages/a2a/agent_framework_a2a/_agent.py), which contains the `A2AAgent` class, message conversion utilities, and streaming handlers. Reference implementations and integration samples are available in `python/samples/04-hosting/a2a/`, including [`agent_with_a2a.py`](https://github.com/microsoft/agent-framework/blob/main/python/samples/04-hosting/a2a/agent_with_a2a.py) for consumption patterns and [`a2a_server.py`](https://github.com/microsoft/agent-framework/blob/main/python/samples/04-hosting/a2a/a2a_server.py) for hosting.

### How do I resume an A2A task after a network disconnection or process restart?

Store the `continuation_token` object emitted when using `background=True`, which encapsulates the remote `task_id` and `context_id`. Later, invoke `await agent.poll_task(token)` to check current status without blocking, or `await agent.run(continuation_token=token)` to reattach the stream and wait for completion. This mechanism leverages the `A2AContinuationToken` dataclass to maintain session continuity across distributed systems.