How to Implement an OpenAI Stream for Real-Time Responses in Python

Set stream=True when calling any completion method to receive a Stream or AsyncStream object that yields typed response chunks incrementally as the model generates them.

The openai-python SDK implements streaming in src/openai/_streaming.py, which decodes server-sent events (SSE) as they arrive on the HTTP response. Because the Stream and AsyncStream classes are iterators, your application can handle each chunk as soon as the server sends it instead of waiting for the complete response payload. This guide walks through those internals and the usage patterns that follow from them.

Understanding the Streaming Architecture

The SDK's streaming implementation centers on two generic iterator classes that handle the complete lifecycle of an SSE connection.

The Stream and AsyncStream Classes

When you enable streaming, the SDK returns an instance of Stream[T] (synchronous) or AsyncStream[T] (asynchronous), both defined in src/openai/_streaming.py, where T is the model for each chunk (e.g., Completion for client.completions.create). Stream implements the synchronous iterator protocol through __iter__ and __next__; AsyncStream implements the asynchronous one through __aiter__ and __anext__.
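The key design point is that one internal generator backs both manual next() calls and for loops, so the two styles share position in the stream. The following is a minimal sketch of that shape, not the SDK's code: MiniStream and _produce are hypothetical names, and the generator re-yields plain strings instead of decoding SSE.

```python
from typing import Generic, Iterable, Iterator, TypeVar

T = TypeVar("T")


class MiniStream(Generic[T]):
    """Hypothetical stand-in for Stream[T], not the SDK class itself."""

    def __init__(self, items: Iterable[T]) -> None:
        # One shared generator backs both next() and for-loops.
        self._iterator: Iterator[T] = self._produce(items)

    def _produce(self, items: Iterable[T]) -> Iterator[T]:
        # The real class decodes SSE events here; this sketch re-yields items.
        for item in items:
            yield item

    def __next__(self) -> T:
        return next(self._iterator)

    def __iter__(self) -> Iterator[T]:
        # Iterate the shared generator, so a for-loop resumes where
        # earlier next() calls stopped.
        for item in self._iterator:
            yield item


stream = MiniStream(["Hel", "lo", "!"])
first = next(stream)  # manual iteration
rest = list(stream)   # for-style iteration picks up from the second item
```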

The iterator performs five critical operations internally:

  1. Creates an SSE decoder via client._make_sse_decoder() to parse the raw bytes of the HTTP response body
  2. Iterates over SSE events using self._decoder.iter_bytes(self.response.iter_bytes())
  3. Detects the [DONE] sentinel to terminate the stream gracefully
  4. Raises APIError when an event's payload reports an error; named events such as the Assistants API's "thread." events are not errors and are passed on together with their event name
  5. Transforms JSON payloads into typed SDK models through client._process_response_data
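Steps 2 through 5 amount to a single loop over decoded events. The sketch below approximates that loop under stated assumptions: Event, StreamError, stream_loop, and the process callback are hypothetical stand-ins for ServerSentEvent, APIError, the stream's internal generator, and client._process_response_data, and the exact branching on named events varies between SDK versions.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator, Optional


@dataclass
class Event:
    """Hypothetical stand-in for a decoded ServerSentEvent."""
    data: str
    event: Optional[str] = None


class StreamError(Exception):
    """Hypothetical stand-in for openai.APIError."""


def stream_loop(events: Iterable[Event], process: Callable[[Any], Any]) -> Iterator[Any]:
    for sse in events:                      # step 2: walk decoded events
        if sse.data.startswith("[DONE]"):   # step 3: sentinel ends the stream
            break
        payload = json.loads(sse.data)
        if sse.event is None:
            # step 4: an unnamed event whose payload carries "error" is raised
            if isinstance(payload, dict) and payload.get("error"):
                raise StreamError(str(payload["error"]))
            yield process(payload)          # step 5: build the typed object
        else:
            # Named events raise only when the event itself is "error";
            # others (e.g. "thread.run.created") pass through with their name.
            if sse.event == "error" and isinstance(payload, dict) and payload.get("error"):
                raise StreamError(str(payload["error"]))
            yield process({"event": sse.event, "data": payload})


demo = [
    Event('{"text": "Hel"}'),
    Event('{"text": "lo"}'),
    Event("[DONE]"),
    Event('{"text": "never read"}'),
]
texts = list(stream_loop(demo, process=lambda d: d["text"]))
print(texts)  # ['Hel', 'lo']
```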

SSE Decoding and Resource Management

SSEDecoder, which implements the SSEBytesDecoder protocol in src/openai/_streaming.py, parses raw SSE protocol data into ServerSentEvent objects. Each event exposes event, data, id, and retry attributes, along with a json() helper method for payload deserialization.
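To make the parsing rules concrete, here is a simplified line-based decoder following the SSE format: "field: value" lines, consecutive data lines joined with newlines, comment lines starting with ":", and a blank line that dispatches the buffered event. It is a sketch rather than the SDK's SSEDecoder; SimpleEvent and parse_sse are hypothetical names, and it works on already-split text lines instead of raw bytes.

```python
import json
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, List, Optional


@dataclass
class SimpleEvent:
    """Hypothetical mirror of ServerSentEvent's event/data/id/retry fields."""
    data: str
    event: Optional[str] = None
    id: Optional[str] = None
    retry: Optional[int] = None

    def json(self) -> Any:
        return json.loads(self.data)


def parse_sse(lines: Iterable[str]) -> Iterator[SimpleEvent]:
    event: Optional[str] = None
    data: List[str] = []
    last_id: Optional[str] = None  # the last event ID persists across events
    retry: Optional[int] = None
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line dispatches the buffered event, if anything was buffered.
            if data or event is not None or retry is not None:
                yield SimpleEvent("\n".join(data), event, last_id, retry)
            event, data, retry = None, [], None
            continue
        if line.startswith(":"):
            continue  # comment line, e.g. a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip exactly one leading space
        if name == "data":
            data.append(value)
        elif name == "event":
            event = value
        elif name == "id":
            last_id = value
        elif name == "retry" and value.isdigit():
            retry = int(value)


lines = [
    "event: message",
    'data: {"a": 1,',
    'data:  "b": 2}',
    "",
    ": keep-alive comment",
    "",
    "data: [DONE]",
    "",
]
events = list(parse_sse(lines))
```

The first event's two data lines are rejoined into one JSON document, the comment block produces nothing, and the final event carries the [DONE] sentinel.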

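Resource cleanup happens in a finally block inside the stream's internal generator, which closes the underlying HTTP response when iteration ends, either via response.close() for synchronous streams or await response.aclose() for asynchronous streams. A generator's finally clause runs only when the generator finishes, raises, or is closed, so stopping partway through does not trigger it on its own.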
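The generator semantics behind that cleanup can be checked in isolation. In this sketch, fake_stream is a hypothetical generator standing in for the stream's internals, and appending to log stands in for response.close().

```python
from typing import Iterator, List

log: List[str] = []


def fake_stream(n: int) -> Iterator[int]:
    """Hypothetical generator standing in for the stream's internal generator."""
    try:
        for i in range(n):
            yield i
    finally:
        log.append("closed")  # stands in for response.close()


# 1. Exhausting the generator runs the finally block.
exhausted = list(fake_stream(3))
after_exhaust = list(log)

# 2. Breaking early leaves the generator suspended: nothing is logged yet.
log.clear()
gen = fake_stream(3)
for _ in gen:
    break
after_break = list(log)

# 3. close() raises GeneratorExit at the paused yield, so finally runs now.
gen.close()
after_close = list(log)
```

This is why an early exit should be paired with an explicit close or a with block rather than relying on the loop ending.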

Implementing Synchronous OpenAI Streams

For blocking applications, use the standard OpenAI client with stream=True. The method returns a Stream[Completion] object that you can consume manually with next() or with a for loop.

```python
from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from the environment

response = client.completions.create(
    model="gpt-3.5-turbo-instruct",
    prompt="Explain streaming in one sentence.",
    max_tokens=50,
    temperature=0,
    stream=True,  # Return a Stream[Completion] instead of a single Completion
)

# Manual iteration: pull the first chunk for custom handling
first_chunk = next(response)
print("First chunk:", first_chunk.to_json())

# Loop over the remaining chunks until the [DONE] sentinel ends the stream
for chunk in response:
    print(chunk.to_json())
```

Each chunk is a typed Completion object, but its choices[0].text holds only the fragment generated since the previous chunk, not the text so far; concatenate the fragments to rebuild the full response.
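Because each chunk carries only the newest fragment, a common pattern is to print fragments as they arrive and join them at the end. This is a minimal sketch: collect_text is a hypothetical helper, and the SimpleNamespace objects only mimic the choices[0].text shape of a Completion so the example runs without an API key. With a live stream you would pass the response object instead.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


def collect_text(chunks: Iterable[Any], echo: bool = False) -> str:
    """Join the text fragment carried by each streamed Completion chunk."""
    parts: List[str] = []
    for chunk in chunks:
        if not chunk.choices:  # guard against chunks with no choices
            continue
        piece = chunk.choices[0].text or ""
        if echo:
            print(piece, end="", flush=True)  # show tokens as they arrive
        parts.append(piece)
    return "".join(parts)


# Offline usage with fake chunks shaped like Completion objects
fake_chunks = [
    SimpleNamespace(choices=[SimpleNamespace(text="Stream")]),
    SimpleNamespace(choices=[SimpleNamespace(text="ing works")]),
    SimpleNamespace(choices=[]),
]
text = collect_text(fake_chunks)
```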

Implementing Asynchronous OpenAI Streams

For non-blocking applications, the AsyncOpenAI client returns an AsyncStream[Completion] that supports async for loops or manual async iteration.

```python
import asyncio
from openai import AsyncOpenAI


async def run():
    client = AsyncOpenAI()
    response = await client.completions.create(
        model="gpt-3.5-turbo-instruct",
        prompt="Explain streaming in one sentence.",
        max_tokens=50,
        temperature=0,
        stream=True,  # Return an AsyncStream[Completion]
    )

    # Manual async iteration (on Python 3.10+: await anext(response))
    first_chunk = await response.__anext__()
    print("First async chunk:", first_chunk.to_json())

    # Loop over the remaining chunks until the [DONE] sentinel ends the stream
    async for chunk in response:
        print(chunk.to_json())


asyncio.run(run())
```
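The payoff of AsyncStream is that several streams can be consumed concurrently on one event loop. The sketch below shows that with asyncio.gather; collect_text_async and fake_async_stream are hypothetical names, and the fake generator only mimics the chunk shape of AsyncStream[Completion] so the example runs offline.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterable, AsyncIterator, List


async def collect_text_async(chunks: AsyncIterable[Any]) -> str:
    """Join text fragments from an async stream of Completion-like chunks."""
    parts: List[str] = []
    async for chunk in chunks:
        if chunk.choices:
            parts.append(chunk.choices[0].text or "")
    return "".join(parts)


async def fake_async_stream(pieces: List[str]) -> AsyncIterator[Any]:
    """Hypothetical async generator standing in for AsyncStream[Completion]."""
    for piece in pieces:
        await asyncio.sleep(0)  # hand control back to the loop, as network I/O would
        yield SimpleNamespace(choices=[SimpleNamespace(text=piece)])


async def main() -> List[str]:
    # Both streams are consumed concurrently on the same event loop.
    return await asyncio.gather(
        collect_text_async(fake_async_stream(["Async ", "streaming"])),
        collect_text_async(fake_async_stream(["runs ", "concurrently"])),
    )


results = asyncio.run(main())
```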

Error Handling and Edge Cases

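Wrap your iteration loops in try/except blocks to catch APIError, defined in src/openai/_exceptions.py. The SDK raises it when the server sends an error payload inside the stream. Assistants API events such as thread.run.created are not errors: they arrive as ordinary chunks tagged with their event name. A non-success HTTP status on the initial request raises APIStatusError (a subclass of APIError) from the create() call itself, so put that call inside the same try block.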
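A mid-stream error does not have to discard what already arrived. This sketch returns the partial text together with the caught exception. read_with_recovery is a hypothetical helper that takes the exception type as a parameter; in real code you would pass openai.APIError, while FakeAPIError and failing_stream here only simulate an error event so the example runs offline.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator, List, Optional, Tuple, Type


def read_with_recovery(
    chunks: Iterable[Any], error_type: Type[BaseException]
) -> Tuple[str, Optional[BaseException]]:
    """Collect streamed text; on error_type, return the partial text and the error."""
    parts: List[str] = []
    try:
        for chunk in chunks:
            if chunk.choices:
                parts.append(chunk.choices[0].text)
    except error_type as exc:
        return "".join(parts), exc
    return "".join(parts), None


class FakeAPIError(Exception):
    """Stands in for openai.APIError so the sketch runs offline."""


def failing_stream() -> Iterator[Any]:
    yield SimpleNamespace(choices=[SimpleNamespace(text="partial ")])
    raise FakeAPIError("server sent an error event")


text, err = read_with_recovery(failing_stream(), FakeAPIError)
```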

For Assistants-specific implementations, inspect the event field on each chunk to handle lifecycle events such as thread.run.created. If your application stops reading before the stream ends, release the connection explicitly by calling the stream's close() method or by opening it in a with block (async with for AsyncStream). Merely breaking out of the loop leaves the internal generator suspended, so its finally block, and the connection cleanup it performs, may not run until the stream object is garbage-collected.
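Dispatching on the event field keeps Assistants handling readable as the number of event types grows. This is a sketch under assumptions: HANDLERS and dispatch are hypothetical names, and the SimpleNamespace events only mimic objects with event and data attributes. The event names thread.run.created and thread.run.completed come from the Assistants API; unrecognized names are skipped.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, Iterable, List

seen: List[str] = []

# Hypothetical handler table keyed by event name.
HANDLERS: Dict[str, Callable[[Any], None]] = {
    "thread.run.created": lambda data: seen.append(f"run started: {data.id}"),
    "thread.run.completed": lambda data: seen.append(f"run completed: {data.id}"),
}


def dispatch(events: Iterable[Any]) -> None:
    for ev in events:
        handler = HANDLERS.get(ev.event)
        if handler is not None:
            handler(ev.data)
        # Unknown event names are ignored here; log them if you need visibility.


dispatch([
    SimpleNamespace(event="thread.run.created", data=SimpleNamespace(id="run_123")),
    SimpleNamespace(event="thread.run.step.created", data=SimpleNamespace(id="step_1")),
    SimpleNamespace(event="thread.run.completed", data=SimpleNamespace(id="run_123")),
])
```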

Summary

  • Stream[T] and AsyncStream[T] in src/openai/_streaming.py provide the core iterator implementation for synchronous and asynchronous openai stream processing.
  • SSE decoding happens transparently through SSEDecoder, which parses server-sent events into ServerSentEvent objects; the stream loop then stops at the [DONE] sentinel.
  • Resource safety comes from a finally block that closes the HTTP response when iteration completes or raises; call close() or use a with block when you stop early.
  • Usage pattern requires only setting stream=True and iterating over the returned object with standard for or async for loops.

Frequently Asked Questions

How does the SDK parse raw SSE bytes into Python objects?

The SSEDecoder class in src/openai/_streaming.py aggregates multi-line data fields according to the SSE specification, respecting event, id, and retry fields. It yields ServerSentEvent objects, which the stream then passes to client._process_response_data to deserialize into typed models like Completion.

What happens when the server returns an error mid-stream?

The iterator detects error events in the SSE payload and raises an APIError from src/openai/_exceptions.py immediately, halting the stream. You should wrap your iteration logic in try/except APIError blocks to handle these failures gracefully without crashing your application.

Can I manually control iteration instead of using a for loop?

Yes. Both Stream and AsyncStream support manual iteration. Use next(response) for synchronous streams, or await response.__anext__() (await anext(response) on Python 3.10+) for asynchronous streams. This pattern is useful when you need special handling for the first chunk or conditional termination logic.
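One practical use of manual iteration is measuring time to first chunk before handing the rest of the stream to a loop. In this sketch, first_chunk_latency and slow_fake are hypothetical names; next(stream, None) returns None instead of raising StopIteration when the stream is empty, and the same call works on a Stream because it implements __next__.

```python
import time
from typing import Any, Iterator, Optional, Tuple


def first_chunk_latency(stream: Iterator[Any]) -> Tuple[Optional[Any], float]:
    """Pull the first chunk manually and report how long it took to arrive."""
    start = time.perf_counter()
    first = next(stream, None)  # None if the stream ends immediately
    return first, time.perf_counter() - start


def slow_fake() -> Iterator[str]:
    """Hypothetical stream whose first chunk takes a moment to arrive."""
    time.sleep(0.01)
    yield "first"
    yield "second"


stream = slow_fake()
first, latency = first_chunk_latency(stream)
rest = list(stream)  # the loop resumes after the chunk pulled manually
empty_first, _ = first_chunk_latency(iter([]))
```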

How do I ensure connections close properly when streaming?

The SDK closes the connection automatically in a finally block in src/openai/_streaming.py, which calls response.close() or await response.aclose() once the stream is exhausted or raises. If you stop reading early, that block does not run right away, so close the stream yourself: call stream.close() (await stream.close() for AsyncStream), or wrap the stream in a with (or async with) block, which closes it on exit even if you break out of the loop.
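The with-block pattern can be sketched offline with a stand-in object. FakeStream and read_until are hypothetical names: FakeStream only mimics the close() and context-manager behavior described above, where the real Stream.close() would release the HTTP response. With a live client the same shape would be `with client.completions.create(..., stream=True) as stream:`.

```python
from typing import Any, Iterator, List


class FakeStream:
    """Hypothetical stand-in mimicking a stream's close() and context-manager support."""

    def __init__(self, items: List[str]) -> None:
        self._items = iter(items)
        self.closed = False

    def __iter__(self) -> Iterator[str]:
        return self._items

    def close(self) -> None:
        self.closed = True  # the real close() would release the HTTP connection

    def __enter__(self) -> "FakeStream":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()


def read_until(stream: Any, stop_word: str) -> List[str]:
    kept: List[str] = []
    with stream:  # close() runs on exit, even though the loop breaks early
        for chunk in stream:
            if chunk == stop_word:
                break
            kept.append(chunk)
    return kept


s = FakeStream(["a", "b", "STOP", "c"])
kept = read_until(s, "STOP")
```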
