
Commit 30048a6

Add openai_agents streaming sample
Demonstrates buffered token streaming for OpenAI Agents-backed workflows via temporalio.contrib.workflow_streams (experimental, contrib/pubsub branch of sdk-python). The OpenAI Agents plugin's ModelActivityParameters carries a streaming_event_topic; the model activity publishes raw stream events to that topic with a configurable flush interval (default 100ms), and the workflow emits a sentinel on a "done" topic when Runner.run_streamed finishes. Subscribers iterate (events, done) and break on the sentinel — race_with_workflow handles the case where the workflow fails before publishing the sentinel.

Two scenarios:
- stream_text: text-delta events from a simple haiku agent
- stream_items: agent-update / handoff / tool-call events across a multi-agent workflow with a joke-rating activity
1 parent 57b104e commit 30048a6

12 files changed

Lines changed: 513 additions & 0 deletions

openai_agents/README.md

Lines changed: 3 additions & 0 deletions
@@ -36,3 +36,6 @@ Each directory contains a complete example with its own README for detailed inst
 - **[Customer Service](./customer_service/README.md)** - Interactive customer service agent with escalation capabilities, demonstrating conversational workflows.
 - **[Reasoning Content](./reasoning_content/README.md)** - Example of how to retrieve the thought process of reasoning models.
 - **[Financial Research Agent](./financial_research_agent/README.md)** - Multi-agent financial research system with planner, search, analyst, writer, and verifier agents collaborating.
+- **[Streaming](./streaming/README.md)** - Buffered token streaming (events coalesced into batches over a configurable flush interval, default 100ms) via `temporalio.contrib.workflow_streams`. **Experimental — requires the [`contrib/pubsub` branch][workflow-streams-branch] of sdk-python.**
+
+[workflow-streams-branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub

openai_agents/streaming/README.md

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
# Streaming OpenAI Agents

> **Experimental.** These samples target the streaming hooks added to
> `temporalio.contrib.openai_agents` on the [`contrib/pubsub` branch of
> sdk-python][branch], which is not yet released. Install sdk-python
> from that branch (e.g. `uv pip install -e <path-to-sdk-python>` after
> checking out the branch) to run them locally.

[branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub

The OpenAI Agents SDK supports streaming via `Runner.run_streamed`, which
yields `TResponseStreamEvent`s as the model produces them. Inside a
Temporal workflow the model call runs in an activity, so the workflow
cannot iterate the live HTTP stream directly. The plugin's streaming
support runs `model.stream_response()` in the activity and publishes
each event to the workflow's stream via
`temporalio.contrib.workflow_streams`. The publisher coalesces events
into batches over `streaming_event_batch_interval` (default 100ms)
before sending them as a signal — call this **buffered token
streaming**: deltas reach external subscribers within a batch window of
being produced, not on every byte. At typical model speeds a single
batch carries multiple tokens, so output arrives in small bursts rather
than glyph-by-glyph — close enough for most UIs, though the cadence is
visible next to a true per-token render. Tune
`streaming_event_batch_interval` to trade signal volume for smoothness.
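The coalescing behaviour can be sketched in plain asyncio. This is a simplified, hypothetical stand-in for the plugin's publisher, not sdk-python code: `flush_interval` plays the role of `streaming_event_batch_interval`, and the queue, the `flush` callback, and the `None` shutdown sentinel are all illustrative.

```python
import asyncio
import time
from typing import Callable, Optional


async def batch_events(
    queue: "asyncio.Queue[Optional[str]]",
    flush: Callable[[list], None],
    flush_interval: float = 0.1,
) -> None:
    """Coalesce queued events into batches, emitting at most one flush
    per flush_interval. A None item is the shutdown sentinel."""
    buffer: list = []
    deadline: Optional[float] = None
    while True:
        timeout = None if deadline is None else max(0.0, deadline - time.monotonic())
        try:
            item = await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            # Batch window elapsed: emit one batch instead of N signals.
            flush(buffer)
            buffer, deadline = [], None
            continue
        if item is None:
            if buffer:
                flush(buffer)
            return
        if not buffer:
            # First event of a new batch starts the flush timer.
            deadline = time.monotonic() + flush_interval
        buffer.append(item)


async def demo() -> list:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    for token in ["Leaves ", "drift ", "down"]:
        queue.put_nowait(token)
    queue.put_nowait(None)
    batches: list = []
    await batch_events(queue, batches.append, flush_interval=0.05)
    return batches


# All three tokens are already queued, so they land in a single batch.
print(asyncio.run(demo()))  # -> [['Leaves ', 'drift ', 'down']]
```

The same shape explains the trade-off in the paragraph above: a longer interval means fewer signals but chunkier output.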
The two samples here mirror the upstream openai-agents-python basic
streaming examples.

## `stream_text` — buffered text deltas

Adapted from [`examples/basic/stream_text.py`][upstream-text]. Subscribes
to `ResponseTextDeltaEvent`s and prints them as they arrive (batched at
the broker's flush interval, see above).

[upstream-text]: https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_text.py

```bash
# Terminal 1
uv run openai_agents/streaming/run_worker.py

# Terminal 2
uv run openai_agents/streaming/run_stream_text_workflow.py
```

## `stream_items` — agent-level events with a tool call

Adapted from [`examples/basic/stream_items.py`][upstream-items]. Renders
agent updates, tool calls, tool outputs, and message outputs as a
play-by-play.

[upstream-items]: https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_items.py

```bash
uv run openai_agents/streaming/run_stream_items_workflow.py
```
## How it works

1. The workflow constructs a `WorkflowStream` from `@workflow.init`.
2. The plugin's `OpenAIAgentsPlugin` is configured with
   `streaming_event_topic="events"`. The plugin routes
   `Runner.run_streamed` calls to `invoke_model_activity_streaming`.
3. Inside that activity, each `TResponseStreamEvent` from the live HTTP
   stream is appended to a list (returned to the workflow when the
   activity completes) **and** published to the stream via
   `WorkflowStreamClient.from_within_activity()`.
4. The workflow publishes a sentinel to a separate `done` topic right
   before returning, so the subscriber knows the stream is finished.
5. External code subscribes with
   `WorkflowStreamClient.create(...).subscribe(["events", "done"])` and
   breaks on the `done` event. We leave `result_type` unset and decode
   events manually because the two topics carry different types. The
   runner also races the consumer against `handle.result()` so a
   workflow failure surfaces as an exception rather than blocking the
   subscriber forever.

In the workflow, `stream_events()` resolves only after the activity
returns, so the workflow itself does not see deltas as they arrive — the
streaming benefit is for external observers. If you want the workflow to
react incrementally, subscribe from a child workflow or activity rather
than from the workflow that hosts the stream.
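The topic-and-sentinel flow above can be reduced to a self-contained toy, with an `asyncio.Queue` standing in for the signal channel. The `Item` dataclass, topic names, and producer below are illustrative only; none of the Temporal machinery is real.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


@dataclass
class Item:
    """Mimics the shape of a subscribed item: a topic plus a payload."""
    topic: str
    data: Any


async def producer(bus: "asyncio.Queue[Item]") -> None:
    # Plays the role of the model activity publishing raw events...
    for token in ["An ", "old ", "silent ", "pond"]:
        await bus.put(Item("events", token))
    # ...and of the workflow publishing the done sentinel before returning.
    await bus.put(Item("done", None))


async def subscribe(bus: "asyncio.Queue[Item]") -> AsyncIterator[Item]:
    while True:
        yield await bus.get()


async def main() -> str:
    bus: "asyncio.Queue[Item]" = asyncio.Queue()
    asyncio.create_task(producer(bus))
    text = ""
    async for item in subscribe(bus):
        if item.topic == "done":
            break  # sentinel: the stream is finished
        text += item.data
    return text


print(asyncio.run(main()))  # -> An old silent pond
```

The real subscriber differs mainly in that items arrive as `Payload`s and must be decoded per topic, and that the loop is raced against the workflow result in case the sentinel never arrives.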
## Notes

* `streaming_event_topic` defaults to `None` (no publishing). Set it on
  `ModelActivityParameters` to a topic such as `"events"` to publish raw
  stream events.
* Streaming is incompatible with `use_local_activity=True`: local
  activities can neither heartbeat nor send signals back to the workflow.
* The workflow must host a `WorkflowStream`. Without one, the plugin's
  publish signals are unhandled and silently dropped by Temporal.

openai_agents/streaming/__init__.py

Whitespace-only changes.

openai_agents/streaming/activities/__init__.py

Whitespace-only changes.
openai_agents/streaming/activities/joke_activities.py

Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
from __future__ import annotations

import random

from temporalio import activity


@activity.defn
async def how_many_jokes() -> int:
    """Return a random number of jokes to tell, between 1 and 10 inclusive."""
    return random.randint(1, 10)
openai_agents/streaming/run_stream_items_workflow.py

Lines changed: 70 additions & 0 deletions

@@ -0,0 +1,70 @@
from __future__ import annotations

import asyncio
import uuid

from agents import ItemHelpers
from agents.items import TResponseStreamEvent
from temporalio.api.common.v1 import Payload
from temporalio.client import Client
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from openai_agents.streaming.shared import (
    TASK_QUEUE,
    TOPIC_DONE,
    TOPIC_EVENTS,
    race_with_workflow,
)
from openai_agents.streaming.workflows.stream_items_workflow import (
    StreamItemsInput,
    StreamItemsWorkflow,
)


async def main() -> None:
    client = await Client.connect(
        "localhost:7233",
        plugins=[OpenAIAgentsPlugin()],
    )

    workflow_id = f"stream-items-{uuid.uuid4().hex[:8]}"
    handle = await client.start_workflow(
        StreamItemsWorkflow.run,
        StreamItemsInput(),
        id=workflow_id,
        task_queue=TASK_QUEUE,
    )

    stream = WorkflowStreamClient.create(client, workflow_id)
    converter = client.data_converter.payload_converter

    async def render() -> None:
        print("=== Run starting ===")
        async for item in stream.subscribe([TOPIC_EVENTS, TOPIC_DONE]):
            if item.topic == TOPIC_DONE:
                return
            assert isinstance(item.data, Payload)
            event = converter.from_payload(item.data, TResponseStreamEvent)
            if event.type == "raw_response_event":
                continue
            if event.type == "agent_updated_stream_event":
                print(f"Agent updated: {event.new_agent.name}")
            elif event.type == "run_item_stream_event":
                if event.item.type == "tool_call_item":
                    name = getattr(event.item.raw_item, "name", "Unknown Tool")
                    print(f"-- Tool was called: {name}")
                elif event.item.type == "tool_call_output_item":
                    print(f"-- Tool output: {event.item.output}")
                elif event.item.type == "message_output_item":
                    print(
                        "-- Message output:\n "
                        f"{ItemHelpers.text_message_output(event.item)}"
                    )

    await race_with_workflow(render(), handle)
    print("=== Run complete ===")


if __name__ == "__main__":
    asyncio.run(main())
openai_agents/streaming/run_stream_text_workflow.py

Lines changed: 64 additions & 0 deletions

@@ -0,0 +1,64 @@
from __future__ import annotations

import asyncio
import uuid

from agents.items import TResponseStreamEvent
from openai.types.responses import ResponseTextDeltaEvent
from temporalio.api.common.v1 import Payload
from temporalio.client import Client
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from openai_agents.streaming.shared import (
    TASK_QUEUE,
    TOPIC_DONE,
    TOPIC_EVENTS,
    race_with_workflow,
)
from openai_agents.streaming.workflows.stream_text_workflow import (
    StreamTextInput,
    StreamTextWorkflow,
)


async def main() -> None:
    client = await Client.connect(
        "localhost:7233",
        plugins=[OpenAIAgentsPlugin()],
    )

    workflow_id = f"stream-text-{uuid.uuid4().hex[:8]}"
    handle = await client.start_workflow(
        StreamTextWorkflow.run,
        StreamTextInput(prompt="Please tell me 5 jokes."),
        id=workflow_id,
        task_queue=TASK_QUEUE,
    )

    stream = WorkflowStreamClient.create(client, workflow_id)
    converter = client.data_converter.payload_converter

    async def render() -> None:
        # Subscribe to both the streaming-event topic and the workflow's
        # done-sentinel so we can break cleanly without racing
        # handle.result() against the next poll. result_type is left
        # unset (we get raw Payloads) because the two topics carry
        # different types — we decode based on item.topic.
        async for item in stream.subscribe([TOPIC_EVENTS, TOPIC_DONE]):
            if item.topic == TOPIC_DONE:
                return
            assert isinstance(item.data, Payload)
            event = converter.from_payload(item.data, TResponseStreamEvent)
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                print(event.data.delta, end="", flush=True)

    result = await race_with_workflow(render(), handle)
    print("\n--- final result ---")
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
openai_agents/streaming/run_worker.py

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
from __future__ import annotations

import asyncio
import logging
from datetime import timedelta

from temporalio.client import Client
from temporalio.contrib.openai_agents import (
    ModelActivityParameters,
    OpenAIAgentsPlugin,
)
from temporalio.worker import Worker

from openai_agents.streaming.activities.joke_activities import how_many_jokes
from openai_agents.streaming.shared import TASK_QUEUE, TOPIC_EVENTS
from openai_agents.streaming.workflows.stream_items_workflow import (
    StreamItemsWorkflow,
)
from openai_agents.streaming.workflows.stream_text_workflow import (
    StreamTextWorkflow,
)


async def main() -> None:
    logging.basicConfig(level=logging.INFO)
    client = await Client.connect(
        "localhost:7233",
        plugins=[
            OpenAIAgentsPlugin(
                model_params=ModelActivityParameters(
                    # Streaming relies on heartbeats to detect a stuck
                    # LLM call. Pick a heartbeat_timeout comfortably
                    # larger than the expected delta cadence.
                    heartbeat_timeout=timedelta(seconds=10),
                    start_to_close_timeout=timedelta(minutes=5),
                    # streaming_event_topic defaults to None (no
                    # publishing). Set to a topic to publish raw stream
                    # events for external subscribers.
                    streaming_event_topic=TOPIC_EVENTS,
                ),
            ),
        ],
    )

    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        workflows=[StreamTextWorkflow, StreamItemsWorkflow],
        activities=[how_many_jokes],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

openai_agents/streaming/shared.py

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
from __future__ import annotations

import asyncio
from collections.abc import Coroutine
from typing import Any, TypeVar

from temporalio.client import WorkflowHandle

TASK_QUEUE = "openai-agents-streaming-task-queue"

# Topic the plugin publishes raw model stream events to. Must match
# OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_event_topic=...)).
TOPIC_EVENTS = "events"

# Sentinel topic the workflow publishes to once Runner.run_streamed has
# finished. Subscribers iterate (events, done) and break on the done
# event — this avoids racing handle.result() against the subscriber's
# poll cycle.
TOPIC_DONE = "done"


T = TypeVar("T")


async def race_with_workflow(
    consumer: Coroutine[Any, Any, None],
    handle: WorkflowHandle[Any, T],
) -> T:
    """Run a subscriber concurrently with the workflow.

    If the workflow finishes (success or failure) before the subscriber
    sees its sentinel, cancel the subscriber and surface the workflow
    result. If the subscriber finishes first (clean sentinel exit),
    wait for the workflow result. A non-cancellation failure in the
    subscriber is propagated either way.

    Without this, a workflow that raises before publishing the sentinel
    would leave the subscriber blocked on its next poll forever.
    """
    consumer_task = asyncio.create_task(consumer)
    result_task = asyncio.create_task(handle.result())
    we_cancelled_consumer = False
    try:
        await asyncio.wait(
            [consumer_task, result_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # Stop the subscriber whether it reached its sentinel or not.
        if not consumer_task.done():
            consumer_task.cancel()
            we_cancelled_consumer = True
        # gather(return_exceptions=True) drains both tasks. Child
        # cancellation surfaces as a returned CancelledError; only
        # cancellation we initiated is expected — anything else
        # (including a third party cancelling the consumer behind
        # our back) propagates.
        consumer_outcome, workflow_outcome = await asyncio.gather(
            consumer_task, result_task, return_exceptions=True
        )
        if isinstance(consumer_outcome, asyncio.CancelledError):
            if not we_cancelled_consumer:
                raise consumer_outcome
        elif isinstance(consumer_outcome, BaseException):
            raise consumer_outcome
        if isinstance(workflow_outcome, BaseException):
            raise workflow_outcome
        return workflow_outcome
    finally:
        # Idempotent cleanup. try/finally re-raises the in-flight
        # exception (if any) after finally completes, so swallowing
        # cleanup failures here is safe.
        for task in (consumer_task, result_task):
            if not task.done():
                task.cancel()
        for task in (consumer_task, result_task):
            try:
                await task
            except BaseException:
                pass
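The failure-path guarantee of this helper can be demonstrated with a self-contained reduction of the same pattern. The `race` function below is a stripped-down illustration (it omits the careful cancellation bookkeeping above), and the stub coroutines replace the real subscriber and `handle.result()`.

```python
import asyncio
from typing import Any, Coroutine


async def race(
    consumer: Coroutine[Any, Any, None],
    result: Coroutine[Any, Any, Any],
) -> Any:
    """Minimal version of the pattern: first completion wins, errors propagate."""
    consumer_task = asyncio.create_task(consumer)
    result_task = asyncio.create_task(result)
    await asyncio.wait(
        {consumer_task, result_task}, return_when=asyncio.FIRST_COMPLETED
    )
    consumer_task.cancel()  # cancelling a finished task is a no-op
    outcomes = await asyncio.gather(
        consumer_task, result_task, return_exceptions=True
    )
    for outcome in outcomes:
        if isinstance(outcome, BaseException) and not isinstance(
            outcome, asyncio.CancelledError
        ):
            raise outcome
    return outcomes[1]


async def blocked_subscriber() -> None:
    # A subscriber whose sentinel never arrives.
    await asyncio.Event().wait()


async def failing_workflow() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("workflow failed before publishing the sentinel")


async def demo() -> str:
    try:
        await race(blocked_subscriber(), failing_workflow())
    except RuntimeError as err:
        return f"surfaced: {err}"
    return "unreachable"


print(asyncio.run(demo()))  # -> surfaced: workflow failed before publishing the sentinel
```

Without the race, `blocked_subscriber` would await its event forever; with it, the workflow's exception unblocks and is re-raised to the caller.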

openai_agents/streaming/workflows/__init__.py

Whitespace-only changes.
