# How to Implement an OpenAI Stream for Real-Time Responses in Python

> Learn how to implement an OpenAI stream in Python for real-time responses. Set stream=True in your API calls to receive incremental model output.

- Repository: [OpenAI/openai-python](https://github.com/openai/openai-python)
- Tags: how-to-guide
- Published: 2026-02-18

---

**Set `stream=True` when calling any completion method to receive a `Stream` or `AsyncStream` object that yields typed response chunks incrementally as the model generates them.**

The `openai-python` SDK implements streaming in [`src/openai/_streaming.py`](https://github.com/openai/openai-python/blob/main/src/openai/_streaming.py), which consumes server-sent events (SSE) as they arrive on the HTTP response. The `Stream` and `AsyncStream` iterator classes yield each chunk as soon as it is decoded, so your application can display or process tokens immediately instead of waiting for the complete response payload. This guide explains how those classes work internally and shows the usage patterns for synchronous and asynchronous code.

## Understanding the Streaming Architecture

The SDK's streaming implementation centers on two generic iterator classes that handle the complete lifecycle of an SSE connection.

### The Stream and AsyncStream Classes

When you enable streaming, the SDK instantiates **`Stream[T]`** (synchronous) or **`AsyncStream[T]`** (asynchronous) as defined in [`src/openai/_streaming.py`](https://github.com/openai/openai-python/blob/main/src/openai/_streaming.py). Both implement Python's iteration protocols: `Stream` defines `__iter__` and `__next__`, and `AsyncStream` defines `__aiter__` and `__anext__`. `T` is the response model for the endpoint (for example, `Completion`).
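To make that protocol split concrete, here is a minimal sketch assuming a much simpler design than the SDK's: `MiniStream` and `MiniAsyncStream` are hypothetical names that wrap an already-parsed iterator rather than an HTTP response. It shows why `next()` and a `for` loop can be mixed on the same object: both draw from one shared underlying iterator.

```python
import asyncio
from typing import AsyncIterator, Generic, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


class MiniStream(Generic[T]):
    """Hypothetical stand-in for Stream[T]: wraps one shared iterator of parsed items."""

    def __init__(self, source: Iterator[T]) -> None:
        self._iterator = source

    def __next__(self) -> T:
        # next(stream) pulls exactly one item from the shared iterator
        return next(self._iterator)

    def __iter__(self) -> Iterator[T]:
        # A `for` loop resumes from wherever next() left off
        for item in self._iterator:
            yield item


class MiniAsyncStream(Generic[T]):
    """Hypothetical stand-in for AsyncStream[T]."""

    def __init__(self, source: AsyncIterator[T]) -> None:
        self._iterator = source

    async def __anext__(self) -> T:
        return await self._iterator.__anext__()

    async def __aiter__(self) -> AsyncIterator[T]:
        async for item in self._iterator:
            yield item


# Synchronous usage: one manual step, then a loop over the rest
sync_stream = MiniStream(iter([1, 2, 3]))
first = next(sync_stream)
rest = list(sync_stream)


async def _numbers() -> AsyncIterator[int]:
    for n in (1, 2, 3):
        yield n


async def _demo() -> Tuple[int, List[int]]:
    stream = MiniAsyncStream(_numbers())
    head = await stream.__anext__()
    tail = [item async for item in stream]
    return head, tail


async_result = asyncio.run(_demo())
```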

The iterator performs five operations internally:

1. **Creates an SSE decoder** via `client._make_sse_decoder()` to parse the raw bytes of the HTTP response
2. **Iterates over SSE events** using `self._decoder.iter_bytes(self.response.iter_bytes())`
3. **Detects the `[DONE]` sentinel** to terminate the stream gracefully
4. **Raises `APIError`** if an error payload arrives mid-stream, whether in a plain event or in a named event such as the Assistants API's `thread.*` events
5. **Transforms JSON payloads** into typed SDK models through `client._process_response_data`
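Steps 3 to 5 can be sketched as a plain generator over already-decoded events. This is a simplified illustration, not the SDK's code: `Event`, `StreamError`, and `process_events` are hypothetical stand-ins for `ServerSentEvent`, `APIError`, and the internal loop, and the `cast` callable plays the role of `_process_response_data`.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


@dataclass
class Event:
    """Hypothetical stand-in for an already-decoded ServerSentEvent."""

    data: str
    event: Optional[str] = None


class StreamError(Exception):
    """Hypothetical stand-in for openai.APIError."""


def process_events(events: Iterable[Event], cast: Callable[[Any], T]) -> Iterator[T]:
    for sse in events:
        # Step 3: stop at the [DONE] sentinel; nothing after it is read
        if sse.data.startswith("[DONE]"):
            break
        payload = json.loads(sse.data)
        # Step 4: surface an in-band error payload as an exception
        if isinstance(payload, dict) and payload.get("error"):
            error = payload["error"]
            message = error.get("message") if isinstance(error, dict) else None
            raise StreamError(message or "An error occurred during streaming")
        # Step 5: convert the decoded JSON into the caller's model type
        yield cast(payload)


events = [
    Event('{"text": "Hel"}'),
    Event('{"text": "lo"}'),
    Event("[DONE]"),
    Event('{"text": "never reached"}'),
]
texts: List[str] = list(process_events(events, cast=lambda d: d["text"]))

caught: Optional[str] = None
try:
    list(process_events([Event('{"error": {"message": "boom"}}')], cast=dict))
except StreamError as exc:
    caught = str(exc)
```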

### SSE Decoding and Resource Management

The **`SSEDecoder`** and **`SSEBytesDecoder`** classes, also in [`src/openai/_streaming.py`](https://github.com/openai/openai-python/blob/main/src/openai/_streaming.py), parse raw SSE protocol data into `ServerSentEvent` objects. Each event exposes `event`, `data`, `id`, and `retry` attributes, along with a `json()` helper method for payload deserialization.
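The SSE field rules can be sketched in a few lines. This is not the SDK's `SSEDecoder`, which works on raw bytes and handles line splitting itself; it is a simplified, hypothetical `decode_sse` that takes already-split text lines and follows the server-sent events rules: `data` lines accumulate and are joined with `\n`, a blank line dispatches the event, lines starting with `:` are comments, and the last event ID carries over between events.

```python
import json
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, List, Optional


@dataclass
class SketchEvent:
    """Hypothetical mirror of ServerSentEvent's four fields."""

    event: Optional[str] = None
    data: str = ""
    id: Optional[str] = None
    retry: Optional[int] = None

    def json(self) -> Any:
        return json.loads(self.data)


def decode_sse(lines: Iterable[str]) -> Iterator[SketchEvent]:
    event: Optional[str] = None
    data: List[str] = []
    last_id: Optional[str] = None  # the last event ID persists across events
    retry: Optional[int] = None
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line dispatches whatever fields have accumulated
            if event is not None or data or last_id is not None or retry is not None:
                yield SketchEvent(event=event, data="\n".join(data), id=last_id, retry=retry)
            event, data, retry = None, [], None
            continue
        if line.startswith(":"):
            continue  # comment / keep-alive line
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # drop the single optional space after the colon
        if name == "event":
            event = value
        elif name == "data":
            data.append(value)  # multiple data lines are joined with "\n" on dispatch
        elif name == "id" and "\0" not in value:
            last_id = value
        elif name == "retry" and value.isdigit():
            retry = int(value)
        # Any other field name is ignored


raw_stream = [
    "event: message\n",
    'data: {"text": "first line"}\n',
    "\n",
    ": keep-alive\n",
    "data: line one\n",
    "data: line two\n",
    "\n",
    "data: [DONE]\n",
    "\n",
]
decoded = list(decode_sse(raw_stream))
```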

Resource cleanup is handled by `finally` blocks in the same file, which close the underlying HTTP response when iteration finishes or raises an exception: `response.close()` for synchronous streams and `await response.aclose()` for asynchronous streams.
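The cleanup guarantee rests on ordinary generator semantics: a `finally` block inside a generator runs when the generator is exhausted, raises, or is explicitly closed, but not while it is merely suspended. A minimal sketch, assuming a hypothetical `FakeResponse` in place of the real HTTP response object:

```python
from typing import Iterator, List


class FakeResponse:
    """Hypothetical stand-in for an HTTP response that must be closed."""

    def __init__(self, chunks: List[str]) -> None:
        self.chunks = chunks
        self.closed = False

    def iter_text(self) -> Iterator[str]:
        yield from self.chunks

    def close(self) -> None:
        self.closed = True


def stream_chunks(response: FakeResponse) -> Iterator[str]:
    try:
        for chunk in response.iter_text():
            yield chunk
    finally:
        # Runs when the generator is exhausted, raises, or is closed early
        response.close()


# Exhausting the generator runs the finally block
full = FakeResponse(["a", "b"])
consumed = list(stream_chunks(full))

# A suspended generator keeps the response open until it is closed
partial = FakeResponse(["a", "b", "c"])
gen = stream_chunks(partial)
next(gen)
open_while_suspended = not partial.closed
gen.close()  # raises GeneratorExit at the paused yield, so finally runs
```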

## Implementing Synchronous OpenAI Streams

For blocking applications, use the standard `OpenAI` client with `stream=True`. The method returns a `Stream[Completion]` object that you can advance manually with `next()` or consume with a `for` loop.

```python
from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from the environment

response = client.completions.create(
    model="gpt-3.5-turbo-instruct",
    prompt="Explain streaming in one sentence.",
    max_tokens=50,
    temperature=0,
    stream=True,  # Return a Stream[Completion] instead of a single Completion
)

# Manual iteration for custom first-chunk handling
# (next() raises StopIteration if the stream is empty)
first_chunk = next(response)
print("First chunk:", first_chunk.to_json())

# The loop resumes after the first chunk and stops at the [DONE] sentinel
for chunk in response:
    print(chunk.to_json())
```

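Each `chunk` is a typed `Completion` object that carries only the text generated since the previous chunk (in `chunk.choices[0].text`), not the accumulated output; concatenate the fragments if you need the full response.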
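Because each chunk holds only a fragment, rebuilding the full answer means concatenating them. The helper below is a sketch that relies only on the `chunk.choices[0].text` attribute path; `collect_text` and `fake_chunk` are hypothetical names, and the fake chunks let it run offline.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


def collect_text(chunks: Iterable[Any]) -> str:
    """Join the text fragments of a legacy Completions stream (first choice only)."""
    parts: List[str] = []
    for chunk in chunks:
        # Each chunk carries only its new fragment, so keep appending
        if chunk.choices:
            parts.append(chunk.choices[0].text or "")
    return "".join(parts)


def fake_chunk(text: str) -> SimpleNamespace:
    # Hypothetical offline stand-in with the same chunk.choices[0].text shape
    return SimpleNamespace(choices=[SimpleNamespace(text=text)])


fake_stream = [fake_chunk("Streaming "), fake_chunk("sends "), fake_chunk("tokens early.")]
full_text = collect_text(fake_stream)
```

Against the real API you would pass a fresh `Stream` directly, for example `collect_text(response)`. The helper assumes one choice per request; with `n > 1`, group fragments by `choice.index` instead.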

## Implementing Asynchronous OpenAI Streams

For non-blocking applications, the `AsyncOpenAI` client returns an `AsyncStream[Completion]` that supports `async for` loops or manual async iteration.

```python
import asyncio
from openai import AsyncOpenAI


async def run():
    client = AsyncOpenAI()
    response = await client.completions.create(
        model="gpt-3.5-turbo-instruct",
        prompt="Explain streaming in one sentence.",
        max_tokens=50,
        temperature=0,
        stream=True,  # Return an AsyncStream[Completion]
    )

    # Manual async iteration (on Python 3.10+: await anext(response))
    first_chunk = await response.__anext__()
    print("First async chunk:", first_chunk.to_json())

    # Automatic async iteration until the [DONE] sentinel
    async for chunk in response:
        print(chunk.to_json())


asyncio.run(run())
```
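The practical payoff of `AsyncStream` is that several streams can be consumed on one event loop at the same time. A minimal sketch, assuming offline stand-ins: `collect_text_async` and `fake_async_stream` are hypothetical, and the fake stream only mimics the `chunk.choices[0].text` shape so the example runs without an API key. With the real client, you would pass the `AsyncStream` objects returned by awaiting `client.completions.create(..., stream=True)`.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterable, AsyncIterator, List


async def collect_text_async(chunks: AsyncIterable[Any]) -> str:
    """Async counterpart of joining per-chunk text fragments."""
    parts: List[str] = []
    async for chunk in chunks:
        if chunk.choices:
            parts.append(chunk.choices[0].text or "")
    return "".join(parts)


async def fake_async_stream(words: List[str], delay: float) -> AsyncIterator[Any]:
    # Hypothetical stand-in for AsyncStream[Completion]; sleeps to mimic network latency
    for word in words:
        await asyncio.sleep(delay)
        yield SimpleNamespace(choices=[SimpleNamespace(text=word)])


async def main() -> List[str]:
    # gather() drives both streams concurrently and returns results in argument order
    return list(
        await asyncio.gather(
            collect_text_async(fake_async_stream(["a", "b"], 0.01)),
            collect_text_async(fake_async_stream(["x", "y", "z"], 0.005)),
        )
    )


results = asyncio.run(main())
```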

## Error Handling and Edge Cases

Wrap your iteration loops in `try/except` blocks to catch **`APIError`** exceptions defined in [`src/openai/_exceptions.py`](https://github.com/openai/openai-python/blob/main/src/openai/_exceptions.py). The SDK raises these when the server sends an error payload inside the stream, including in Assistants API streams that use named `thread.*` events.
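A sketch of that pattern that keeps whatever text arrived before the failure. `StandInAPIError`, `consume_with_recovery`, and `failing_stream` are hypothetical; with the real SDK you would pass `error_type=openai.APIError` and feed in text fragments such as `(c.choices[0].text for c in response)`.

```python
from typing import Iterable, Iterator, List, Optional, Tuple, Type


class StandInAPIError(Exception):
    """Hypothetical stand-in for openai.APIError so the sketch runs offline."""


def consume_with_recovery(
    fragments: Iterable[str], error_type: Type[BaseException]
) -> Tuple[str, Optional[str]]:
    """Return (text received so far, error message or None)."""
    received: List[str] = []
    try:
        for piece in fragments:
            received.append(piece)
    except error_type as exc:
        # Keep the partial output instead of discarding it
        return "".join(received), str(exc)
    return "".join(received), None


def failing_stream() -> Iterator[str]:
    # Simulates a stream that fails after two fragments
    yield "partial "
    yield "answer"
    raise StandInAPIError("stream interrupted by server")


text, error = consume_with_recovery(failing_stream(), StandInAPIError)
clean_text, clean_error = consume_with_recovery(iter(["ok"]), StandInAPIError)
```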

For Assistants-specific implementations, inspect the `event` field on each yielded item to handle lifecycle events such as `thread.run.created`. The SDK closes the connection when the stream is exhausted or raises, but a bare `break` only stops your loop; if your application stops reading early, call `response.close()` (awaited for an `AsyncStream`) or open the stream with `with`/`async with` so the HTTP connection is released immediately.
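One way to structure the Assistants case is a handler table keyed by event name. In this sketch, `dispatch_events` is a hypothetical helper, and the `SimpleNamespace` items only imitate the `event`/`data` attribute pair; in real Assistants streams, `data` holds typed objects such as runs and message deltas rather than strings.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, Iterable, List


def dispatch_events(events: Iterable[Any], handlers: Dict[str, Callable[[Any], None]]) -> List[str]:
    """Route each item to the handler registered for its `event` name.

    Returns the names of events that had no handler, so nothing is silently dropped.
    """
    unhandled: List[str] = []
    for item in events:
        handler = handlers.get(item.event)
        if handler is None:
            unhandled.append(item.event)
        else:
            handler(item.data)
    return unhandled


log: List[str] = []
handlers: Dict[str, Callable[[Any], None]] = {
    "thread.run.created": lambda data: log.append(f"started {data}"),
    "thread.message.delta": lambda data: log.append(data),
    "thread.run.completed": lambda data: log.append(f"finished {data}"),
}
sample_events = [
    SimpleNamespace(event="thread.run.created", data="run_1"),
    SimpleNamespace(event="thread.message.delta", data="Hel"),
    SimpleNamespace(event="thread.message.delta", data="lo"),
    SimpleNamespace(event="thread.run.step.created", data="step_1"),
    SimpleNamespace(event="thread.run.completed", data="run_1"),
]
skipped = dispatch_events(sample_events, handlers)
```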

## Summary

- **`Stream[T]` and `AsyncStream[T]`** in [`src/openai/_streaming.py`](https://github.com/openai/openai-python/blob/main/src/openai/_streaming.py) provide the core iterator implementation for synchronous and asynchronous openai stream processing.
- **SSE decoding** happens transparently through `SSEDecoder`, which parses server-sent events into `ServerSentEvent` objects; the stream iterator then stops at the `[DONE]` sentinel.
- **Resource safety** is provided by `finally` blocks in `_streaming.py` that close HTTP responses automatically when iteration completes or raises an exception; call `close()` or use a `with` block when you stop early.
- **Usage pattern** requires only setting `stream=True` and iterating over the returned object with standard `for` or `async for` loops.

## Frequently Asked Questions

### How does the SDK parse raw SSE bytes into Python objects?

The `SSEDecoder` class in [`src/openai/_streaming.py`](https://github.com/openai/openai-python/blob/main/src/openai/_streaming.py) aggregates multi-line `data` fields according to the SSE specification, respecting `event`, `id`, and `retry` fields. It yields `ServerSentEvent` objects, which the stream then passes to `client._process_response_data` to deserialize into typed models like `Completion`.

### What happens when the server returns an error mid-stream?

The iterator detects error events in the SSE payload and raises an **`APIError`** from [`src/openai/_exceptions.py`](https://github.com/openai/openai-python/blob/main/src/openai/_exceptions.py) immediately, halting the stream. You should wrap your iteration logic in `try/except APIError` blocks to handle these failures gracefully without crashing your application.

### Can I manually control iteration instead of using a for loop?

Yes. Both `Stream` and `AsyncStream` support manual iteration. Use `next(response)` for synchronous streams, or `await response.__anext__()` (`await anext(response)` on Python 3.10+) for asynchronous streams. This pattern is useful when you need special handling for the first chunk or conditional termination logic.

### How do I ensure connections close properly when streaming?

The SDK closes the connection automatically through `finally` blocks in [`src/openai/_streaming.py`](https://github.com/openai/openai-python/blob/main/src/openai/_streaming.py), which call `response.close()` or `await response.aclose()` when the stream is fully consumed or raises an exception. If you stop reading early, do not rely on `break` alone: call the stream's `close()` method (`await response.close()` for an `AsyncStream`), or create the stream inside a `with` (or `async with`) block, so the connection is released immediately.
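The difference between `break` and an explicit close can be shown with a hypothetical `SketchStream`, assuming a stream object that keeps one shared generator internally and exposes `close()` plus context-manager support. It illustrates the Python semantics involved, not the SDK's implementation.

```python
from typing import Any, Iterator, List


class SketchStream:
    """Hypothetical stand-in shaped like a stream: iterable, closable, usable with `with`."""

    def __init__(self, items: List[str]) -> None:
        self.closed = False
        self._iterator = self._generate(items)

    def _generate(self, items: List[str]) -> Iterator[str]:
        try:
            yield from items
        finally:
            self.closed = True  # stands in for response.close()

    def __iter__(self) -> Iterator[str]:
        # Each `for` loop wraps the one shared generator
        for item in self._iterator:
            yield item

    def close(self) -> None:
        self._iterator.close()

    def __enter__(self) -> "SketchStream":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()


# 1) `break` alone: the shared generator is merely suspended, so cleanup has not run
plain = SketchStream(["a", "b", "c"])
for item in plain:
    if item == "b":
        break
closed_after_break = plain.closed
plain.close()  # an explicit close releases it
closed_after_explicit_close = plain.closed

# 2) `with` closes on exit, even though the loop stopped early
with SketchStream(["a", "b", "c"]) as managed:
    for item in managed:
        if item == "b":
            break
closed_after_with = managed.closed
```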