Structured Outputs by Example Star on GitHub
Stay updated when new content is added and get tips from the Instructor team
Streaming

Streaming Basics

Edit
Get started with streaming responses in Instructor for real-time processing.
# Streaming allows you to receive partial responses from LLMs as they're being generated,
# rather than waiting for the complete response.
#
# Instructor offers two main ways to stream structured data:
#
# 1. Partial: Stream a single object as it's being populated field by field
# 2. Iterable: Stream multiple complete objects one at a time
import instructor
from openai import OpenAI
from pydantic import BaseModel

class User(BaseModel):
    """Pydantic model describing the structured output the LLM must produce.

    Used as the ``response_model`` for every streaming call in this example;
    during partial streaming, any field not yet generated is ``None``.
    """

    # Full name of the user profile
    name: str
    # Age in years
    age: int
    # Short free-text biography
    bio: str

# Patch the OpenAI client
# instructor.from_openai wraps the client so chat.completions accepts the
# response_model kwarg and gains the create_partial streaming helpers.
client = instructor.from_openai(OpenAI())

# Create a basic streaming response and process the chunks
def stream_user_info():
    """Stream a ``User`` response chunk by chunk, printing each partial state.

    Returns:
        The final (most complete) ``User`` chunk received, or ``None`` if the
        stream produced no chunks at all.
    """
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=User,
        stream=True,  # Enable streaming
        messages=[
            {"role": "user", "content": "Generate a profile for a fictional user named Alice who is 28 years old."}
        ]
    )

    # Each chunk contains the partial model constructed so far. Track the
    # latest chunk explicitly: relying on the loop variable after the loop
    # raises NameError when the stream is empty.
    final_chunk = None
    for chunk in stream:
        print(f"Received chunk: {chunk}")
        final_chunk = chunk

    # Return the final complete object (None if nothing was streamed)
    return final_chunk

# Run the synchronous streaming demo and show the fully assembled User.
user = stream_user_info()
print(f"\nFinal result: {user}")

from instructor import Partial

# Stream with Partial objects for field-by-field progress tracking
def stream_user_with_partial():
    """Stream a ``User`` field by field using ``create_partial``.

    Each iteration yields a ``Partial[User]`` snapshot in which fields the
    model has not generated yet are ``None``.
    """
    request_message = {
        "role": "user",
        "content": "Generate a profile for a fictional user named Bob who is 35 years old and works as a software developer.",
    }
    partial_stream = client.chat.completions.create_partial(
        model="gpt-3.5-turbo",
        response_model=User,
        messages=[request_message],
    )

    # Report progress as the model fills in each field
    print("Streaming user data:")

    for snapshot in partial_stream:
        print(f"Current state: name={snapshot.name}, age={snapshot.age}, bio={snapshot.bio!r}")

# Example output:
# Current state: name=None, age=None, bio=None
# Current state: name='Bob', age=None, bio=None
# Current state: name='Bob', age=35, bio=None
# Current state: name='Bob', age=35, bio='Software developer with 10 years of experience...'
from typing import Dict, Any

class ProgressTracker:
    """Tracks how much of a streaming ``Partial[User]`` has been populated.

    After each call to :meth:`update`, the latest snapshot is available in
    ``self.progress`` as ``{"completion": "NN%", "data": {<populated fields>}}``.

    NOTE(review): in the original page extraction, ``update`` was dedented to
    module level while keeping a method body's indentation (a syntax error);
    it is restored here as a method of this class.
    """

    def __init__(self):
        # Most recent progress snapshot; empty until update() is first called.
        self.progress = {}

    # Monitor completion percentage and track field updates
    def update(self, partial_user: "Partial[User]") -> Dict[str, Any]:
        """Recompute progress from a partial object and return the snapshot.

        Args:
            partial_user: The latest ``Partial[User]`` streamed from the model.

        Returns:
            The updated ``self.progress`` dict.
        """
        # Collect only the fields the model has produced so far, iterating
        # User.model_fields so the dict follows field declaration order
        # (name, age, bio) — same order as the original hardcoded checks.
        data = {
            field: getattr(partial_user, field)
            for field in User.model_fields
            if getattr(partial_user, field) is not None
        }

        # Percentage of fields populated, truncated to an int
        completion = int(len(data) / len(User.model_fields) * 100)

        self.progress = {
            "completion": f"{completion}%",
            "data": data
        }

        return self.progress

def stream_with_progress():
    """Stream a ``User`` while reporting completion percentage per chunk."""
    progress_tracker = ProgressTracker()

    request_kwargs = dict(
        model="gpt-3.5-turbo",
        response_model=User,
        messages=[
            {"role": "user", "content": "Generate a profile for a fictional user named Carol who is 42 years old."}
        ],
    )
    partial_stream = client.chat.completions.create_partial(**request_kwargs)

    # Feed every snapshot through the tracker and print the running state
    for snapshot in partial_stream:
        state = progress_tracker.update(snapshot)
        print(f"Progress: {state['completion']} - Current data: {state['data']}")

# Example output:
# Progress: 33% - Current data: {'name': 'Carol'}
# Progress: 66% - Current data: {'name': 'Carol', 'age': 42}
# Progress: 100% - Current data: {'name': 'Carol', 'age': 42, 'bio': 'Carol is a passionate...'}
import asyncio
from openai import AsyncOpenAI

# Demonstrate async streaming with await syntax
async def stream_async():
    """Stream a ``Partial[User]`` asynchronously, printing each update.

    Creates its own patched ``AsyncOpenAI`` client, so it is independent of
    the module-level synchronous ``client``.
    """
    async_client = instructor.from_openai(AsyncOpenAI())

    # Use async/await pattern for non-blocking streaming
    # NOTE(review): in some instructor versions create_partial on an async
    # client returns an async generator directly, making this `await`
    # unnecessary (or invalid) — confirm against the installed version.
    user_stream = await async_client.chat.completions.create_partial(
        model="gpt-3.5-turbo",
        response_model=User,
        messages=[
            {"role": "user", "content": "Generate a profile for a fictional user named Dave who is 31 years old."}
        ]
    )

    # Process stream with async for loop
    async for partial_user in user_stream:
        print(f"Async stream update: {partial_user}")

# Run the async function
# Blocks until stream_async() completes; creates and closes its own event loop.
asyncio.run(stream_async())

Running the Example

First, install Instructor and any dependencies
$ pip install instructor pydantic openai
Run the Python script to see streaming in action
$ python streaming-basics.py

Further Information