Skip to content

Commit 64584f4

Browse files
committed
Clean up existing code
1 parent e360398 commit 64584f4

File tree

3 files changed

+22
-21
lines changed

3 files changed

+22
-21
lines changed

temporalio/worker/_interceptor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,23 +388,23 @@ async def signal_external_workflow(
388388

389389
def start_activity(
390390
self, input: StartActivityInput
391-
) -> temporalio.workflow.ActivityHandle:
391+
) -> temporalio.workflow.ActivityHandle[Any]:
392392
"""Called for every :py:func:`temporalio.workflow.start_activity` and
393393
:py:func:`temporalio.workflow.execute_activity` call.
394394
"""
395395
return self.next.start_activity(input)
396396

397397
async def start_child_workflow(
398398
self, input: StartChildWorkflowInput
399-
) -> temporalio.workflow.ChildWorkflowHandle:
399+
) -> temporalio.workflow.ChildWorkflowHandle[Any, Any]:
400400
"""Called for every :py:func:`temporalio.workflow.start_child_workflow`
401401
and :py:func:`temporalio.workflow.execute_child_workflow` call.
402402
"""
403403
return await self.next.start_child_workflow(input)
404404

405405
def start_local_activity(
406406
self, input: StartLocalActivityInput
407-
) -> temporalio.workflow.ActivityHandle:
407+
) -> temporalio.workflow.ActivityHandle[Any]:
408408
"""Called for every :py:func:`temporalio.workflow.start_local_activity`
409409
and :py:func:`temporalio.workflow.execute_local_activity` call.
410410
"""

temporalio/worker/_workflow_instance.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,25 +1537,26 @@ def _outbound_schedule_activity(
15371537
self,
15381538
input: Union[StartActivityInput, StartLocalActivityInput],
15391539
) -> _ActivityHandle:
1540+
# A ScheduleActivityTask command always results in an ActivityTaskScheduled event,
1541+
# so this function returns the handle immediately. This is similar to nexus
1542+
# operation but differs from child workflow.
1543+
15401544
# Validate
15411545
if not input.start_to_close_timeout and not input.schedule_to_close_timeout:
15421546
raise ValueError(
15431547
"Activity must have start_to_close_timeout or schedule_to_close_timeout"
15441548
)
15451549

1546-
handle: Optional[_ActivityHandle] = None
1550+
handle: _ActivityHandle
15471551

15481552
# Function that runs in the handle
15491553
async def run_activity() -> Any:
1550-
nonlocal handle
1551-
assert handle
15521554
while True:
15531555
# Mark it as started each loop because backoff could cause it to
15541556
# be marked as unstarted
15551557
handle._started = True
15561558
try:
1557-
# We have to shield because we don't want the underlying
1558-
# result future to be cancelled
1559+
# Shield so that future itself is not cancelled
15591560
return await asyncio.shield(handle._result_fut)
15601561
except _ActivityDoBackoffError as err:
15611562
# We have to sleep then reschedule. Note this sleep can be
@@ -1615,12 +1616,16 @@ async def _outbound_signal_external_workflow(
16151616
async def _outbound_start_child_workflow(
16161617
self, input: StartChildWorkflowInput
16171618
) -> _ChildWorkflowHandle:
1618-
handle: Optional[_ChildWorkflowHandle] = None
1619+
# A StartChildWorkflowExecution command results in a
1620+
# StartChildWorkflowExecutionInitiated event, but the start may fail (e.g. due to
1621+
# workflow ID collision). Therefore this function does not return the handle until
1622+
# a future activation contains an event indicating start success / failure. This
1623+
# differs from activity and nexus operation.
1624+
1625+
handle: _ChildWorkflowHandle
16191626

16201627
# Common code for handling cancel for start and run
16211628
def apply_child_cancel_error() -> None:
1622-
nonlocal handle
1623-
assert handle
16241629
# Send a cancel request to the child
16251630
cancel_command = self._add_command()
16261631
handle._apply_cancel_command(cancel_command)
@@ -1638,12 +1643,9 @@ def apply_child_cancel_error() -> None:
16381643

16391644
# Function that runs in the handle
16401645
async def run_child() -> Any:
1641-
nonlocal handle
16421646
while True:
1643-
assert handle
16441647
try:
1645-
# We have to shield because we don't want the future itself
1646-
# to be cancelled
1648+
# Shield so that future itself is not cancelled
16471649
return await asyncio.shield(handle._result_fut)
16481650
except asyncio.CancelledError:
16491651
apply_child_cancel_error()
@@ -1658,8 +1660,7 @@ async def run_child() -> Any:
16581660
# Wait on start before returning
16591661
while True:
16601662
try:
1661-
# We have to shield because we don't want the future itself
1662-
# to be cancelled
1663+
# Shield so that future itself is not cancelled
16631664
await asyncio.shield(handle._start_fut)
16641665
return handle
16651666
except asyncio.CancelledError:
@@ -2391,17 +2392,17 @@ async def signal_external_workflow(
23912392

23922393
def start_activity(
23932394
self, input: StartActivityInput
2394-
) -> temporalio.workflow.ActivityHandle:
2395+
) -> temporalio.workflow.ActivityHandle[Any]:
23952396
return self._instance._outbound_schedule_activity(input)
23962397

23972398
async def start_child_workflow(
23982399
self, input: StartChildWorkflowInput
2399-
) -> temporalio.workflow.ChildWorkflowHandle:
2400+
) -> temporalio.workflow.ChildWorkflowHandle[Any, Any]:
24002401
return await self._instance._outbound_start_child_workflow(input)
24012402

24022403
def start_local_activity(
24032404
self, input: StartLocalActivityInput
2404-
) -> temporalio.workflow.ActivityHandle:
2405+
) -> temporalio.workflow.ActivityHandle[Any]:
24052406
return self._instance._outbound_schedule_activity(input)
24062407

24072408

tests/helpers/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import uuid
55
from contextlib import closing
66
from datetime import timedelta
7-
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar, Union
7+
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar
88

99
from temporalio.api.common.v1 import WorkflowExecution
1010
from temporalio.api.enums.v1 import IndexedValueType

0 commit comments

Comments
 (0)