# How to Stream Agent Responses and Handle Streaming Events in openai-agents-python

> Learn to stream agent responses and handle streaming events with openai-agents-python. Use Runner.run_streamed() to access typed events like LLM deltas, tool calls, and agent handoffs.

- Repository: [OpenAI/openai-agents-python](https://github.com/openai/openai-agents-python)
- Tags: how-to-guide
- Published: 2026-04-17

---

**To stream agent responses and handle streaming events in openai-agents-python, use `Runner.run_streamed()` to obtain a `RunResultStreaming` object, then iterate over `result.stream_events()` to receive typed events including raw LLM deltas, tool calls, and agent handoffs.**

The **openai-agents-python** SDK can stream agent responses and events as they occur. Instead of waiting for the full result, streaming mode delivers incremental events (reasoning deltas, tool invocations, handoff notifications) while the run is still in progress. This guide walks through the three-layer architecture, the event types, and practical implementation patterns, with references to the official source code.

## The Three-Layer Streaming Architecture

Streaming in openai-agents-python is built on three distinct layers that convert raw LLM output into consumable events:

| Layer | Responsibility | Core File |
|-------|---------------|-----------|
| **Event definitions** | Typed dataclasses representing raw LLM deltas, run-item updates, and agent switches | [`src/agents/stream_events.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/stream_events.py) |
| **Queueing logic** | Converts newly created `RunItem` objects into `StreamEvent` instances and pushes them onto an async queue | [`src/agents/run_internal/streaming.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/run_internal/streaming.py) |
| **Runner entry point** | Starts a run, creates the streaming queue, and returns a `RunResultStreaming` that exposes `async for event in result.stream_events()` | [`src/agents/run.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/run.py) (method `Runner.run_streamed`) |

The consumer simply iterates over `result.stream_events()` and reacts to the `type` field of each event.

## Streaming Event Types

All streaming events are members of the `StreamEvent` union defined in **[`src/agents/stream_events.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/stream_events.py)**.

The three primary event types are:

- **`RawResponsesStreamEvent`** — Direct delta payloads from the LLM (e.g., `response.reasoning_summary_text.delta`). Key field: `data: TResponseStreamEvent`.
- **`RunItemStreamEvent`** — High-level items derived from the LLM response, including messages, tool calls, handoffs, and reasoning items. Key fields: `name` (e.g., `"message_output_created"`, `"tool_called"`), `item: RunItem`.
- **`AgentUpdatedStreamEvent`** — Indicates the active agent has been swapped via a handoff. Key field: `new_agent: Agent[Any]`.

Pattern matching is straightforward using the `type` attribute:

```python
if event.type == "raw_response_event":
    ...  # low-level token/character deltas
elif event.type == "run_item_stream_event":
    ...  # higher-level semantic items
elif event.type == "agent_updated_stream_event":
    ...  # a handoff switched the active agent
```
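The same dispatch can be tried without calling a model. The sketch below uses three stand-in dataclasses (`FakeRawEvent`, `FakeRunItemEvent`, `FakeAgentUpdatedEvent`) and a `describe` helper; all of these names are hypothetical and not part of the SDK. They model only the `type` discriminator described above, so treat this as an illustration of the branching, not of the real event classes.

```python
from dataclasses import dataclass
from typing import Literal, Union


# Stand-ins for the SDK's event classes (hypothetical, for illustration only).
@dataclass
class FakeRawEvent:
    data: str
    type: Literal["raw_response_event"] = "raw_response_event"


@dataclass
class FakeRunItemEvent:
    name: str
    type: Literal["run_item_stream_event"] = "run_item_stream_event"


@dataclass
class FakeAgentUpdatedEvent:
    new_agent_name: str
    type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"


FakeStreamEvent = Union[FakeRawEvent, FakeRunItemEvent, FakeAgentUpdatedEvent]


def describe(event: FakeStreamEvent) -> str:
    """Return a one-line label for an event, branching on its `type` field."""
    if event.type == "raw_response_event":
        return f"delta: {event.data}"
    if event.type == "run_item_stream_event":
        return f"item: {event.name}"
    if event.type == "agent_updated_stream_event":
        return f"agent: {event.new_agent_name}"
    return "unknown"
```

Because every branch keys off a string literal, a type checker can narrow the union inside each branch, which is the main reason to dispatch on `type` rather than on `isinstance` checks.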

## Queueing Logic and Event Generation

When the internal turn-resolution logic creates new `RunItem` objects (message outputs, tool calls, etc.), the streaming helper `stream_step_items_to_queue` in **[`src/agents/run_internal/streaming.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/run_internal/streaming.py)** walks the list and emits the appropriate `RunItemStreamEvent` onto the async queue.

Key implementation details:

- Each `RunItem` subclass maps to a distinct `name` literal (e.g., `ToolCallItem` maps to `"tool_called"`).
- **Approval placeholders** (`ToolApprovalItem`) are deliberately omitted from the stream—they surface later via `result.interruptions`.
- The queue is consumed by the public `RunResultStreaming` iterator.
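The mapping-and-skip behavior in the list above can be sketched roughly as follows. This is not the SDK's implementation: `FakeItem`, `ITEM_TYPE_TO_EVENT_NAME`, and `enqueue_items` are hypothetical names, the mapping contains only the two event names mentioned in this guide, and the `"tool_approval_item"` type string is an assumption about how approval placeholders are tagged.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeItem:
    """Stand-in for a RunItem; only the `type` discriminator is modeled."""
    type: str


# Illustrative mapping from item type to event name. Only the two entries
# named in this guide are listed; the SDK's real table is larger.
ITEM_TYPE_TO_EVENT_NAME = {
    "message_output_item": "message_output_created",
    "tool_call_item": "tool_called",
}


def enqueue_items(items, queue):
    """Push one (event_name, item) pair per streamable item; return the count."""
    pushed = 0
    for item in items:
        if item.type == "tool_approval_item":  # assumed type string
            # Approval placeholders are not streamed; they become interruptions.
            continue
        name = ITEM_TYPE_TO_EVENT_NAME.get(item.type)
        if name is None:
            continue  # Unknown item types are ignored in this sketch.
        queue.put_nowait((name, item))
        pushed += 1
    return pushed
```

Passing a tool call, an approval placeholder, and a message output through `enqueue_items` leaves two entries on the queue, in the original order, with the placeholder dropped.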

## Starting a Streamed Run

The class method `Runner.run_streamed` in **[`src/agents/run.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/run.py)** is the public entry point. It delegates to the default `AgentRunner`, which builds a `RunResultStreaming` object that:

- Holds an `asyncio.Queue[StreamEvent | QueueCompleteSentinel]`.
- Provides `async def stream_events(self) -> AsyncIterator[StreamEvent]` to iterate until a sentinel is received.
- Tracks interruptions separately via `result.interruptions`.
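The queue-plus-sentinel design described above is a common asyncio producer/consumer pattern. The following is a minimal sketch of that pattern, assuming a simplified stand-in class; `FakeStreamingResult`, `QueueDone`, `produce`, and `collect` are hypothetical names and do not mirror the SDK's internals.

```python
import asyncio
from typing import AsyncIterator


class QueueDone:
    """Sentinel marking the end of the stream (stand-in for the SDK's sentinel)."""


class FakeStreamingResult:
    """Hypothetical, simplified stand-in for RunResultStreaming."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def produce(self, events: list) -> None:
        # Run-loop side: push events, then signal completion.
        for event in events:
            await self._queue.put(event)
        await self._queue.put(QueueDone())

    async def stream_events(self) -> AsyncIterator[str]:
        # Consumer side: yield until the sentinel arrives.
        while True:
            event = await self._queue.get()
            if isinstance(event, QueueDone):
                break
            yield event


async def collect(events: list) -> list:
    """Run producer and consumer concurrently and return what was received."""
    result = FakeStreamingResult()
    producer = asyncio.create_task(result.produce(events))
    received = [event async for event in result.stream_events()]
    await producer
    return received
```

The sentinel lets the consumer's `async for` terminate cleanly without polling a separate "done" flag, and events arrive in the order they were queued.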

Typical invocation pattern:

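```python
from agents import Runner

# `agent` and `prompt` are defined elsewhere; run this inside an async function.
result = Runner.run_streamed(agent, prompt)
async for event in result.stream_events():
    ...  # handle event
```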

## Handling Streaming Events in Practice

### Processing High-Level Run Items

The **[`examples/basic/stream_items.py`](https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_items.py)** script demonstrates a minimal consumer that ignores raw deltas and prints high-level events.

```python
from agents import ItemHelpers, Runner

# `agent` is an Agent defined elsewhere; run this inside an async function.
result = Runner.run_streamed(agent, input="Hello")
async for event in result.stream_events():
    if event.type == "raw_response_event":
        continue  # skip low-level deltas
    elif event.type == "agent_updated_stream_event":
        print(f"Agent updated: {event.new_agent.name}")
    elif event.type == "run_item_stream_event":
        if event.item.type == "tool_call_item":
            print(f"-- Tool was called: {getattr(event.item.raw_item, 'name', 'Unknown')}")
        elif event.item.type == "tool_call_output_item":
            print(f"-- Tool output: {event.item.output}")
        elif event.item.type == "message_output_item":
            print(f"-- Message output:\n {ItemHelpers.text_message_output(event.item)}")
```

Key takeaways:

- Use `event.type` to drive control flow.
- Use `event.item.type` to refine handling for `RunItemStreamEvent`.
- Use `ItemHelpers.text_message_output` to render messages.

### Real-Time WebSocket Streaming with HITL

The **[`examples/basic/stream_ws.py`](https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_ws.py)** example shows a real-world scenario where raw reasoning and output deltas are displayed, tool calls are printed, and human-in-the-loop (HITL) approvals are processed via the interrupt loop.

Key implementation sections:

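```python
while True:
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            raw = event.data
            if raw.type == "response.reasoning_summary_text.delta":
                print(raw.delta, end="", flush=True)  # incremental reasoning text
            elif raw.type == "response.output_text.delta":
                print(raw.delta, end="", flush=True)  # incremental assistant output
            continue

        if event.type != "run_item_stream_event":
            continue

        item = event.item
        if item.type == "tool_call_item":
            # Fields on raw_item vary by tool type, so read them defensively.
            tool_name = getattr(item.raw_item, "name", "unknown")
            tool_args = getattr(item.raw_item, "arguments", "")
            print(f"[tool call] {tool_name}({tool_args})")
        elif item.type == "tool_call_output_item":
            print(f"[tool result] {item.output}")

    if not result.interruptions:
        break  # the run finished with nothing awaiting approval
    # otherwise: approve or reject each interruption, then resume the run
```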

After the stream finishes, check `result.interruptions`. If any exist (e.g., a tool needs approval), convert the run into a resumable state (`result.to_state()`), ask the user for approval, call `state.approve(interruption)` (or `reject`), and restart the streamed run with the updated state:

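```python
state = result.to_state()
for interruption in result.interruptions:
    # `ask_approval` stands for your own prompt helper; its arguments are elided.
    if ask_approval(...):
        state.approve(interruption)
    else:
        state.reject(interruption)

# `ws` is the session object the example uses to start streamed runs.
result = ws.run_streamed(agent, state)  # resume from the updated state
```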
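To see the approve/reject/resume cycle end to end without a model or a WebSocket, here is a minimal sketch built on stand-ins. `FakeState`, `FakeResult`, `fake_run`, and `run_until_done` are hypothetical names, and streaming itself is left out so that only the control flow is visible: the loop keeps resuming until no interruptions remain.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class FakeState:
    """Hypothetical stand-in for the resumable run state."""
    decisions: dict = field(default_factory=dict)

    def approve(self, interruption: str) -> None:
        self.decisions[interruption] = True

    def reject(self, interruption: str) -> None:
        self.decisions[interruption] = False


@dataclass
class FakeResult:
    """Hypothetical stand-in for a finished (possibly interrupted) run."""
    interruptions: list
    decisions: dict = field(default_factory=dict)

    def to_state(self) -> FakeState:
        return FakeState(decisions=dict(self.decisions))


def fake_run(tools: list, state: Optional[FakeState] = None) -> FakeResult:
    """Pretend run: any tool without a recorded decision interrupts the run."""
    decisions = state.decisions if state is not None else {}
    waiting = [tool for tool in tools if tool not in decisions]
    return FakeResult(interruptions=waiting, decisions=dict(decisions))


def run_until_done(tools: list, ask_approval: Callable[[str], bool]) -> dict:
    """Outer loop: resume until no interruptions remain; return the decisions."""
    result = fake_run(tools)
    while result.interruptions:
        state = result.to_state()
        for interruption in result.interruptions:
            if ask_approval(interruption):
                state.approve(interruption)
            else:
                state.reject(interruption)
        result = fake_run(tools, state)  # resume with the updated state
    return result.decisions
```

For example, `run_until_done(["delete_file", "send_email"], lambda name: name == "send_email")` records a rejection for `delete_file` and an approval for `send_email`, then exits because the resumed run reports no further interruptions.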

## Key Source Files

| File | Description |
|------|-------------|
| [`src/agents/stream_events.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/stream_events.py) | Definition of `RawResponsesStreamEvent`, `RunItemStreamEvent`, and `AgentUpdatedStreamEvent` |
| [`src/agents/run_internal/streaming.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/run_internal/streaming.py) | `stream_step_items_to_queue` and `stream_step_result_to_queue` which push events onto the queue |
| [`src/agents/run.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/run.py) | Public `Runner.run_streamed` method that returns `RunResultStreaming` |
| [`src/agents/run_internal/run_loop.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/run_internal/run_loop.py) | Where the queue is populated during a streamed turn (calls `stream_step_items_to_queue`) |
| [`examples/basic/stream_items.py`](https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_items.py) | Minimal consumer that prints tool calls and messages |
| [`examples/basic/stream_ws.py`](https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_ws.py) | Full-featured WebSocket-style consumer with reasoning deltas and HITL approvals |
| [`src/agents/responses_websocket_session.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/responses_websocket_session.py) | Helper class for exposing streaming results over WebSockets (used by the `stream_ws` example) |

## Common Pitfalls When Streaming

| Symptom | Cause | Fix |
|---------|-------|-----|
| No events appear after the first `raw_response_event`. | The consumer leaves the outer `while True` loop before the run has finished. | Break only once `result.interruptions` is empty or `result.is_complete` is `True`. Follow the pattern in [`stream_ws.py`](https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_ws.py), where the outer loop continues while interruptions are pending. |
| Tool call events are missing. | The `stream_step_items_to_queue` function skips `ToolApprovalItem` (approval placeholders) – these become interruptions, not streamed items. | Handle `result.interruptions` to surface the approval request, then resume the streamed run. |
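| `agent_updated_stream_event` never fires. | The event is emitted when a handoff switches the active agent (internally, when a turn resolves to `NextStepHandoff`). If no handoff is configured, or the model never invokes one, nothing fires. | Register the target agents in the agent's `handoffs` list and verify the handoff logic. Note that `Agent.as_tool` runs the other agent as a tool call and does not switch the active agent. |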

## Summary

- **Streaming** in openai-agents-python is built on typed events (`RawResponsesStreamEvent`, `RunItemStreamEvent`, `AgentUpdatedStreamEvent`).
- The **queueing layer** converts each new `RunItem` into a `RunItemStreamEvent` while ignoring approval placeholders.
- The **public API** (`Runner.run_streamed`) returns a `RunResultStreaming` whose `stream_events()` async iterator yields events in order.
- Consumers can react to raw deltas, high-level items, and agent swaps, all illustrated in the [`stream_items.py`](https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_items.py) and [`stream_ws.py`](https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_ws.py) examples.
- Interruptions (e.g., HITL approvals) are surfaced separately; after handling them, you resume by passing the state obtained from `result.to_state()` into a new streamed run.

## Frequently Asked Questions

### What is the difference between raw response events and run item events?

**Raw response events** (`RawResponsesStreamEvent`) contain low-level token deltas straight from the LLM, such as `response.reasoning_summary_text.delta` or `response.output_text.delta`. These are useful for displaying text as it is generated or for showing reasoning summaries live. **Run item events** (`RunItemStreamEvent`) wrap higher-level semantic objects such as `ToolCallItem` or `MessageOutputItem` that represent completed actions in the agent loop. Agent switches are reported through a third, separate event type, `AgentUpdatedStreamEvent`.
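To make the difference concrete, the sketch below rebuilds the assistant's text from raw deltas, which is the work a consumer takes on when it handles raw events directly instead of waiting for the completed message item. `FakeDelta` and `join_output_text` are hypothetical names; the stand-in models only a `type` string and a `delta` text fragment.

```python
from dataclasses import dataclass


@dataclass
class FakeDelta:
    """Stand-in for a raw payload: `type` plus the incremental `delta` text."""
    type: str
    delta: str


def join_output_text(raw_events) -> str:
    """Concatenate only the assistant-output deltas, ignoring reasoning deltas."""
    return "".join(
        raw.delta
        for raw in raw_events
        if raw.type == "response.output_text.delta"
    )
```

A run item event for the finished message carries the whole text at once, so consumers that only need the final output can skip this accumulation entirely.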

### How do I handle human-in-the-loop approvals when streaming?

When a tool requires approval, the SDK does not emit it as a stream event; instead, it pauses the run and adds a `ToolApprovalItem` to `result.interruptions`. After the stream ends, check `result.interruptions`, convert the run to a resumable state with `state = result.to_state()`, then call `state.approve(interruption)` or `state.reject(interruption)` based on user input. Finally, resume by passing the updated state back into `Runner.run_streamed(agent, state)`.

### Why are some tool call events missing from the stream?

The internal function `stream_step_items_to_queue` in [`src/agents/run_internal/streaming.py`](https://github.com/openai/openai-agents-python/blob/main/src/agents/run_internal/streaming.py) deliberately skips `ToolApprovalItem` instances because these represent interruptions requiring external input rather than streamable progress. If you expect a tool call but do not see it in `stream_events()`, check `result.interruptions` instead. This design ensures that streaming consumers only receive definitive actions while pending approvals are handled separately.

### How do I resume a streamed run after an interruption?

After the stream ends with pending interruptions, convert the interrupted run into a serializable state using `state = result.to_state()`. This state object captures the conversation history, pending tool calls, and agent context. Update the state by calling `state.approve(interruption)` or `state.reject(interruption)` for each interruption, then pass the modified state as the input to a new `Runner.run_streamed(agent, state)` call. The runner will automatically restore context and continue processing from where it left off.