-
Notifications
You must be signed in to change notification settings - Fork 58
Expand file tree
/
Copy pathhandler.py
More file actions
271 lines (222 loc) · 9.46 KB
/
handler.py
File metadata and controls
271 lines (222 loc) · 9.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# SPDX-License-Identifier: MIT
# Copyright (c) 2026 LlamaIndex Inc.
from __future__ import annotations
import asyncio
import functools
import warnings
from collections.abc import Generator
from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable
from workflows.runtime.types.plugin import (
ExternalRunAdapter,
as_v2_runtime_compatibility_shim,
)
from .errors import WorkflowCancelledByUser, WorkflowRuntimeError
from .events import Event, InternalDispatchEvent, StopEvent, WorkflowCancelledEvent
from .types import RunResultT
if TYPE_CHECKING:
from .context import Context
from .workflow import Workflow
class WorkflowHandler(Awaitable[RunResultT]):
"""
Stable interface for communicating with a running workflow. Is awaitable and streamable, and supports things like cancellation.
"""
_ctx: Context
async def _await_result(self) -> RunResultT:
stop_event = await self.stop_event_result()
return stop_event.result if type(stop_event) is StopEvent else stop_event
def __await__(self) -> Generator[Any, Any, RunResultT]:
return self._await_result().__await__()
def __init__(
self,
workflow: "Workflow",
external_adapter: ExternalRunAdapter,
ctx: "Context[Any] | None" = None,
) -> None:
from .context import Context
self._workflow = workflow
self._external_adapter = external_adapter
# TODO(v3): Remove ctx parameter. The handler will just be the external face.
self._ctx = (
ctx
if ctx is not None
else Context._create_external(
workflow=workflow, external_adapter=external_adapter
)
)
self.run_id = external_adapter.run_id
self._all_events_consumed = False
self._result: StopEvent | None = None
self._result_exception: BaseException | None = None
self._result_task = asyncio.create_task(self._wait_for_result())
self._result_task.add_done_callback(self._handle_result_task_done)
async def _wait_for_result(self) -> StopEvent:
result = await self._external_adapter.get_result()
self._result = result
return result
def _handle_result_task_done(self, task: asyncio.Task[StopEvent]) -> None:
if task.cancelled():
return
try:
exc = task.exception()
except asyncio.CancelledError:
return
if exc is None:
return
self._result_exception = exc
if isinstance(exc, WorkflowCancelledByUser) and self._result is None:
# Preserve cancellation in handler state without changing await semantics.
self._result = WorkflowCancelledEvent()
@property
def ctx(self) -> Context:
"""The workflow [Context][workflows.context.context.Context] for this run."""
return self._ctx
def get_stop_event(self) -> StopEvent | None:
"""The stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
return self._result
async def stop_event_result(self) -> StopEvent:
"""Get the stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
return await self._result_task
def __str__(self) -> str:
return f"WorkflowHandler(workflow={self._workflow.workflow_name}, run_id={self.run_id}, result={self._result})"
def is_done(self) -> bool:
"""Return True when the workflow has completed."""
return self._result_task.done()
async def stream_events(
self, expose_internal: bool = False
) -> AsyncGenerator[Event, None]:
"""
Stream events from the workflow execution as they occur.
This method provides real-time access to events generated during workflow
execution, allowing for monitoring and processing of intermediate results.
Events are yielded in the order they are generated by the workflow.
The stream includes all events written to the context's streaming queue,
and terminates when a [StopEvent][workflows.events.StopEvent] is
encountered, indicating the workflow has completed.
Args:
expose_internal (bool): Whether to expose internal events.
Returns:
AsyncGenerator[Event, None]: An async generator that yields Event objects
as they are produced by the workflow.
Raises:
ValueError: If the context is not set on the handler.
WorkflowRuntimeError: If all events have already been consumed by a
previous call to `stream_events()` on the same handler instance.
Examples:
```python
handler = workflow.run()
# Stream and process events in real-time
async for event in handler.stream_events():
if isinstance(event, StopEvent):
print(f"Workflow completed with result: {event.result}")
else:
print(f"Received event: {event}")
# Get final result
result = await handler
```
Note:
Events can only be streamed once per handler instance. Subsequent
calls to `stream_events()` will raise a WorkflowRuntimeError.
"""
# Check if we already consumed all the streamed events
if self._all_events_consumed:
msg = "All the streamed events have already been consumed."
raise WorkflowRuntimeError(msg)
async for ev in self.ctx.stream_events():
if isinstance(ev, InternalDispatchEvent) and not expose_internal:
continue
yield ev
if isinstance(ev, StopEvent):
self._all_events_consumed = True
break
def done(self) -> bool:
"""Return True when the workflow has completed."""
_warn_done_deprecated()
return self._result_task.done()
def cancel(self) -> None:
"""Cancel the running workflow."""
_warn_cancel_deprecated()
shim = as_v2_runtime_compatibility_shim(self._external_adapter)
if shim is None:
raise NotImplementedError(
"Hard cancel is not supported by this runtime. "
"Use await handler.cancel_run() for graceful cancellation."
)
shim.abort()
self._result_task.cancel()
def exception(self) -> BaseException | None:
"""Get the exception for this run. Always defined once the future is done."""
_warn_exception_deprecated()
try:
return self._result_task.exception()
except asyncio.CancelledError:
return None
def cancelled(self) -> bool:
"""Return True when the underlying workflow has been cancelled."""
_warn_cancelled_deprecated()
if self._result_task.cancelled():
return True
exc = self.exception()
if exc is not None and isinstance(exc, WorkflowCancelledByUser):
return True
stop_event = self.get_stop_event()
if stop_event is not None and isinstance(stop_event, WorkflowCancelledEvent):
return True
return False
async def cancel_run(self, *, timeout: float = 5.0) -> None:
"""Cancel the running workflow.
Signals the underlying context to raise
[WorkflowCancelledByUser][workflows.errors.WorkflowCancelledByUser],
which will be caught by the workflow and gracefully end the run.
Examples:
```python
handler = workflow.run()
await handler.cancel_run()
```
"""
try:
await self._external_adapter.cancel()
except Exception:
pass
try:
await asyncio.wait_for(self._result_task, timeout=timeout)
except asyncio.TimeoutError:
pass
except asyncio.CancelledError:
pass
except Exception:
pass
async def send_event(self, event: Event, step: str | None = None) -> None:
"""Send an event into the workflow.
Args:
event: The event to send into the workflow.
step: Optional step name to target. If None, broadcasts to all.
"""
self.ctx.send_event(event, step)
@functools.lru_cache(maxsize=1)
def _warn_done_deprecated() -> None:
warnings.warn(
"WorkflowHandler.done() is deprecated and will be removed in a future release",
DeprecationWarning,
stacklevel=2,
)
@functools.lru_cache(maxsize=1)
def _warn_cancel_deprecated() -> None:
warnings.warn(
"WorkflowHandler.cancel() is deprecated and will be removed in a future release. Prefer to cancel the underlying workflow with await handler.cancel_run(), and then awaiting the result with await handler to obtain the cancellation exception.",
DeprecationWarning,
stacklevel=2,
)
@functools.lru_cache(maxsize=1)
def _warn_exception_deprecated() -> None:
warnings.warn(
"WorkflowHandler.exception() is deprecated and will be removed in a future release",
DeprecationWarning,
stacklevel=2,
)
@functools.lru_cache(maxsize=1)
def _warn_cancelled_deprecated() -> None:
warnings.warn(
"WorkflowHandler.cancelled() is deprecated and will be removed in a future release",
DeprecationWarning,
stacklevel=2,
)