Skip to content

Commit ce8c078

Browse files
committed
Introduce NexusOperationError to wrap nexus op errors
1 parent 18803ad commit ce8c078

File tree

4 files changed

+244
-16
lines changed

4 files changed

+244
-16
lines changed

temporalio/converter.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,13 +901,56 @@ def _error_to_failure(
901901
failure.child_workflow_execution_failure_info.retry_state = (
902902
temporalio.api.enums.v1.RetryState.ValueType(error.retry_state or 0)
903903
)
904+
elif isinstance(error, temporalio.exceptions.NexusOperationError):
905+
failure.nexus_operation_execution_failure_info.SetInParent()
906+
failure.nexus_operation_execution_failure_info.operation_token = (
907+
error.operation_token
908+
)
904909

905910
def from_failure(
906911
self,
907912
failure: temporalio.api.failure.v1.Failure,
908913
payload_converter: PayloadConverter,
909914
) -> BaseException:
910915
"""See base class."""
916+
917+
# message Failure {
918+
# string message = 1;
919+
# // The source this Failure originated in, e.g. TypeScriptSDK / JavaSDK
920+
# // In some SDKs this is used to rehydrate the stack trace into an exception object.
921+
# string source = 2;
922+
# string stack_trace = 3;
923+
# // Alternative way to supply `message` and `stack_trace` and possibly other attributes, used for encryption of
924+
# // errors originating in user code which might contain sensitive information.
925+
# // The `encoded_attributes` Payload could represent any serializable object, e.g. JSON object or a `Failure` proto
926+
# // message.
927+
# //
928+
# // SDK authors:
929+
# // - The SDK should provide a default `encodeFailureAttributes` and `decodeFailureAttributes` implementation that:
930+
# // - Uses a JSON object to represent `{ message, stack_trace }`.
931+
# // - Overwrites the original message with "Encoded failure" to indicate that more information could be extracted.
932+
# // - Overwrites the original stack_trace with an empty string.
933+
# // - The resulting JSON object is converted to Payload using the default PayloadConverter and should be processed
934+
# // by the user-provided PayloadCodec
935+
# //
936+
# // - If there's demand, we could allow overriding the default SDK implementation to encode other opaque Failure attributes.
937+
# // (-- api-linter: core::0203::optional=disabled --)
938+
# temporal.api.common.v1.Payload encoded_attributes = 20;
939+
# Failure cause = 4;
940+
# oneof failure_info {
941+
# ApplicationFailureInfo application_failure_info = 5;
942+
# TimeoutFailureInfo timeout_failure_info = 6;
943+
# CanceledFailureInfo canceled_failure_info = 7;
944+
# TerminatedFailureInfo terminated_failure_info = 8;
945+
# ServerFailureInfo server_failure_info = 9;
946+
# ResetWorkflowFailureInfo reset_workflow_failure_info = 10;
947+
# ActivityFailureInfo activity_failure_info = 11;
948+
# ChildWorkflowExecutionFailureInfo child_workflow_execution_failure_info = 12;
949+
# NexusOperationFailureInfo nexus_operation_execution_failure_info = 13;
950+
# NexusHandlerFailureInfo nexus_handler_failure_info = 14;
951+
# }
952+
# }
953+
911954
# If encoded attributes are present and have the fields we expect,
912955
# extract them
913956
if failure.HasField("encoded_attributes"):
@@ -978,6 +1021,15 @@ def from_failure(
9781021
else None,
9791022
)
9801023
elif failure.HasField("child_workflow_execution_failure_info"):
1024+
# message ChildWorkflowExecutionFailureInfo {
1025+
# string namespace = 1;
1026+
# temporal.api.common.v1.WorkflowExecution workflow_execution = 2;
1027+
# temporal.api.common.v1.WorkflowType workflow_type = 3;
1028+
# int64 initiated_event_id = 4;
1029+
# int64 started_event_id = 5;
1030+
# temporal.api.enums.v1.RetryState retry_state = 6;
1031+
# }
1032+
9811033
child_info = failure.child_workflow_execution_failure_info
9821034
err = temporalio.exceptions.ChildWorkflowError(
9831035
failure.message or "Child workflow error",
@@ -993,6 +1045,45 @@ def from_failure(
9931045
if child_info.retry_state
9941046
else None,
9951047
)
1048+
elif failure.HasField("nexus_handler_failure_info"):
1049+
# message NexusHandlerFailureInfo {
1050+
# // The Nexus error type as defined in the spec:
1051+
# // https://github.com/nexus-rpc/api/blob/main/SPEC.md#predefined-handler-errors.
1052+
# string type = 1;
1053+
# // Retry behavior, defaults to the retry behavior of the error type as defined in the spec.
1054+
# temporal.api.enums.v1.NexusHandlerErrorRetryBehavior retry_behavior = 2;
1055+
# }
1056+
# TODO(dan): core never sends this currently?
1057+
raise NotImplementedError(
1058+
"TODO: Nexus handler failure info not implemented"
1059+
)
1060+
elif failure.HasField("nexus_operation_execution_failure_info"):
1061+
# message NexusOperationFailureInfo {
1062+
# // The NexusOperationScheduled event ID.
1063+
# int64 scheduled_event_id = 1;
1064+
# // Endpoint name.
1065+
# string endpoint = 2;
1066+
# // Service name.
1067+
# string service = 3;
1068+
# // Operation name.
1069+
# string operation = 4;
1070+
# // Operation ID - may be empty if the operation completed synchronously.
1071+
# //
1072+
# // Deprecated: Renamed to operation_token.
1073+
# string operation_id = 5;
1074+
# // Operation token - may be empty if the operation completed synchronously.
1075+
# string operation_token = 6;
1076+
# }
1077+
# TODO(dan)
1078+
nexus_op_failure_info = failure.nexus_operation_execution_failure_info
1079+
err = temporalio.exceptions.NexusOperationError(
1080+
failure.message or "Nexus operation error",
1081+
scheduled_event_id=nexus_op_failure_info.scheduled_event_id,
1082+
endpoint=nexus_op_failure_info.endpoint,
1083+
service=nexus_op_failure_info.service,
1084+
operation=nexus_op_failure_info.operation,
1085+
operation_token=nexus_op_failure_info.operation_token,
1086+
)
9961087
else:
9971088
err = temporalio.exceptions.FailureError(failure.message or "Failure error")
9981089
err._failure = failure

temporalio/exceptions.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,15 @@ def retry_state(self) -> Optional[RetryState]:
284284
class ChildWorkflowError(FailureError):
285285
"""Error raised on child workflow failure."""
286286

287+
# message ChildWorkflowExecutionFailureInfo {
288+
# string namespace = 1;
289+
# temporal.api.common.v1.WorkflowExecution workflow_execution = 2;
290+
# temporal.api.common.v1.WorkflowType workflow_type = 3;
291+
# int64 initiated_event_id = 4;
292+
# int64 started_event_id = 5;
293+
# temporal.api.enums.v1.RetryState retry_state = 6;
294+
# }
295+
287296
def __init__(
288297
self,
289298
message: str,
@@ -342,6 +351,70 @@ def retry_state(self) -> Optional[RetryState]:
342351
return self._retry_state
343352

344353

354+
class NexusOperationError(FailureError):
355+
"""Error raised on Nexus operation failure."""
356+
357+
# message NexusOperationFailureInfo {
358+
# // The NexusOperationScheduled event ID.
359+
# int64 scheduled_event_id = 1;
360+
# // Endpoint name.
361+
# string endpoint = 2;
362+
# // Service name.
363+
# string service = 3;
364+
# // Operation name.
365+
# string operation = 4;
366+
# // Operation ID - may be empty if the operation completed synchronously.
367+
# //
368+
# // Deprecated: Renamed to operation_token.
369+
# string operation_id = 5;
370+
# // Operation token - may be empty if the operation completed synchronously.
371+
# string operation_token = 6;
372+
# }
373+
374+
def __init__(
375+
self,
376+
message: str,
377+
*,
378+
scheduled_event_id: int,
379+
endpoint: str,
380+
service: str,
381+
operation: str,
382+
operation_token: str,
383+
):
384+
"""Initialize a Nexus operation error."""
385+
super().__init__(message)
386+
self._scheduled_event_id = scheduled_event_id
387+
self._endpoint = endpoint
388+
self._service = service
389+
self._operation = operation
390+
self._operation_token = operation_token
391+
392+
@property
393+
def scheduled_event_id(self) -> int:
394+
"""The NexusOperationScheduled event ID for the failed operation."""
395+
return self._scheduled_event_id
396+
397+
@property
398+
def endpoint(self) -> str:
399+
"""The endpoint name for the failed operation."""
400+
return self._endpoint
401+
402+
@property
403+
def service(self) -> str:
404+
"""The service name for the failed operation."""
405+
return self._service
406+
407+
@property
408+
def operation(self) -> str:
409+
"""The name of the failed operation."""
410+
return self._operation
411+
412+
@property
413+
def operation_token(self) -> str:
414+
"""The operation token returned by the failed operation."""
415+
return self._operation_token
416+
417+
345418
def is_cancelled_exception(exception: BaseException) -> bool:
346419
"""Check whether the given exception is considered a cancellation exception
347420
according to Temporal.

temporalio/worker/_workflow_instance.py

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,22 @@ def _apply_resolve_child_workflow_execution(
796796
self,
797797
job: temporalio.bridge.proto.workflow_activation.ResolveChildWorkflowExecution,
798798
) -> None:
799+
# message ResolveChildWorkflowExecution {
800+
# // Sequence number as provided by lang in the corresponding StartChildWorkflowExecution command
801+
# uint32 seq = 1;
802+
# child_workflow.ChildWorkflowResult result = 2;
803+
# }
799804
# No matter the result, we know we want to pop
805+
806+
# // Used by core to resolve child workflow executions.
807+
# message ChildWorkflowResult {
808+
# oneof status {
809+
# Success completed = 1;
810+
# Failure failed = 2;
811+
# Cancellation cancelled = 3;
812+
# }
813+
# }
814+
800815
handle = self._pending_child_workflows.pop(job.seq, None)
801816
if not handle:
802817
raise RuntimeError(
@@ -813,6 +828,11 @@ def _apply_resolve_child_workflow_execution(
813828
ret = ret_vals[0]
814829
handle._resolve_success(ret)
815830
elif job.result.HasField("failed"):
831+
# // Used in ChildWorkflowResult to report non successful outcomes such as
832+
# // application failures, timeouts, terminations, and cancellations.
833+
# message Failure {
834+
# temporal.api.failure.v1.Failure failure = 1;
835+
# }
816836
handle._resolve_failure(
817837
self._failure_converter.from_failure(
818838
job.result.failed.failure, self._payload_converter
@@ -926,20 +946,18 @@ def _apply_resolve_nexus_operation(
926946
self,
927947
job: temporalio.bridge.proto.workflow_activation.ResolveNexusOperation,
928948
) -> None:
949+
# message ResolveNexusOperation {
950+
# // Sequence number as provided by lang in the corresponding ScheduleNexusOperation command
951+
# uint32 seq = 1;
952+
# nexus.NexusOperationResult result = 2;
953+
# }
929954
handle = self._pending_nexus_operations.get(job.seq)
930955
if not handle:
931956
raise RuntimeError(
932957
f"Failed to find nexus operation handle for job sequence number {job.seq}"
933958
)
934959

935960
result = job.result
936-
# Handle the four oneof variants of NexusOperationResult
937-
938-
# message ResolveNexusOperation {
939-
# // Sequence number as provided by lang in the corresponding ScheduleNexusOperation command
940-
# uint32 seq = 1;
941-
# nexus.NexusOperationResult result = 2;
942-
# }
943961
# message NexusOperationResult {
944962
# oneof status {
945963
# temporal.api.common.v1.Payload completed = 1;
@@ -948,8 +966,13 @@ def _apply_resolve_nexus_operation(
948966
# temporal.api.failure.v1.Failure timed_out = 4;
949967
# }
950968
# }
969+
# Handle the four oneof variants of NexusOperationResult
951970

952971
if result.HasField("completed"):
972+
# message Payload {
973+
# map<string,bytes> metadata = 1;
974+
# bytes data = 2;
975+
# }
953976
[output] = self._convert_payloads(
954977
[result.completed],
955978
[handle._input.output_type] if handle._input.output_type else None,

tests/worker/test_nexus.py

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,15 @@
1919
import temporalio.nexus
2020
import temporalio.nexus.handler
2121
from temporalio import workflow
22-
from temporalio.client import Client, WorkflowExecutionStatus, WorkflowHandle
22+
from temporalio.client import (
23+
Client,
24+
WithStartWorkflowOperation,
25+
WorkflowExecutionStatus,
26+
WorkflowFailureError,
27+
WorkflowHandle,
28+
)
29+
from temporalio.common import WorkflowIDConflictPolicy
30+
from temporalio.exceptions import CancelledError, NexusOperationError
2331
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
2432

2533

@@ -292,9 +300,9 @@ async def test_async_response(
292300
await create_nexus_endpoint(task_queue, client)
293301
operation_workflow_id = "default-workflow-id"
294302

295-
# Start the caller workflow.
303+
# Start the caller workflow and wait until it confirms the Nexus operation has started.
296304
block_forever_waiting_for_cancellation = request_cancel
297-
caller_wf_handle = await client.start_workflow(
305+
start_op = WithStartWorkflowOperation(
298306
MyCallerWorkflow.run,
299307
args=[
300308
MyInput(
@@ -312,12 +320,15 @@ async def test_async_response(
312320
],
313321
id=str(uuid.uuid4()),
314322
task_queue=task_queue,
323+
id_conflict_policy=WorkflowIDConflictPolicy.FAIL,
315324
)
316325

317-
# Wait until the caller wf knows that the Nexus operation has started.
318-
await caller_wf_handle.execute_update(
319-
MyCallerWorkflow.wait_nexus_operation_started
326+
await client.execute_update_with_start_workflow(
327+
MyCallerWorkflow.wait_nexus_operation_started,
328+
start_workflow_operation=start_op,
320329
)
330+
caller_wf_handle = await start_op.workflow_handle()
331+
321332
# check that the operation-backing workflow now exists, and that (a) the handler
322333
# workflow accepted the link to the calling Nexus event, and that (b) the caller
323334
# workflow NexusOperationStarted event received in return a link to the
@@ -339,11 +350,25 @@ async def test_async_response(
339350
if request_cancel:
340351
# The operation response was asynchronous and so request_cancel is honored. See
341352
# explanation below.
342-
# TODO: what error should be raised here?
343-
with pytest.raises(BaseException) as ei:
353+
with pytest.raises(WorkflowFailureError) as ei:
344354
await caller_wf_handle.result()
345355
e = ei.value
346-
print(f"🌈 workflow failed: {e.__class__.__name__}({e})")
356+
assert isinstance(e, WorkflowFailureError)
357+
assert isinstance(e.__cause__, NexusOperationError)
358+
assert isinstance(e.__cause__.__cause__, CancelledError)
359+
# ID of first command after update accepted
360+
assert e.__cause__.scheduled_event_id == 6
361+
assert e.__cause__.endpoint == make_nexus_endpoint_name(task_queue)
362+
assert e.__cause__.service == "MyService"
363+
assert (
364+
e.__cause__.operation == "my_async_operation"
365+
if use_shorthand_defined_operation
366+
else "my_sync_or_async_operation"
367+
)
368+
assert temporalio.nexus.handler.StartWorkflowOperationResult._decode_token(
369+
e.__cause__.operation_token
370+
) == (handler_wf_handle.id, handler_wf_info.run_id)
371+
# Check that the handler workflow was canceled
347372
handler_wf_info = await handler_wf_handle.describe()
348373
assert handler_wf_info.status == WorkflowExecutionStatus.CANCELED
349374
else:
@@ -427,6 +452,22 @@ async def assert_handler_workflow_has_link_to_caller_workflow(
427452

428453
# TODO(dan): test exceptions in Nexus worker are raised
429454

455+
456+
async def print_history(handle: WorkflowHandle):
457+
print("\n\n")
458+
history = await handle.fetch_history()
459+
for event in history.events:
460+
try:
461+
event_type_name = temporalio.api.enums.v1.EventType.Name(
462+
event.event_type
463+
).replace("EVENT_TYPE_", "")
464+
except ValueError:
465+
# Handle unknown event types
466+
event_type_name = f"Unknown({event.event_type})"
467+
print(f"{event.event_id}. {event_type_name}")
468+
print("\n\n")
469+
470+
430471
# When request_cancel is True, the NexusOperationHandle in the workflow evolves
431472
# through the following states:
432473
# start_fut result_fut handle_task w/ fut_waiter (task._must_cancel)

0 commit comments

Comments
 (0)