@@ -56,13 +56,15 @@ class OpDefinitionType(IntEnum):
56
56
class SyncResponse :
57
57
op_definition_type : OpDefinitionType
58
58
use_async_def : bool
59
+ exception_in_operation_start : bool
59
60
60
61
61
62
@dataclass
62
63
class AsyncResponse :
63
64
operation_workflow_id : str
64
65
block_forever_waiting_for_cancellation : bool
65
66
op_definition_type : OpDefinitionType
67
+ exception_in_operation_start : bool
66
68
67
69
68
70
# The order of the two types in this union is critical since the data converter matches
@@ -101,6 +103,10 @@ class ServiceInterface:
101
103
#
102
104
103
105
106
+ class CustomError (Exception ):
107
+ pass
108
+
109
+
104
110
@dataclass
105
111
class HandlerWfInput :
106
112
op_input : OpInput
@@ -136,6 +142,8 @@ async def start(
136
142
OpOutput ,
137
143
temporalio .nexus .handler .StartWorkflowOperationResult [HandlerWfOutput ],
138
144
]:
145
+ if input .response_type .exception_in_operation_start :
146
+ raise CustomError ("Error in Nexus operation start method" )
139
147
if isinstance (input .response_type , SyncResponse ):
140
148
return OpOutput (
141
149
value = "sync response" ,
@@ -178,6 +186,8 @@ async def sync_operation(
178
186
self , input : OpInput , options : nexusrpc .handler .StartOperationOptions
179
187
) -> OpOutput :
180
188
assert isinstance (input .response_type , SyncResponse )
189
+ if input .response_type .exception_in_operation_start :
190
+ raise CustomError ("Error in Nexus operation start method" )
181
191
return OpOutput (
182
192
value = "sync response" ,
183
193
start_options_received_by_handler = options ,
@@ -188,6 +198,8 @@ def non_async_sync_operation(
188
198
self , input : OpInput , options : nexusrpc .handler .StartOperationOptions
189
199
) -> OpOutput :
190
200
assert isinstance (input .response_type , SyncResponse )
201
+ if input .response_type .exception_in_operation_start :
202
+ raise CustomError ("Error in Nexus operation start method" )
191
203
return OpOutput (
192
204
value = "sync response" ,
193
205
start_options_received_by_handler = options ,
@@ -198,6 +210,8 @@ async def async_operation(
198
210
self , input : OpInput , options : nexusrpc .handler .StartOperationOptions
199
211
) -> temporalio .nexus .handler .StartWorkflowOperationResult [HandlerWfOutput ]:
200
212
assert isinstance (input .response_type , AsyncResponse )
213
+ if input .response_type .exception_in_operation_start :
214
+ raise CustomError ("Error in Nexus operation start method" )
201
215
return await temporalio .nexus .handler .start_workflow (
202
216
HandlerWorkflow .run ,
203
217
args = [HandlerWfInput (op_input = input ), options ],
@@ -424,6 +438,8 @@ async def run(
424
438
# TODO(dan): nexus endpoint pytest fixture?
425
439
# TODO(dan): get rid of UnsandboxedWorkflowRunner (due to xray)
426
440
# TODO(dan): test headers
441
+ # TODO(dan): enable True for exception_in_operation_start
442
+ @pytest .mark .parametrize ("exception_in_operation_start" , [False ])
427
443
@pytest .mark .parametrize ("request_cancel" , [False , True ])
428
444
@pytest .mark .parametrize (
429
445
"op_definition_type" , [OpDefinitionType .SHORTHAND , OpDefinitionType .LONGHAND ]
@@ -434,6 +450,7 @@ async def run(
434
450
)
435
451
async def test_sync_response (
436
452
client : Client ,
453
+ exception_in_operation_start : bool ,
437
454
request_cancel : bool ,
438
455
op_definition_type : OpDefinitionType ,
439
456
caller_reference : CallerReference ,
@@ -453,7 +470,11 @@ async def test_sync_response(
453
470
args = [
454
471
CallerWfInput (
455
472
op_input = OpInput (
456
- response_type = SyncResponse (op_definition_type , True ),
473
+ response_type = SyncResponse (
474
+ op_definition_type = op_definition_type ,
475
+ use_async_def = True ,
476
+ exception_in_operation_start = exception_in_operation_start ,
477
+ ),
457
478
start_options = nexusrpc .handler .StartOperationOptions (
458
479
headers = {"header-key" : "header-value" },
459
480
),
@@ -469,11 +490,30 @@ async def test_sync_response(
469
490
470
491
# The operation result is returned even when request_cancel=True, because the
471
492
# response was synchronous and it could not be cancelled. See explanation below.
472
- result = await caller_wf_handle .result ()
473
- assert result .op_output .value == "sync response"
474
- assert result .op_output .start_options_received_by_handler
493
+ if exception_in_operation_start :
494
+ with pytest .raises (WorkflowFailureError ) as ei :
495
+ await caller_wf_handle .result ()
496
+ e = ei .value
497
+ assert isinstance (e , WorkflowFailureError )
498
+ assert isinstance (e .__cause__ , NexusOperationError )
499
+ assert isinstance (e .__cause__ .__cause__ , CustomError )
500
+ # ID of first command after update accepted
501
+ assert e .__cause__ .scheduled_event_id == 6
502
+ assert e .__cause__ .endpoint == make_nexus_endpoint_name (task_queue )
503
+ assert e .__cause__ .service == "ServiceInterface"
504
+ assert (
505
+ e .__cause__ .operation == "sync_operation"
506
+ if op_definition_type == OpDefinitionType .SHORTHAND
507
+ else "sync_or_async_operation"
508
+ )
509
+ else :
510
+ result = await caller_wf_handle .result ()
511
+ assert result .op_output .value == "sync response"
512
+ assert result .op_output .start_options_received_by_handler
475
513
476
514
515
+ # TODO(dan): enable True for exception_in_operation_start
516
+ @pytest .mark .parametrize ("exception_in_operation_start" , [False ])
477
517
@pytest .mark .parametrize ("request_cancel" , [False , True ])
478
518
@pytest .mark .parametrize (
479
519
"op_definition_type" , [OpDefinitionType .SHORTHAND , OpDefinitionType .LONGHAND ]
@@ -484,6 +524,7 @@ async def test_sync_response(
484
524
)
485
525
async def test_async_response (
486
526
client : Client ,
527
+ exception_in_operation_start : bool ,
487
528
request_cancel : bool ,
488
529
op_definition_type : OpDefinitionType ,
489
530
caller_reference : CallerReference ,
@@ -499,7 +540,12 @@ async def test_async_response(
499
540
workflow_failure_exception_types = [Exception ],
500
541
):
501
542
caller_wf_handle , handler_wf_handle = await _start_wf_and_nexus_op (
502
- client , task_queue , request_cancel , op_definition_type , caller_reference
543
+ client ,
544
+ task_queue ,
545
+ exception_in_operation_start ,
546
+ request_cancel ,
547
+ op_definition_type ,
548
+ caller_reference ,
503
549
)
504
550
# TODO(dan): race here? How do we know it hasn't been canceled already?
505
551
handler_wf_info = await handler_wf_handle .describe ()
@@ -514,7 +560,23 @@ async def test_async_response(
514
560
caller_wf_handle , handler_wf_handle
515
561
)
516
562
517
- if request_cancel :
563
+ if exception_in_operation_start :
564
+ with pytest .raises (WorkflowFailureError ) as ei :
565
+ await caller_wf_handle .result ()
566
+ e = ei .value
567
+ assert isinstance (e , WorkflowFailureError )
568
+ assert isinstance (e .__cause__ , NexusOperationError )
569
+ assert isinstance (e .__cause__ .__cause__ , CustomError )
570
+ # ID of first command after update accepted
571
+ assert e .__cause__ .scheduled_event_id == 6
572
+ assert e .__cause__ .endpoint == make_nexus_endpoint_name (task_queue )
573
+ assert e .__cause__ .service == "ServiceInterface"
574
+ assert (
575
+ e .__cause__ .operation == "async_operation"
576
+ if op_definition_type == OpDefinitionType .SHORTHAND
577
+ else "sync_or_async_operation"
578
+ )
579
+ elif request_cancel :
518
580
# The operation response was asynchronous and so request_cancel is honored. See
519
581
# explanation below.
520
582
with pytest .raises (WorkflowFailureError ) as ei :
@@ -549,6 +611,7 @@ async def test_async_response(
549
611
async def _start_wf_and_nexus_op (
550
612
client : Client ,
551
613
task_queue : str ,
614
+ exception_in_operation_start : bool ,
552
615
request_cancel : bool ,
553
616
op_definition_type : OpDefinitionType ,
554
617
caller_reference : CallerReference ,
@@ -574,6 +637,7 @@ async def _start_wf_and_nexus_op(
574
637
operation_workflow_id ,
575
638
block_forever_waiting_for_cancellation ,
576
639
op_definition_type ,
640
+ exception_in_operation_start = exception_in_operation_start ,
577
641
),
578
642
start_options = nexusrpc .handler .StartOperationOptions (
579
643
headers = {"header-key" : "header-value" },
@@ -605,6 +669,8 @@ async def _start_wf_and_nexus_op(
605
669
return caller_wf_handle , handler_wf_handle
606
670
607
671
672
+ # TODO(dan): enable True for exception_in_operation_start
673
+ @pytest .mark .parametrize ("exception_in_operation_start" , [False ])
608
674
@pytest .mark .parametrize (
609
675
"op_definition_type" , [OpDefinitionType .SHORTHAND , OpDefinitionType .LONGHAND ]
610
676
)
@@ -615,6 +681,7 @@ async def _start_wf_and_nexus_op(
615
681
@pytest .mark .parametrize ("response_type" , [SyncResponse , AsyncResponse ])
616
682
async def test_untyped_caller (
617
683
client : Client ,
684
+ exception_in_operation_start : bool ,
618
685
op_definition_type : OpDefinitionType ,
619
686
caller_reference : CallerReference ,
620
687
response_type : ResponseType ,
@@ -629,12 +696,17 @@ async def test_untyped_caller(
629
696
workflow_failure_exception_types = [Exception ],
630
697
):
631
698
if response_type == SyncResponse :
632
- response_type = SyncResponse (op_definition_type , True )
699
+ response_type = SyncResponse (
700
+ op_definition_type = op_definition_type ,
701
+ use_async_def = True ,
702
+ exception_in_operation_start = exception_in_operation_start ,
703
+ )
633
704
else :
634
705
response_type = AsyncResponse (
635
- str (uuid .uuid4 ()),
636
- False ,
637
- op_definition_type ,
706
+ operation_workflow_id = str (uuid .uuid4 ()),
707
+ block_forever_waiting_for_cancellation = False ,
708
+ op_definition_type = op_definition_type ,
709
+ exception_in_operation_start = exception_in_operation_start ,
638
710
)
639
711
await create_nexus_endpoint (task_queue , client )
640
712
caller_wf_handle = await client .start_workflow (
0 commit comments