Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/api_docs/docs/api_reference/context.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
show_root_heading: true
show_root_full_path: false
members:
- __init__
- collect_events
- from_dict
- get_result
Expand Down
10 changes: 3 additions & 7 deletions docs/src/content/docs/workflows/managing_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,11 @@ async for event in handler.stream_events():
result = await handler
```

There is also the possibility of streaming internal events (such as changes occurring while the step is running, modifications of the workflow state or variations in the size of the internal queues). In order to do so, you need to pass `expose_internal = True` to `stream_events`.
There is also the possibility of streaming internal events (such as changes occurring while the step is running and modifications of the workflow state). In order to do so, you need to pass `expose_internal = True` to `stream_events`.

All the internal events are instances of `InternalDispatchEvent`, but they can be divided into two sub-classes:
All the internal events are instances of `InternalDispatchEvent`. The primary internal event is:

- `StepStateChanged`: exposes internal changes in the state of the event, including whether the step is running or in progress, what worker it is running on and what events it takes as input and output, as well as changes in the workflow state.
- `EventsQueueChanged`: reports the state of the internal queues.
- `StepStateChanged`: exposes internal changes in the state of the event, including whether the step is PREPARING (queued), RUNNING, or NOT_RUNNING, what worker it is running on and what events it takes as input and output.

Here is how you can stream these internal events:

Expand All @@ -138,9 +137,6 @@ async for event in handler.stream_events(expose_internal=True):
print("Input event for current step:", event.input_event_name)
print("Workflow state at current step:", event.context_state or "No state reported")
print("Output event of current step:", event.output_event_name or "No output event yet")
elif isinstance(event, EventsQueueChanged):
print("Queue name:", event.name)
print("Queue size:", event.size)
# other event streaming logic here

result = await handler
Expand Down
191 changes: 42 additions & 149 deletions examples/streaming_internal_events.ipynb

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ dev = [
"hatch>=1.14.1",
"pyyaml>=6.0.2",
"packaging>=21.0",
"pytest-xdist>=3.8.0"
"pytest-xdist>=3.8.0",
"pytest-timeout>=2.4.0"
]

[project]
Expand All @@ -32,6 +33,7 @@ dependencies = [
[project.optional-dependencies]
server = ["starlette>=0.39.0", "uvicorn>=0.32.0"]
client = ["httpx>=0.28.1,<1"]
dbos = ["dbos>=1.14.0"]

[tool.basedpyright]
typeCheckingMode = "standard"
Expand Down
Loading