How to Stream Agent Responses and Handle Streaming Events in openai-agents-python
To stream agent responses and handle streaming events in openai-agents-python, use Runner.run_streamed() to obtain a RunResultStreaming object, then iterate over result.stream_events() to receive typed events including raw LLM deltas, tool calls, and agent handoffs.
The openai-agents-python SDK can stream agent responses and surface streaming events in real time. Unlike a non-streamed run, which hands back only the finished result, streaming mode delivers incremental events (reasoning deltas, tool invocations, handoff notifications) as they occur. This guide walks through the three-layer architecture, the event types, and practical implementation patterns, citing the official source code.
The Three-Layer Streaming Architecture
Streaming in openai-agents-python is built on three distinct layers that convert raw LLM output into consumable events:
| Layer | Responsibility | Core File |
|---|---|---|
| Event definitions | Typed dataclasses representing raw LLM deltas, run-item updates, and agent switches | src/agents/stream_events.py |
| Queueing logic | Converts newly created RunItem objects into StreamEvent instances and pushes them onto an async queue | src/agents/run_internal/streaming.py |
| Runner entry point | Starts a run, creates the streaming queue, and returns a RunResultStreaming that exposes async for event in result.stream_events() | src/agents/run.py (method Runner.run_streamed) |
The consumer simply iterates over result.stream_events() and reacts to the type field of each event.
Streaming Event Types
StreamEvent is a union of the event dataclasses defined in src/agents/stream_events.py【/cache/repos/github.com/openai/openai-agents-python/main/src/agents/stream_events.py#L10-L61】.
The three primary event types are:
- RawResponsesStreamEvent: Direct delta payloads from the LLM (e.g., response.reasoning_summary_text.delta). Key field: data: TResponseStreamEvent.
- RunItemStreamEvent: High-level items derived from the LLM response, including messages, tool calls, handoffs, and reasoning items. Key fields: name (e.g., "message_output_created", "tool_called"), item: RunItem.
- AgentUpdatedStreamEvent: Indicates the active agent has been swapped via a handoff. Key field: new_agent: Agent[Any].
Branching on the type attribute is straightforward:
if event.type == "raw_response_event":
    ...  # low-level token/character deltas
elif event.type == "run_item_stream_event":
    ...  # higher-level semantic items
elif event.type == "agent_updated_stream_event":
    ...  # handoff occurred
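To make the branching concrete, here is a minimal, self-contained sketch. The three dataclasses are simplified stand-ins invented for illustration (RawEvent, ItemEvent, AgentEvent are hypothetical names, not the SDK classes); only the type strings and the field names data, name, item, and new_agent come from the text above.

```python
from dataclasses import dataclass
from typing import Any, Literal, Union


# Hypothetical, simplified stand-ins for the SDK event classes.
@dataclass
class RawEvent:
    data: Any
    type: Literal["raw_response_event"] = "raw_response_event"


@dataclass
class ItemEvent:
    name: str
    item: Any
    type: Literal["run_item_stream_event"] = "run_item_stream_event"


@dataclass
class AgentEvent:
    new_agent: Any
    type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"


Event = Union[RawEvent, ItemEvent, AgentEvent]


def describe(event: Event) -> str:
    """Return a one-line label for an event, branching on its type tag."""
    if event.type == "raw_response_event":
        return "raw delta"
    elif event.type == "run_item_stream_event":
        return f"item: {event.name}"
    elif event.type == "agent_updated_stream_event":
        return f"handoff to {event.new_agent}"
    raise ValueError(f"unknown event type: {event.type}")
```

Because every event carries a literal type tag, a consumer needs no isinstance checks; a static type checker can also narrow the union inside each branch.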
Queueing Logic and Event Generation
When the internal turn-resolution logic creates new RunItem objects (message outputs, tool calls, etc.), the streaming helper stream_step_items_to_queue in src/agents/run_internal/streaming.py walks the list and emits the appropriate RunItemStreamEvent onto the async queue【/cache/repos/github.com/openai/openai-agents-python/main/src/agents/run_internal/streaming.py#L27-L66】.
Key implementation details:
- Each RunItem subclass maps to a distinct name literal (e.g., ToolCallItem maps to "tool_called").
- Approval placeholders (ToolApprovalItem) are deliberately omitted from the stream; they surface later via result.interruptions.
- The queue is consumed by the public RunResultStreaming iterator.
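The mapping-and-skip behaviour described above can be sketched roughly as follows. This is not the SDK's stream_step_items_to_queue: Item, ItemEvent, push_items, and the "tool_approval_item" tag are assumptions made for illustration, and the name table is deliberately partial, listing only the two event names quoted in this article.

```python
import asyncio
from dataclasses import dataclass

# Partial, illustrative mapping from item type to event name. Only the two
# names quoted in the article appear here; the SDK defines more.
EVENT_NAMES = {
    "tool_call_item": "tool_called",
    "message_output_item": "message_output_created",
}


@dataclass
class Item:  # hypothetical stand-in for a RunItem
    type: str


@dataclass
class ItemEvent:  # hypothetical stand-in for RunItemStreamEvent
    name: str
    item: Item


def push_items(items: list[Item], queue: asyncio.Queue) -> None:
    """Emit one event per known item; skip approval placeholders."""
    for item in items:
        if item.type == "tool_approval_item":  # assumed type tag
            continue  # surfaces later as an interruption, not a stream event
        name = EVENT_NAMES.get(item.type)
        if name is not None:
            queue.put_nowait(ItemEvent(name=name, item=item))


def demo() -> list[str]:
    """Push three items and return the event names that reached the queue."""
    queue: asyncio.Queue = asyncio.Queue()
    push_items(
        [Item("tool_call_item"), Item("tool_approval_item"), Item("message_output_item")],
        queue,
    )
    names = []
    while not queue.empty():
        names.append(queue.get_nowait().name)
    return names
```

In this sketch the approval placeholder never reaches the queue, which mirrors why a consumer has to look at result.interruptions for pending approvals.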
Starting a Streamed Run
The class method Runner.run_streamed in src/agents/run.py is the public entry point【/cache/repos/github.com/openai/openai-agents-python/main/src/agents/run.py#L51-L67】. It delegates to the default AgentRunner, which builds a RunResultStreaming object that:
- Holds an asyncio.Queue[StreamEvent | QueueCompleteSentinel].
- Provides async def stream_events(self) -> AsyncIterator[StreamEvent] to iterate until a sentinel is received.
- Tracks interruptions separately via result.interruptions.
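The queue-plus-sentinel design in the list above can be illustrated with the standard library alone. The sketch below is an assumption-laden miniature, not the SDK's RunResultStreaming: MiniStreamingResult and _Done are hypothetical names, and the real class does considerably more (error propagation, cancellation, final output).

```python
import asyncio
from typing import Any, AsyncIterator


class _Done:
    """Hypothetical sentinel marking the end of the stream."""


class MiniStreamingResult:
    """Simplified stand-in for RunResultStreaming (not the SDK class)."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.interruptions: list[Any] = []  # tracked apart from the queue

    async def stream_events(self) -> AsyncIterator[Any]:
        # Yield queued events in order until the sentinel arrives.
        while True:
            event = await self.queue.get()
            if isinstance(event, _Done):
                break
            yield event


async def demo() -> list[str]:
    result = MiniStreamingResult()  # created inside the running loop

    async def producer() -> None:
        # Plays the role of the run loop: push events, then the sentinel.
        for event in ("first", "second"):
            await result.queue.put(event)
        await result.queue.put(_Done())

    task = asyncio.create_task(producer())
    seen = [event async for event in result.stream_events()]
    await task
    return seen
```

Running asyncio.run(demo()) drains the queue in insertion order and stops cleanly at the sentinel, so the consumer never has to poll or guess when the run has finished.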
Typical invocation pattern:
result = Runner.run_streamed(agent, prompt)
async for event in result.stream_events():
    ...  # handle event
Handling Streaming Events in Practice
Processing High-Level Run Items
The examples/basic/stream_items.py script demonstrates a minimal consumer that ignores raw deltas and prints high-level events【/cache/repos/github.com/openai/openai-agents-python/main/examples/basic/stream_items.py#L25-L42】.
import asyncio
from agents import Agent, ItemHelpers, Runner

async def main():
    # Placeholder agent; the example script defines its own agent and tools.
    agent = Agent(name="Assistant", instructions="You are a helpful assistant.")
    result = Runner.run_streamed(agent, input="Hello")
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            continue  # skip low-level deltas
        elif event.type == "agent_updated_stream_event":
            print(f"Agent updated: {event.new_agent.name}")
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                print(f"-- Tool was called: {getattr(event.item.raw_item, 'name', 'Unknown')}")
            elif event.item.type == "tool_call_output_item":
                print(f"-- Tool output: {event.item.output}")
            elif event.item.type == "message_output_item":
                print(f"-- Message output:\n {ItemHelpers.text_message_output(event.item)}")

asyncio.run(main())
Key takeaways:
- Use event.type to drive control flow.
- Use event.item.type to refine handling for RunItemStreamEvent.
- Use ItemHelpers.text_message_output to render messages.
Real-Time WebSocket Streaming with HITL
The examples/basic/stream_ws.py example shows a real-world scenario where raw reasoning and output deltas are displayed, tool calls are printed, and human-in-the-loop (HITL) approvals are processed via the interrupt loop【/cache/repos/github.com/openai/openai-agents-python/main/examples/basic/stream_ws.py#L90-L52】.
Key implementation sections:
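while True:
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            raw = event.data
            if raw.type == "response.reasoning_summary_text.delta":
                print(raw.delta, end="", flush=True)  # incremental reasoning text
            elif raw.type == "response.output_text.delta":
                print(raw.delta, end="", flush=True)  # incremental assistant output
            continue
        if event.type != "run_item_stream_event":
            continue
        item = event.item
        if item.type == "tool_call_item":
            # getattr with defaults, since not every raw tool item exposes both fields
            tool_name = getattr(item.raw_item, "name", "unknown")
            tool_args = getattr(item.raw_item, "arguments", "")
            print(f"[tool call] {tool_name}({tool_args})")
        elif item.type == "tool_call_output_item":
            print(f"[tool result] {item.output}")
    # once the stream ends, check result.interruptions before leaving the loop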
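When the deltas should be collected rather than printed, the same branching can feed two buffers. The sketch below assumes each delta event exposes its text fragment on a delta attribute; Delta and collect_text are hypothetical names used only for this illustration, standing in for the objects found on event.data.

```python
from dataclasses import dataclass


@dataclass
class Delta:  # hypothetical stand-in for a raw delta event (event.data)
    type: str
    delta: str


def collect_text(raw_events: list[Delta]) -> tuple[str, str]:
    """Join reasoning deltas and output deltas into two separate strings."""
    reasoning: list[str] = []
    output: list[str] = []
    for raw in raw_events:
        if raw.type == "response.reasoning_summary_text.delta":
            reasoning.append(raw.delta)
        elif raw.type == "response.output_text.delta":
            output.append(raw.delta)
        # any other raw event type is ignored in this sketch
    return "".join(reasoning), "".join(output)
```

Appending to lists and joining once at the end avoids repeated string concatenation, which matters when a response arrives as many small fragments.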
After the stream finishes, check result.interruptions. If any exist (e.g., a tool needs approval), convert the run into a resumable state (result.to_state()), ask the user for approval, call state.approve(interruption) (or reject), and restart the streamed run with the updated state:
state = result.to_state()
for interruption in result.interruptions:
    if ask_approval(...):
        state.approve(interruption)
    else:
        state.reject(interruption)
# ws is the WebSocket session helper used by the example
result = ws.run_streamed(agent, state)  # resume
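The control flow of that approve-and-resume cycle can be exercised without the SDK. Everything in the sketch below is a stand-in: FakeState, FakeResult, fake_run, and run_with_approvals are hypothetical, interruptions are plain strings, and a set of allowed tool names replaces the interactive approval prompt. Only the method names to_state, approve, and reject mirror the calls shown above.

```python
from dataclasses import dataclass, field


@dataclass
class FakeState:  # hypothetical stand-in for the object from result.to_state()
    approved: list[str] = field(default_factory=list)
    rejected: list[str] = field(default_factory=list)

    def approve(self, interruption: str) -> None:
        self.approved.append(interruption)

    def reject(self, interruption: str) -> None:
        self.rejected.append(interruption)


@dataclass
class FakeResult:  # hypothetical stand-in for RunResultStreaming
    interruptions: list[str]
    state: FakeState = field(default_factory=FakeState)

    def to_state(self) -> FakeState:
        return self.state


def fake_run(state: FakeState, pending: list[list[str]]) -> FakeResult:
    """Pretend to run one streamed turn; pops the next batch of interruptions."""
    batch = pending.pop(0) if pending else []
    return FakeResult(interruptions=batch, state=state)


def run_with_approvals(pending: list[list[str]], allowed: set[str]) -> FakeState:
    """Resume repeatedly until a turn finishes with no interruptions."""
    result = fake_run(FakeState(), pending)
    while result.interruptions:
        state = result.to_state()
        for interruption in result.interruptions:
            if interruption in allowed:
                state.approve(interruption)
            else:
                state.reject(interruption)
        result = fake_run(state, pending)  # resume with the updated state
    return result.to_state()
```

The loop condition is the important part: a resumed run can itself stop on new interruptions, so the check has to be repeated after every resume rather than performed once.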
Key Source Files
| File | Description |
|---|---|
| src/agents/stream_events.py | Definition of RawResponsesStreamEvent, RunItemStreamEvent, and AgentUpdatedStreamEvent【/cache/repos/github.com/openai/openai-agents-python/main/src/agents/stream_events.py#L10-L61】 |
| src/agents/run_internal/streaming.py | stream_step_items_to_queue and stream_step_result_to_queue, which push events onto the queue【/cache/repos/github.com/openai/openai-agents-python/main/src/agents/run_internal/streaming.py#L27-L66】 |
| src/agents/run.py | Public Runner.run_streamed method that returns RunResultStreaming【/cache/repos/github.com/openai/openai-agents-python/main/src/agents/run.py#L51-L67】 |
| src/agents/run_internal/run_loop.py | Where the queue is populated during a streamed turn (calls stream_step_items_to_queue)【/cache/repos/github.com/openai/openai-agents-python/main/src/agents/run_internal/run_loop.py#L77-L80】 |
| examples/basic/stream_items.py | Minimal consumer that prints tool calls and messages【/cache/repos/github.com/openai/openai-agents-python/main/examples/basic/stream_items.py#L25-L42】 |
| examples/basic/stream_ws.py | Full-featured WebSocket-style consumer with reasoning deltas and HITL approvals【/cache/repos/github.com/openai/openai-agents-python/main/examples/basic/stream_ws.py#L90-L52】 |
| src/agents/responses_websocket_session.py | Helper class for exposing streaming results over WebSockets (used by the stream_ws example) |
Common Pitfalls When Streaming
| Symptom | Cause | Fix |
|---|---|---|
| No events appear after the first raw_response_event. | The consumer exits the outer while True loop too early. | Break only once result.interruptions is empty or result.is_complete is True, following the pattern in stream_ws.py, where the outer loop continues while interruptions are pending. |
| Tool call events are missing. | The stream_step_items_to_queue function skips ToolApprovalItem (approval placeholders); these become interruptions, not streamed items. | Handle result.interruptions to surface the approval request, then resume the streamed run. |
| agent_updated_stream_event never fires. | Handoffs are only generated when an agent explicitly returns a NextStepHandoff. Verify your agent's handoff logic. | Register the target agents in the agent's handoffs list. Agent.as_tool runs the other agent as a tool call and does not hand off, so it will not trigger this event. |
Summary
- Streaming in openai-agents-python is built on typed events (RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent).
- The queueing layer converts each new RunItem into a RunItemStreamEvent while ignoring approval placeholders.
- The public API (Runner.run_streamed) returns a RunResultStreaming whose stream_events() async iterator yields events in order.
- Consumers can react to raw deltas, high-level items, and agent swaps, all illustrated in the stream_items.py and stream_ws.py examples.
- Interruptions (e.g., HITL approvals) are surfaced separately; after handling them, you resume by passing the state from result.to_state() back into a new streamed run.
Frequently Asked Questions
What is the difference between raw response events and run item events?
Raw response events (RawResponsesStreamEvent) contain low-level token deltas straight from the LLM, such as response.reasoning_summary_text.delta or response.output_text.delta. These are useful for displaying text as it is generated or for showing reasoning summaries live. Run item events (RunItemStreamEvent) wrap higher-level semantic objects such as ToolCallItem or MessageOutputItem that represent completed actions or state changes in the agent loop. Agent swaps are not run items; they arrive as the separate AgentUpdatedStreamEvent.
How do I handle human-in-the-loop approvals when streaming?
When a tool requires approval, the SDK does not emit it as a stream event; instead, it pauses the run and adds a ToolApprovalItem to result.interruptions. After the stream ends, check result.interruptions, convert the run to a resumable state with state = result.to_state(), then call state.approve(interruption) or state.reject(interruption) based on user input. Finally, resume by passing the updated state back into Runner.run_streamed(agent, state).
Why are some tool call events missing from the stream?
The internal function stream_step_items_to_queue in src/agents/run_internal/streaming.py deliberately skips ToolApprovalItem instances because these represent interruptions requiring external input rather than streamable progress. If you expect a tool call but do not see it in stream_events(), check result.interruptions instead. This design ensures that streaming consumers only receive definitive actions while pending approvals are handled separately.
How do I resume a streamed run after an interruption?
After the stream ends with pending interruptions, convert the interrupted run into a serializable state using state = result.to_state(). This state object captures the conversation history, pending tool calls, and agent context. Update the state by calling state.approve(interruption) or state.reject(interruption) for each interruption, then pass the modified state as the input to a new Runner.run_streamed(agent, state) call. The runner restores the context and continues processing from where it left off.