Skip to content

Commit 1a8793f

Browse files
committed
Refactor activity worker
1 parent b8bcefa commit 1a8793f

File tree

1 file changed

+175
-177
lines changed

1 file changed

+175
-177
lines changed

temporalio/worker/_activity.py

Lines changed: 175 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -281,183 +281,7 @@ async def _run_activity(
281281
task_token=task_token
282282
)
283283
try:
284-
# Find activity or fail
285-
activity_def = self._activities.get(
286-
start.activity_type, self._dynamic_activity
287-
)
288-
if not activity_def:
289-
activity_names = ", ".join(sorted(self._activities.keys()))
290-
raise temporalio.exceptions.ApplicationError(
291-
f"Activity function {start.activity_type} for workflow {start.workflow_execution.workflow_id} "
292-
f"is not registered on this worker, available activities: {activity_names}",
293-
type="NotFoundError",
294-
)
295-
296-
# Create the worker shutdown event if not created
297-
if not self._worker_shutdown_event:
298-
self._worker_shutdown_event = temporalio.activity._CompositeEvent(
299-
thread_event=threading.Event(), async_event=asyncio.Event()
300-
)
301-
302-
# Setup events
303-
sync_non_threaded = False
304-
if not activity_def.is_async:
305-
running_activity.sync = True
306-
# If we're in a thread-pool executor we can use threading events
307-
# otherwise we must use manager events
308-
if isinstance(
309-
self._activity_executor, concurrent.futures.ThreadPoolExecutor
310-
):
311-
running_activity.cancelled_event = (
312-
temporalio.activity._CompositeEvent(
313-
thread_event=threading.Event(),
314-
# No async event
315-
async_event=None,
316-
)
317-
)
318-
if not activity_def.no_thread_cancel_exception:
319-
running_activity.cancel_thread_raiser = _ThreadExceptionRaiser()
320-
else:
321-
sync_non_threaded = True
322-
manager = self._shared_state_manager
323-
# Pre-checked on worker init
324-
assert manager
325-
running_activity.cancelled_event = (
326-
temporalio.activity._CompositeEvent(
327-
thread_event=manager.new_event(),
328-
# No async event
329-
async_event=None,
330-
)
331-
)
332-
# We also must set the worker shutdown thread event to a
333-
# manager event if this is the first sync event. We don't
334-
# want to create if there never is a sync event.
335-
if not self._seen_sync_activity:
336-
self._worker_shutdown_event.thread_event = manager.new_event()
337-
# Say we've seen a sync activity
338-
self._seen_sync_activity = True
339-
else:
340-
# We have to set the async form of events
341-
running_activity.cancelled_event = temporalio.activity._CompositeEvent(
342-
thread_event=threading.Event(),
343-
async_event=asyncio.Event(),
344-
)
345-
346-
# Convert arguments. We use raw value for dynamic. Otherwise, we
347-
# only use arg type hints if they match the input count.
348-
arg_types = activity_def.arg_types
349-
if not activity_def.name:
350-
# Dynamic is just the raw value for each input value
351-
arg_types = [temporalio.common.RawValue] * len(start.input)
352-
elif arg_types is not None and len(arg_types) != len(start.input):
353-
arg_types = None
354-
try:
355-
args = (
356-
[]
357-
if not start.input
358-
else await self._data_converter.decode(
359-
start.input, type_hints=arg_types
360-
)
361-
)
362-
except Exception as err:
363-
raise temporalio.exceptions.ApplicationError(
364-
"Failed decoding arguments"
365-
) from err
366-
# Put the args inside a list if dynamic
367-
if not activity_def.name:
368-
args = [args]
369-
370-
# Convert heartbeat details
371-
# TODO(cretz): Allow some way to configure heartbeat type hinting?
372-
try:
373-
heartbeat_details = (
374-
[]
375-
if not start.heartbeat_details
376-
else await self._data_converter.decode(start.heartbeat_details)
377-
)
378-
except Exception as err:
379-
raise temporalio.exceptions.ApplicationError(
380-
"Failed decoding heartbeat details", non_retryable=True
381-
) from err
382-
383-
# Build info
384-
info = temporalio.activity.Info(
385-
activity_id=start.activity_id,
386-
activity_type=start.activity_type,
387-
attempt=start.attempt,
388-
current_attempt_scheduled_time=_proto_to_datetime(
389-
start.current_attempt_scheduled_time
390-
),
391-
heartbeat_details=heartbeat_details,
392-
heartbeat_timeout=_proto_to_non_zero_timedelta(start.heartbeat_timeout)
393-
if start.HasField("heartbeat_timeout")
394-
else None,
395-
is_local=start.is_local,
396-
schedule_to_close_timeout=_proto_to_non_zero_timedelta(
397-
start.schedule_to_close_timeout
398-
)
399-
if start.HasField("schedule_to_close_timeout")
400-
else None,
401-
scheduled_time=_proto_to_datetime(start.scheduled_time),
402-
start_to_close_timeout=_proto_to_non_zero_timedelta(
403-
start.start_to_close_timeout
404-
)
405-
if start.HasField("start_to_close_timeout")
406-
else None,
407-
started_time=_proto_to_datetime(start.started_time),
408-
task_queue=self._task_queue,
409-
task_token=task_token,
410-
workflow_id=start.workflow_execution.workflow_id,
411-
workflow_namespace=start.workflow_namespace,
412-
workflow_run_id=start.workflow_execution.run_id,
413-
workflow_type=start.workflow_type,
414-
priority=temporalio.common.Priority._from_proto(start.priority),
415-
)
416-
running_activity.info = info
417-
input = ExecuteActivityInput(
418-
fn=activity_def.fn,
419-
args=args,
420-
executor=None if not running_activity.sync else self._activity_executor,
421-
headers=start.header_fields,
422-
)
423-
424-
# Set the context early so the logging adapter works and
425-
# interceptors have it
426-
temporalio.activity._Context.set(
427-
temporalio.activity._Context(
428-
info=lambda: info,
429-
heartbeat=None,
430-
cancelled_event=running_activity.cancelled_event,
431-
worker_shutdown_event=self._worker_shutdown_event,
432-
shield_thread_cancel_exception=None
433-
if not running_activity.cancel_thread_raiser
434-
else running_activity.cancel_thread_raiser.shielded,
435-
payload_converter_class_or_instance=self._data_converter.payload_converter,
436-
runtime_metric_meter=None
437-
if sync_non_threaded
438-
else self._metric_meter,
439-
)
440-
)
441-
temporalio.activity.logger.debug("Starting activity")
442-
443-
# Build the interceptors chaining in reverse. We build a context right
444-
# now even though the info() can't be intercepted and heartbeat() will
445-
# fail. The interceptors may want to use the info() during init.
446-
impl: ActivityInboundInterceptor = _ActivityInboundImpl(
447-
self, running_activity
448-
)
449-
for interceptor in reversed(list(self._interceptors)):
450-
impl = interceptor.intercept_activity(impl)
451-
# Init
452-
impl.init(_ActivityOutboundImpl(self, running_activity.info))
453-
# Exec
454-
result = await impl.execute_activity(input)
455-
# Convert result even if none. Since Python essentially only
456-
# supports single result types (even if they are tuples), we will do
457-
# the same.
458-
completion.result.completed.result.CopyFrom(
459-
(await self._data_converter.encode([result]))[0]
460-
)
284+
await self._execute_activity(start, running_activity, completion)
461285
except BaseException as err:
462286
try:
463287
if isinstance(err, temporalio.activity._CompleteAsyncError):
@@ -532,6 +356,180 @@ async def _run_activity(
532356
except Exception:
533357
temporalio.activity.logger.exception("Failed completing activity task")
534358

359+
async def _execute_activity(
360+
self,
361+
start: temporalio.bridge.proto.activity_task.Start,
362+
running_activity: _RunningActivity,
363+
completion: temporalio.bridge.proto.ActivityTaskCompletion,
364+
):
365+
# Find activity or fail
366+
activity_def = self._activities.get(start.activity_type, self._dynamic_activity)
367+
if not activity_def:
368+
activity_names = ", ".join(sorted(self._activities.keys()))
369+
raise temporalio.exceptions.ApplicationError(
370+
f"Activity function {start.activity_type} for workflow {start.workflow_execution.workflow_id} "
371+
f"is not registered on this worker, available activities: {activity_names}",
372+
type="NotFoundError",
373+
)
374+
375+
# Create the worker shutdown event if not created
376+
if not self._worker_shutdown_event:
377+
self._worker_shutdown_event = temporalio.activity._CompositeEvent(
378+
thread_event=threading.Event(), async_event=asyncio.Event()
379+
)
380+
381+
# Setup events
382+
sync_non_threaded = False
383+
if not activity_def.is_async:
384+
running_activity.sync = True
385+
# If we're in a thread-pool executor we can use threading events
386+
# otherwise we must use manager events
387+
if isinstance(
388+
self._activity_executor, concurrent.futures.ThreadPoolExecutor
389+
):
390+
running_activity.cancelled_event = temporalio.activity._CompositeEvent(
391+
thread_event=threading.Event(),
392+
# No async event
393+
async_event=None,
394+
)
395+
if not activity_def.no_thread_cancel_exception:
396+
running_activity.cancel_thread_raiser = _ThreadExceptionRaiser()
397+
else:
398+
sync_non_threaded = True
399+
manager = self._shared_state_manager
400+
# Pre-checked on worker init
401+
assert manager
402+
running_activity.cancelled_event = temporalio.activity._CompositeEvent(
403+
thread_event=manager.new_event(),
404+
# No async event
405+
async_event=None,
406+
)
407+
# We also must set the worker shutdown thread event to a
408+
# manager event if this is the first sync event. We don't
409+
# want to create if there never is a sync event.
410+
if not self._seen_sync_activity:
411+
self._worker_shutdown_event.thread_event = manager.new_event()
412+
# Say we've seen a sync activity
413+
self._seen_sync_activity = True
414+
else:
415+
# We have to set the async form of events
416+
running_activity.cancelled_event = temporalio.activity._CompositeEvent(
417+
thread_event=threading.Event(),
418+
async_event=asyncio.Event(),
419+
)
420+
421+
# Convert arguments. We use raw value for dynamic. Otherwise, we
422+
# only use arg type hints if they match the input count.
423+
arg_types = activity_def.arg_types
424+
if not activity_def.name:
425+
# Dynamic is just the raw value for each input value
426+
arg_types = [temporalio.common.RawValue] * len(start.input)
427+
elif arg_types is not None and len(arg_types) != len(start.input):
428+
arg_types = None
429+
try:
430+
args = (
431+
[]
432+
if not start.input
433+
else await self._data_converter.decode(
434+
start.input, type_hints=arg_types
435+
)
436+
)
437+
except Exception as err:
438+
raise temporalio.exceptions.ApplicationError(
439+
"Failed decoding arguments"
440+
) from err
441+
# Put the args inside a list if dynamic
442+
if not activity_def.name:
443+
args = [args]
444+
445+
# Convert heartbeat details
446+
# TODO(cretz): Allow some way to configure heartbeat type hinting?
447+
try:
448+
heartbeat_details = (
449+
[]
450+
if not start.heartbeat_details
451+
else await self._data_converter.decode(start.heartbeat_details)
452+
)
453+
except Exception as err:
454+
raise temporalio.exceptions.ApplicationError(
455+
"Failed decoding heartbeat details", non_retryable=True
456+
) from err
457+
458+
# Build info
459+
info = temporalio.activity.Info(
460+
activity_id=start.activity_id,
461+
activity_type=start.activity_type,
462+
attempt=start.attempt,
463+
current_attempt_scheduled_time=_proto_to_datetime(
464+
start.current_attempt_scheduled_time
465+
),
466+
heartbeat_details=heartbeat_details,
467+
heartbeat_timeout=_proto_to_non_zero_timedelta(start.heartbeat_timeout)
468+
if start.HasField("heartbeat_timeout")
469+
else None,
470+
is_local=start.is_local,
471+
schedule_to_close_timeout=_proto_to_non_zero_timedelta(
472+
start.schedule_to_close_timeout
473+
)
474+
if start.HasField("schedule_to_close_timeout")
475+
else None,
476+
scheduled_time=_proto_to_datetime(start.scheduled_time),
477+
start_to_close_timeout=_proto_to_non_zero_timedelta(
478+
start.start_to_close_timeout
479+
)
480+
if start.HasField("start_to_close_timeout")
481+
else None,
482+
started_time=_proto_to_datetime(start.started_time),
483+
task_queue=self._task_queue,
484+
task_token=completion.task_token,
485+
workflow_id=start.workflow_execution.workflow_id,
486+
workflow_namespace=start.workflow_namespace,
487+
workflow_run_id=start.workflow_execution.run_id,
488+
workflow_type=start.workflow_type,
489+
priority=temporalio.common.Priority._from_proto(start.priority),
490+
)
491+
running_activity.info = info
492+
input = ExecuteActivityInput(
493+
fn=activity_def.fn,
494+
args=args,
495+
executor=None if not running_activity.sync else self._activity_executor,
496+
headers=start.header_fields,
497+
)
498+
499+
# Set the context early so the logging adapter works and
500+
# interceptors have it
501+
temporalio.activity._Context.set(
502+
temporalio.activity._Context(
503+
info=lambda: info,
504+
heartbeat=None,
505+
cancelled_event=running_activity.cancelled_event,
506+
worker_shutdown_event=self._worker_shutdown_event,
507+
shield_thread_cancel_exception=None
508+
if not running_activity.cancel_thread_raiser
509+
else running_activity.cancel_thread_raiser.shielded,
510+
payload_converter_class_or_instance=self._data_converter.payload_converter,
511+
runtime_metric_meter=None if sync_non_threaded else self._metric_meter,
512+
)
513+
)
514+
temporalio.activity.logger.debug("Starting activity")
515+
516+
# Build the interceptors chaining in reverse. We build a context right
517+
# now even though the info() can't be intercepted and heartbeat() will
518+
# fail. The interceptors may want to use the info() during init.
519+
impl: ActivityInboundInterceptor = _ActivityInboundImpl(self, running_activity)
520+
for interceptor in reversed(list(self._interceptors)):
521+
impl = interceptor.intercept_activity(impl)
522+
# Init
523+
impl.init(_ActivityOutboundImpl(self, running_activity.info))
524+
# Exec
525+
result = await impl.execute_activity(input)
526+
# Convert result even if none. Since Python essentially only
527+
# supports single result types (even if they are tuples), we will do
528+
# the same.
529+
completion.result.completed.result.CopyFrom(
530+
(await self._data_converter.encode([result]))[0]
531+
)
532+
535533

536534
@dataclass
537535
class _RunningActivity:

0 commit comments

Comments
 (0)