# Implementing Streaming Message Handling with Model Client Streaming in AutoGen

> Implement streaming message handling with AutoGen's model client streaming for real-time text. Activate this feature by setting model_client_stream=True on your AssistantAgent for incremental updates.

- Repository: [Microsoft/autogen](https://github.com/microsoft/autogen)
- Tags: how-to-guide
- Published: 2026-03-07

---

**AutoGen's model client streaming enables real-time text delivery by yielding incremental chunks through `ModelClientStreamingChunkEvent` while processing the final `CreateResult` separately, activated by setting `model_client_stream=True` on any `AssistantAgent`.**

AutoGen's model client streaming architecture allows agents to stream partial LLM outputs as they are generated rather than waiting for complete responses. This guide covers implementing streaming message handling with model client streaming in the `microsoft/autogen` repository, demonstrating how to enable real-time UI updates and progressive content delivery across different interfaces.

## Enabling Streaming Mode on Agents

To activate streaming, pass `model_client_stream=True` when constructing an `AssistantAgent`. With the flag set, the agent requests a streamed completion from the model client and forwards each chunk as it arrives, instead of awaiting the complete response.

```python
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

agent = AssistantAgent(
    name="streaming_assistant",
    model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
    model_client_stream=True,  # Enable streaming mode
)
```

When this flag is set, the agent's `_call_llm` method in [`python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py`](https://github.com/microsoft/autogen/blob/main/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py) routes execution to the streaming path, iterating over `model_client.create_stream()` instead of calling `create()`.

## The Model Client Streaming Contract

All LLM wrappers implement the `ChatCompletionClient` abstract base class, which declares the streaming interface in [`python/packages/autogen-core/src/autogen_core/models/_model_client.py`](https://github.com/microsoft/autogen/blob/main/python/packages/autogen-core/src/autogen_core/models/_model_client.py). The `create_stream` method yields raw string chunks followed by a final result object:

```python
def create_stream(
    self,
    messages: Sequence[LLMMessage],
    *,
    tools: Sequence[Tool | ToolSchema] = [],
    tool_choice: Tool | Literal["auto", "required", "none"] = "auto",
    json_output: Optional[bool | type[BaseModel]] = None,
    extra_create_args: Mapping[str, Any] = {},
    cancellation_token: Optional[CancellationToken] = None,
) -> AsyncGenerator[Union[str, CreateResult], None]:
    """Creates a stream of string chunks from the model ending with a CreateResult."""
```

Concrete implementations (OpenAI, Azure, Anthropic) adapt vendor-specific streaming APIs to this unified contract. The generator yields plain `str` objects representing incremental text content, terminating with a `CreateResult` containing the final assembled message, usage statistics, and any tool-call payloads.
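
The shape of this contract can be exercised without any vendor SDK. The following is a minimal sketch using only the standard library: `FakeCreateResult` and `fake_create_stream` are hypothetical stand-ins (they are not AutoGen classes) that mimic "string chunks, then one final result", and `consume` shows the usual consumer pattern of branching on the type of each yielded item.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional, Tuple, Union


@dataclass
class FakeCreateResult:
    """Hypothetical stand-in for CreateResult; only the content field is modeled."""
    content: str


async def fake_create_stream(text: str) -> AsyncGenerator[Union[str, FakeCreateResult], None]:
    """Yield space-delimited chunks, then one final result object."""
    words = text.split(" ")
    for i, word in enumerate(words):
        # Re-attach the separator so concatenating the chunks reproduces the text.
        yield word if i == len(words) - 1 else word + " "
    yield FakeCreateResult(content=text)


async def consume(text: str) -> Tuple[List[str], Optional[FakeCreateResult]]:
    """Collect str chunks and the terminal result by checking each item's type."""
    chunks: List[str] = []
    final: Optional[FakeCreateResult] = None
    async for item in fake_create_stream(text):
        if isinstance(item, str):
            chunks.append(item)
        else:
            final = item
    return chunks, final


chunks, final = asyncio.run(consume("streamed one word at a time"))
```

A real client would replace `fake_create_stream` with `model_client.create_stream(...)`; the type check in `consume` stays the same because the final item is the only one that is not a `str`.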

## Streaming Event Architecture

As the agent consumes chunks from the model client, it wraps each string in a typed event structure defined in [`python/packages/autogen-agentchat/src/autogen_agentchat/messages.py`](https://github.com/microsoft/autogen/blob/main/python/packages/autogen-agentchat/src/autogen_agentchat/messages.py):

```python
class ModelClientStreamingChunkEvent(BaseAgentEvent):
    """An event signaling a text output chunk from a model client in streaming mode."""
    content: str
    full_message_id: str | None = None
    type: Literal["ModelClientStreamingChunkEvent"] = "ModelClientStreamingChunkEvent"
```

The `full_message_id` field defaults to `None`; when the agent supplies a value, consumers can use it to correlate chunks with their final message. Inside [`_assistant_agent.py`](https://github.com/microsoft/autogen/blob/main/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py), the agent yields these events immediately upon receiving string chunks, tagging each one with the ID of the message being generated:

```python
async for chunk in model_client.create_stream(...):
    if isinstance(chunk, CreateResult):
        model_result = chunk
    elif isinstance(chunk, str):
        yield ModelClientStreamingChunkEvent(
            content=chunk,
            source=agent_name,
            full_message_id=message_id,
        )
```

## Consuming Streaming Events

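Consumer code iterates over `agent.run_stream()` and typically handles two item types: `ModelClientStreamingChunkEvent` for incremental updates and `TaskResult`, the last item yielded, for the final result. The stream also carries the complete messages of the run (for example the task message and the agent's final response), which the examples in this section ignore.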

### Command-Line Interface Streaming

For terminal applications, print chunks as they arrive without newlines:

```python
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.messages import ModelClientStreamingChunkEvent
from autogen_ext.models.replay import ReplayChatCompletionClient

async def main():
    # Each list entry is one complete canned response; in streaming mode the
    # replay client emits that response in smaller chunks.
    mock_client = ReplayChatCompletionClient(
        ["Hello, world! This is a streamed reply."],
    )
    agent = AssistantAgent(
        name="streamer",
        model_client=mock_client,
        model_client_stream=True,
    )

    async for event in agent.run_stream(task="demo"):
        if isinstance(event, ModelClientStreamingChunkEvent):
            print(event.content, end="", flush=True)
        elif isinstance(event, TaskResult):
            print("\n\nFinal message assembled.")

asyncio.run(main())
```

The `ReplayChatCompletionClient` replays preconfigured responses in order. In streaming mode it yields each response as a series of string chunks followed by a `CreateResult`, which makes it well suited to testing streaming code without API costs.

### FastAPI Server-Sent Events

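For web interfaces, stream chunks to the client with FastAPI's `StreamingResponse`. The example below sends raw text chunks as `text/plain`; a browser `EventSource` client would instead need `media_type="text/event-stream"` and each chunk framed as a `data:` field followed by a blank line.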

```python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.messages import ModelClientStreamingChunkEvent

app = FastAPI()
openai_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

assistant = AssistantAgent(
    name="fastapi_assistant",
    model_client=openai_client,
    model_client_stream=True,
)

@app.post("/chat")
async def chat(request: Request):
    body = await request.json()
    prompt = body["prompt"]

    async def stream():
        async for ev in assistant.run_stream(task=prompt):
            if isinstance(ev, ModelClientStreamingChunkEvent):
                yield ev.content

    return StreamingResponse(stream(), media_type="text/plain")
```
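
If the client expects true Server-Sent Events rather than plain text, each chunk needs SSE framing. The helper below is a minimal sketch of that framing using only the standard library; `format_sse` is a hypothetical name introduced for illustration and is not part of FastAPI or AutoGen. It assumes payloads use `\n` line endings.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Frame one payload as a Server-Sent Events message."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines must be sent as one data: field per line.
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the message.
    return "\n".join(lines) + "\n\n"


framed = format_sse("Hello, ")
multi = format_sse("a\nb", event="chunk")
```

To use it in the endpoint above, the `stream()` generator would yield `format_sse(ev.content)` instead of `ev.content`, and the response would be created with `media_type="text/event-stream"`.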

### Chainlit UI Integration

For rich web UIs, update message content incrementally:

```python
import chainlit as cl
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.replay import ReplayChatCompletionClient
from autogen_agentchat.messages import ModelClientStreamingChunkEvent

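# Each entry is one complete canned response, consumed in order (one per user message).
mock = ReplayChatCompletionClient(
    ["Streaming via Chainlit!", "A second streamed reply.", "A third streamed reply."]
)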
agent = AssistantAgent(
    name="chainlit_bot",
    model_client=mock,
    model_client_stream=True,
)

@cl.on_message
async def main(message: cl.Message):
    ui_msg = cl.Message(content="", author="assistant")
    async for ev in agent.run_stream(task=message.content):
        if isinstance(ev, ModelClientStreamingChunkEvent):
            # stream_token appends the chunk and pushes it to the UI.
            await ui_msg.stream_token(ev.content)
    # Finalize the message once the stream is exhausted.
    await ui_msg.send()
```

## Correlating Chunks with Final Messages

When the stream completes, the final `CreateResult` is converted into a `Response` whose chat message carries a stable `id`. Because each chunk event is emitted with `full_message_id` set to that same ID (the `message_id` passed in the agent loop shown earlier), consumers can associate streaming fragments with their completed message.

This correlation mechanism is validated in [`python/packages/autogen-agentchat/tests/test_streaming_message_id_correlation.py`](https://github.com/microsoft/autogen/blob/main/python/packages/autogen-agentchat/tests/test_streaming_message_id_correlation.py), which asserts that every chunk's `full_message_id` equals the completed message's `id` after the stream finishes. The logic resides in `_process_model_result` within the agent implementation, ensuring that partial content displayed during streaming correctly links to the final persisted message.
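
On the consumer side, one way to use `full_message_id` is to bucket chunk text by ID so that each partial transcript can later be matched to a final message's `id`. The following is a minimal sketch under that assumption; `Chunk` is a hypothetical stand-in that models only the two fields used here, and `assemble_by_message_id` is an illustrative helper, not an AutoGen function.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


@dataclass
class Chunk:
    """Hypothetical stand-in for ModelClientStreamingChunkEvent."""
    content: str
    full_message_id: Optional[str] = None


def assemble_by_message_id(chunks: Iterable[Chunk]) -> Dict[Optional[str], str]:
    """Concatenate chunk text per full_message_id, preserving arrival order."""
    parts: Dict[Optional[str], List[str]] = defaultdict(list)
    for chunk in chunks:
        parts[chunk.full_message_id].append(chunk.content)
    return {message_id: "".join(texts) for message_id, texts in parts.items()}


events = [
    Chunk("Hel", "msg-1"),
    Chunk("lo", "msg-1"),
    Chunk("Bye", "msg-2"),
]
assembled = assemble_by_message_id(events)
```

With real events, the keys of the returned dictionary can be compared against the `id` of each completed message to decide which UI container a partial transcript belongs to.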

## Summary

- **Enable streaming** by setting `model_client_stream=True` on `AssistantAgent` instances to switch from blocking to incremental output mode.
- **Model clients** implement `create_stream()` in [`python/packages/autogen-core/src/autogen_core/models/_model_client.py`](https://github.com/microsoft/autogen/blob/main/python/packages/autogen-core/src/autogen_core/models/_model_client.py), yielding `str` chunks terminated by a `CreateResult` object.
- **Event wrapping** occurs in [`python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py`](https://github.com/microsoft/autogen/blob/main/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py), where raw strings become `ModelClientStreamingChunkEvent` objects with optional `full_message_id` annotations.
- **Consumer differentiation** requires checking `isinstance(event, ModelClientStreamingChunkEvent)` for incremental updates versus `TaskResult` for final message assembly.
- **Message correlation** relies on `full_message_id`: chunk events reference the ID of the completed message, which keeps partial and final content linked in the UI.

## Frequently Asked Questions

### How do I enable streaming on an existing AssistantAgent?

Pass `model_client_stream=True` during agent instantiation. This boolean flag switches the internal `_call_llm` logic to use `model_client.create_stream()` instead of the standard `create()` method, immediately yielding text chunks as they arrive from the LLM provider.

### What is the difference between string chunks and CreateResult in the stream?

The `AsyncGenerator` yields plain `str` objects containing incremental text content during generation, followed by a single `CreateResult` object at termination. The `CreateResult` contains the final assembled message content, token usage statistics, and any tool calls, while the preceding strings represent partial content suitable for live UI updates.

### How can I correlate streaming chunks with the final message?

Each `ModelClientStreamingChunkEvent` carries a `full_message_id` field (default `None`) that the agent sets to the ID of the message being generated. Consumers can store this ID from the chunks and match it against the `id` of the agent's final message, available in `TaskResult.messages`, to associate partial content with the completed message.

### Which model clients support streaming in AutoGen?

All official clients in the `autogen_ext` package implement the `create_stream` method, including `OpenAIChatCompletionClient`, `AzureOpenAIChatCompletionClient`, and `AnthropicChatCompletionClient`. Additionally, `ReplayChatCompletionClient` provides deterministic streaming for testing, while custom clients can support streaming by implementing the abstract `create_stream` method defined in the core interface.