Implementing Streaming Message Handling with Model Client Streaming in AutoGen
AutoGen's model client streaming delivers text in real time. With model_client_stream=True, an AssistantAgent emits each incremental chunk as a ModelClientStreamingChunkEvent and handles the model's final CreateResult separately.
Instead of waiting for a complete response, agents can forward partial LLM output as it is generated. This guide walks through how streaming is implemented in the microsoft/autogen repository and how to consume it for real-time updates in a terminal, a FastAPI endpoint, and a Chainlit UI.
Enabling Streaming Mode on Agents
To activate streaming, pass model_client_stream=True when constructing an AssistantAgent. The flag switches the agent from awaiting one complete model response to consuming an asynchronous stream of chunks as they are produced.
```python
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

agent = AssistantAgent(
    name="streaming_assistant",
    model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
    model_client_stream=True,  # Enable streaming mode
)
```
When this flag is set, the agent's _call_llm method in python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py routes execution to the streaming path, iterating over model_client.create_stream() instead of calling create().
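To make the branching concrete, here is a library-free sketch of the two paths. FakeClient, FakeResult, and call_llm are hypothetical stand-ins, not AutoGen classes: FakeClient mimics a client's awaitable create() and async-generator create_stream(), and call_llm forwards text chunks only when streaming is on. AutoGen's real _call_llm also handles tools, cancellation, and event wrapping, all omitted here.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Optional, Union


@dataclass
class FakeResult:
    """Hypothetical stand-in for autogen_core.models.CreateResult."""
    content: str


class FakeClient:
    """Hypothetical client exposing the same two entry points as a ChatCompletionClient."""

    def __init__(self, reply: str) -> None:
        self._reply = reply

    async def create(self) -> FakeResult:
        # Non-streaming path: a single awaited call returns the whole reply.
        return FakeResult(self._reply)

    async def create_stream(self) -> AsyncGenerator[Union[str, FakeResult], None]:
        # Streaming path: emit the reply word by word, then the final result.
        words = self._reply.split(" ")
        for i, word in enumerate(words):
            yield word if i == len(words) - 1 else word + " "
        yield FakeResult(self._reply)


async def call_llm(client: FakeClient, stream: bool) -> AsyncGenerator[Union[str, FakeResult], None]:
    """Yield text chunks (streaming mode only), then always the final result."""
    if not stream:
        yield await client.create()
        return
    final: Optional[FakeResult] = None
    async for item in client.create_stream():
        if isinstance(item, FakeResult):
            final = item  # keep the final result; emit it after the loop
        else:
            yield item  # forward each chunk as soon as it arrives
    if final is None:
        raise RuntimeError("stream ended without a final result")
    yield final


async def collect(stream: bool) -> list:
    return [item async for item in call_llm(FakeClient("Hello streaming world"), stream)]


if __name__ == "__main__":
    print(asyncio.run(collect(stream=True)))
    print(asyncio.run(collect(stream=False)))
```

With streaming on, the caller sees three text chunks followed by the result; with it off, only the result arrives.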
The Model Client Streaming Contract
All model clients implement the ChatCompletionClient abstract base class, defined in python/packages/autogen-core/src/autogen_core/models/_model_client.py. Its create_stream method yields raw string chunks followed by a single final CreateResult:
```python
# Excerpt: abstract method declared on ChatCompletionClient
@abstractmethod
def create_stream(
    self,
    messages: Sequence[LLMMessage],
    *,
    tools: Sequence[Tool | ToolSchema] = [],
    tool_choice: Tool | Literal["auto", "required", "none"] = "auto",
    json_output: Optional[bool | type[BaseModel]] = None,
    extra_create_args: Mapping[str, Any] = {},
    cancellation_token: Optional[CancellationToken] = None,
) -> AsyncGenerator[Union[str, CreateResult], None]:
    """Creates a stream of string chunks from the model ending with a CreateResult."""
    ...
```
Concrete implementations (OpenAI, Azure, Anthropic) adapt vendor-specific streaming APIs to this unified contract. The generator yields plain str objects representing incremental text content, terminating with a CreateResult containing the final assembled message, usage statistics, and any tool-call payloads.
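The adaptation step can be sketched without any vendor SDK. The example assumes a hypothetical provider stream of dicts carrying either a "delta" text field or a final "usage" record; Result is a simplified stand-in for CreateResult, and create_stream here is a free function rather than the real method. It shows the shape of the contract: forward each text delta as a str, then finish with exactly one object holding the assembled text and usage.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator, AsyncIterator, Dict, List, Optional, Tuple, Union


@dataclass
class Result:
    """Simplified stand-in for CreateResult: assembled text plus usage counters."""
    content: str
    usage: Dict[str, int] = field(default_factory=dict)


async def fake_vendor_stream() -> AsyncIterator[dict]:
    """Hypothetical provider stream: text deltas, then a usage record."""
    for piece in ["The ", "answer ", "is ", "42."]:
        yield {"delta": piece}
    yield {"usage": {"completion_tokens": 4}}


async def create_stream(vendor: AsyncIterator[dict]) -> AsyncGenerator[Union[str, Result], None]:
    """Adapt vendor events to the contract: str chunks, then exactly one Result."""
    parts: List[str] = []
    usage: Dict[str, int] = {}
    async for event in vendor:
        if event.get("delta"):
            parts.append(event["delta"])
            yield event["delta"]  # incremental text for live display
        if "usage" in event:
            usage = event["usage"]
    # The terminal item carries the assembled message and usage statistics.
    yield Result(content="".join(parts), usage=usage)


async def demo() -> Tuple[List[str], Optional[Result]]:
    chunks: List[str] = []
    final: Optional[Result] = None
    async for item in create_stream(fake_vendor_stream()):
        if isinstance(item, Result):
            final = item
        else:
            chunks.append(item)
    return chunks, final


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

For a plain text reply the joined chunks match the final content, but consumers should treat the terminal result as authoritative, since it may instead hold tool calls.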
Streaming Event Architecture
As the agent consumes chunks from the model client, it wraps each string in a typed event structure defined in python/packages/autogen-agentchat/src/autogen_agentchat/messages.py:
```python
class ModelClientStreamingChunkEvent(BaseAgentEvent):
    """An event signaling a text output chunk from a model client in streaming mode."""

    content: str
    full_message_id: str | None = None
    type: Literal["ModelClientStreamingChunkEvent"] = "ModelClientStreamingChunkEvent"
```
The full_message_id field defaults to None. In streaming mode, AssistantAgent fills it with an ID generated before the model call, the same ID the final message will carry (see Correlating Chunks with Final Messages). Inside _assistant_agent.py, the agent yields these events as soon as string chunks arrive:
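```python
# Simplified excerpt from the streaming branch of _call_llm
async for chunk in model_client.create_stream(...):
    if isinstance(chunk, CreateResult):
        model_result = chunk  # final result, processed after the loop
    elif isinstance(chunk, str):
        yield ModelClientStreamingChunkEvent(
            content=chunk,
            source=agent_name,
            full_message_id=message_id,  # correlates the chunk with its final message
        )
```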
Consuming Streaming Events
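Consumer code iterates over agent.run_stream() and typically distinguishes two kinds of item: ModelClientStreamingChunkEvent for incremental text, and the closing TaskResult, which carries the completed messages. Other messages (such as the task message and the final response message) are yielded as well and can be ignored or rendered as needed.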
Command-Line Interface Streaming
For terminal applications, print chunks as they arrive without newlines:
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.messages import ModelClientStreamingChunkEvent
from autogen_ext.models.replay import ReplayChatCompletionClient


async def main() -> None:
    # Each list entry is one complete canned reply, consumed one per model call.
    mock_client = ReplayChatCompletionClient(
        ["Hello, world! This is a streamed reply."],
    )
    agent = AssistantAgent(
        name="streamer",
        model_client=mock_client,
        model_client_stream=True,
    )
    async for event in agent.run_stream(task="demo"):
        if isinstance(event, ModelClientStreamingChunkEvent):
            # Print each chunk as it arrives, without a trailing newline.
            print(event.content, end="", flush=True)
        elif isinstance(event, TaskResult):
            print("\n\nFinal message assembled.")


asyncio.run(main())
```
ReplayChatCompletionClient treats each entry in its list as one complete model response and returns them in order, one per call. In streaming mode it emits each response in smaller pieces and then yields a final CreateResult, which makes it useful for testing streaming code without network access or API costs.
FastAPI Server-Sent Events
For web interfaces, send chunks to the client as Server-Sent Events through StreamingResponse:
```python
import json

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import ModelClientStreamingChunkEvent
from autogen_ext.models.openai import OpenAIChatCompletionClient

app = FastAPI()
# The model client can be shared; it reads OPENAI_API_KEY from the environment.
openai_client = OpenAIChatCompletionClient(model="gpt-4o-mini")


@app.post("/chat")
async def chat(request: Request) -> StreamingResponse:
    body = await request.json()
    prompt = body["prompt"]
    # Build a fresh agent per request: AssistantAgent keeps conversation state,
    # so one shared instance would mix histories and race under concurrent requests.
    assistant = AssistantAgent(
        name="fastapi_assistant",
        model_client=openai_client,
        model_client_stream=True,
    )

    async def stream():
        async for ev in assistant.run_stream(task=prompt):
            if isinstance(ev, ModelClientStreamingChunkEvent):
                # JSON-encode each chunk so embedded newlines cannot break SSE framing.
                yield f"data: {json.dumps(ev.content)}\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")
```
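On the receiving side, a client has to split the body back into events. If the endpoint frames each chunk as a single SSE data: line containing a JSON-encoded string, the hypothetical iter_sse_data helper below can recover the text from the response lines. It handles only the subset of the SSE format needed here (data: lines, blank-line event boundaries, ignored comments and other fields); a production client would more likely rely on an existing SSE client library.

```python
import json
from typing import Iterable, Iterator, List


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the JSON-decoded payload of each SSE event from raw text lines.

    Assumes every event's data is a JSON-encoded string (a simplification
    of the full SSE grammar).
    """
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # SSE drops a single space immediately after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # Comments (":...") and other fields (event:, id:, retry:) are ignored.
    # An event without its terminating blank line is discarded, as SSE specifies.


if __name__ == "__main__":
    body = 'data: "Hello, "\n\ndata: "world!\\n"\n\n'
    print(repr("".join(iter_sse_data(body.splitlines(keepends=True)))))
```

Because each chunk was JSON-encoded, a newline inside the model output survives as part of the payload instead of being mistaken for an event boundary.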
Chainlit UI Integration
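For Chainlit apps, append each chunk to a cl.Message with stream_token() and send the message once the run completes:
```python
import chainlit as cl
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import ModelClientStreamingChunkEvent
from autogen_ext.models.replay import ReplayChatCompletionClient


@cl.on_chat_start
async def start() -> None:
    # One agent per chat session, so conversation state is not shared between users.
    # The replay client serves one canned reply per model call.
    mock = ReplayChatCompletionClient(["Streaming via Chainlit!"])
    agent = AssistantAgent(
        name="chainlit_bot",
        model_client=mock,
        model_client_stream=True,
    )
    cl.user_session.set("agent", agent)


@cl.on_message
async def main(message: cl.Message) -> None:
    agent: AssistantAgent = cl.user_session.get("agent")
    ui_msg = cl.Message(content="", author="assistant")
    async for ev in agent.run_stream(task=message.content):
        if isinstance(ev, ModelClientStreamingChunkEvent):
            # Append the chunk to the message and push it to the browser.
            await ui_msg.stream_token(ev.content)
    # Finalize the message once the run has completed.
    await ui_msg.send()
```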
Correlating Chunks with Final Messages
In streaming mode, the agent generates a message ID before it calls the model, stamps that ID into the full_message_id of every ModelClientStreamingChunkEvent it emits, and gives the final message built from the concluding CreateResult the same id. Consumers can therefore associate streaming fragments with their completed message; no already-emitted event has to be updated after the fact.
This correlation is validated in python/packages/autogen-agentchat/tests/test_streaming_message_id_correlation.py, which asserts that every chunk's full_message_id equals the completed message's id once the stream finishes. The final message itself is assembled in _process_model_result within the agent implementation, which reuses the same ID so that partial content displayed during streaming links to the final persisted message.
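On the consumer side, full_message_id lets a UI keep one draft per message and swap it for the finished text when a message with the matching id arrives. The sketch below uses hypothetical dataclasses Chunk and FinalMessage in place of ModelClientStreamingChunkEvent and the completed chat message (which carries an id field in recent AutoGen versions); only the field names full_message_id, content, and id mirror the real types, and DraftBuffer is an illustrative helper, not part of AutoGen.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Union


@dataclass
class Chunk:
    """Hypothetical stand-in for ModelClientStreamingChunkEvent."""
    content: str
    full_message_id: Optional[str] = None


@dataclass
class FinalMessage:
    """Hypothetical stand-in for the completed message; id matches the chunks."""
    id: str
    content: str


class DraftBuffer:
    """Accumulate streamed text per message id, then replace it with the final text."""

    def __init__(self) -> None:
        self.drafts: Dict[str, List[str]] = {}
        self.completed: Dict[str, str] = {}

    def handle(self, event: Union[Chunk, FinalMessage]) -> None:
        if isinstance(event, Chunk):
            if event.full_message_id is None:
                return  # nothing to correlate with; a real UI might render it untracked
            self.drafts.setdefault(event.full_message_id, []).append(event.content)
        else:
            # The finished message supersedes whatever partial text was shown.
            self.drafts.pop(event.id, None)
            self.completed[event.id] = event.content

    def preview(self, message_id: str) -> str:
        """Return the final text if available, otherwise the draft so far."""
        if message_id in self.completed:
            return self.completed[message_id]
        return "".join(self.drafts.get(message_id, []))


if __name__ == "__main__":
    buf = DraftBuffer()
    for event in [Chunk("Hel", "m1"), Chunk("lo", "m1"), FinalMessage("m1", "Hello")]:
        buf.handle(event)
        print(buf.preview("m1"))
```

Keying drafts by id rather than by arrival order keeps the buffer correct even when several agents stream into the same view.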
Summary
- Enable streaming by setting model_client_stream=True when constructing an AssistantAgent; the agent then consumes create_stream() instead of awaiting create().
- Model clients implement create_stream(), declared in python/packages/autogen-core/src/autogen_core/models/_model_client.py, yielding str chunks terminated by a single CreateResult.
- Event wrapping occurs in python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py, where raw strings become ModelClientStreamingChunkEvent objects carrying a full_message_id.
- Consumers check isinstance(event, ModelClientStreamingChunkEvent) for incremental updates and TaskResult for the completed run.
- Message correlation is built in: each chunk's full_message_id equals the id of the final message, so UIs can replace partial text with the completed message.
Frequently Asked Questions
How do I enable streaming on an existing AssistantAgent?
model_client_stream is a constructor argument, so pass model_client_stream=True when instantiating the agent (recreate an existing agent if necessary). The flag makes _call_llm use model_client.create_stream() instead of the standard create() method, and the agent yields text chunks as soon as they arrive from the LLM provider.
What is the difference between string chunks and CreateResult in the stream?
The AsyncGenerator yields plain str objects containing incremental text content during generation, followed by a single CreateResult object at termination. The CreateResult contains the final assembled message content, token usage statistics, and any tool calls, while the preceding strings represent partial content suitable for live UI updates.
How can I correlate streaming chunks with the final message?
Each ModelClientStreamingChunkEvent carries a full_message_id field. In streaming mode, AssistantAgent sets it to an ID generated before the model call and assigns the same id to the final response message. Consumers can group chunks by this value and match it against the id of the completed message (which is also included in TaskResult.messages) to associate partial content with the finished message.
Which model clients support streaming in AutoGen?
Because create_stream is an abstract method of ChatCompletionClient, every concrete client implements it, including OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient, and AnthropicChatCompletionClient in the autogen_ext package. ReplayChatCompletionClient provides deterministic streaming for testing, and custom clients must implement create_stream as well, following the same contract of str chunks followed by one final CreateResult.