Skip to content

Commit c8321d6

Browse files
committed
Refactor/cleanup workers
1 parent 0ac2053 commit c8321d6

File tree

2 files changed

+82
-14
lines changed

2 files changed

+82
-14
lines changed

temporalio/worker/_activity.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,63 @@ async def _run_activity(
278278
# We choose to surround interceptor creation and activity invocation in
279279
# a try block so we can mark the workflow as failed on any error instead
280280
# of having error handling in the interceptor
281+
282+
# message ActivityTaskCompletion {
283+
# bytes task_token = 1;
284+
# activity_result.ActivityExecutionResult result = 2;
285+
# }
286+
# message ActivityExecutionResult {
287+
# oneof status {
288+
# Success completed = 1;
289+
# Failure failed = 2;
290+
# Cancellation cancelled = 3;
291+
# WillCompleteAsync will_complete_async = 4;
292+
# }
293+
# }
294+
281295
completion = temporalio.bridge.proto.ActivityTaskCompletion(
282296
task_token=task_token
283297
)
284298
try:
285299
await self._execute_activity(start, running_activity, completion)
286300
except BaseException as err:
301+
# temporal/api/failure/v1/
302+
# message Failure {
303+
# string message = 1;
304+
# // The source this Failure originated in, e.g. TypeScriptSDK / JavaSDK
305+
# // In some SDKs this is used to rehydrate the stack trace into an exception object.
306+
# string source = 2;
307+
# string stack_trace = 3;
308+
# // Alternative way to supply `message` and `stack_trace` and possibly other attributes, used for encryption of
309+
# // errors originating in user code which might contain sensitive information.
310+
# // The `encoded_attributes` Payload could represent any serializable object, e.g. JSON object or a `Failure` proto
311+
# // message.
312+
# //
313+
# // SDK authors:
314+
# // - The SDK should provide a default `encodeFailureAttributes` and `decodeFailureAttributes` implementation that:
315+
# // - Uses a JSON object to represent `{ message, stack_trace }`.
316+
# // - Overwrites the original message with "Encoded failure" to indicate that more information could be extracted.
317+
# // - Overwrites the original stack_trace with an empty string.
318+
# // - The resulting JSON object is converted to Payload using the default PayloadConverter and should be processed
319+
# // by the user-provided PayloadCodec
320+
# //
321+
# // - If there's demand, we could allow overriding the default SDK implementation to encode other opaque Failure attributes.
322+
# // (-- api-linter: core::0203::optional=disabled --)
323+
# temporal.api.common.v1.Payload encoded_attributes = 20;
324+
# Failure cause = 4;
325+
# oneof failure_info {
326+
# ApplicationFailureInfo application_failure_info = 5;
327+
# TimeoutFailureInfo timeout_failure_info = 6;
328+
# CanceledFailureInfo canceled_failure_info = 7;
329+
# TerminatedFailureInfo terminated_failure_info = 8;
330+
# ServerFailureInfo server_failure_info = 9;
331+
# ResetWorkflowFailureInfo reset_workflow_failure_info = 10;
332+
# ActivityFailureInfo activity_failure_info = 11;
333+
# ChildWorkflowExecutionFailureInfo child_workflow_execution_failure_info = 12;
334+
# NexusOperationFailureInfo nexus_operation_execution_failure_info = 13;
335+
# NexusHandlerFailureInfo nexus_handler_failure_info = 14;
336+
# }
337+
# }
287338
try:
288339
if isinstance(err, temporalio.activity._CompleteAsyncError):
289340
temporalio.activity.logger.debug("Completing asynchronously")

temporalio/worker/_nexus.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,37 @@ async def _run_nexus_operation(
221221
input: Any,
222222
options: nexusrpc.handler.StartOperationOptions,
223223
) -> None:
224+
# message NexusTaskCompletion {
225+
# bytes task_token = 1;
226+
# oneof status {
227+
# temporal.api.nexus.v1.Response completed = 2;
228+
# temporal.api.nexus.v1.HandlerError error = 3;
229+
# bool ack_cancel = 4;
230+
# }
231+
# }
232+
233+
# message HandlerError {
234+
# // See https://github.com/nexus-rpc/api/blob/main/SPEC.md#predefined-handler-errors.
235+
# string error_type = 1;
236+
# Failure failure = 2;
237+
# // Retry behavior, defaults to the retry behavior of the error type as defined in the spec.
238+
# temporal.api.enums.v1.NexusHandlerErrorRetryBehavior retry_behavior = 3;
239+
# }
240+
241+
# message Failure {
242+
# string message = 1;
243+
# map<string, string> metadata = 2;
244+
# // UTF-8 encoded JSON serializable details.
245+
# bytes details = 3;
246+
# }
247+
248+
completion = temporalio.bridge.proto.nexus.NexusTaskCompletion(
249+
task_token=task_token,
250+
)
224251
try:
225252
result = await start(input, options)
226253
except BaseException:
254+
# await self._data_converter.encode_failure(err, completion.error.failure)
227255
# TODO(dan): mirror appropriate aspects of _run_activity error handling
228256
raise NotImplementedError(
229257
"TODO: Nexus operation error handling not implemented"
@@ -252,21 +280,10 @@ async def _run_nexus_operation(
252280
payload=payload
253281
)
254282
)
255-
# message NexusTaskCompletion {
256-
# bytes task_token = 1;
257-
# oneof status {
258-
# temporal.api.nexus.v1.Response completed = 2;
259-
# temporal.api.nexus.v1.HandlerError error = 3;
260-
# bool ack_cancel = 4;
261-
# }
262-
# }
263-
264-
await self._bridge_worker().complete_nexus_task(
265-
temporalio.bridge.proto.nexus.NexusTaskCompletion(
266-
task_token=task_token,
267-
completed=temporalio.api.nexus.v1.Response(start_operation=op_resp),
268-
)
283+
completion.completed.CopyFrom(
284+
temporalio.api.nexus.v1.Response(start_operation=op_resp)
269285
)
286+
await self._bridge_worker().complete_nexus_task(completion)
270287
del self._running_operations[task_token]
271288
except Exception:
272289
temporalio.nexus.logger.exception(

0 commit comments

Comments
 (0)