Tag activity-failure and workflow-task-failure log records with a
"__temporal_error_identifier" entry in `extra`, and add tests that look for
that attribute on the emitted log records.

Review fixes relative to the original patch (hunk line counts are unchanged):
  * tests/worker/test_activity.py: the `finally` block now removes the
    `handler` instance that was actually registered. The original passed a
    fresh `CustomLogHandler()` to `removeHandler`, which is a no-op and leaves
    the real handler attached to the shared activity base logger.
  * tests/worker/test_workflow.py: dropped the `f` prefix from a workflow ID
    string that has no placeholders (same runtime value).

NOTE(review): leading whitespace was reconstructed from a whitespace-collapsed
copy of this patch; confirm context-line indentation against the target tree
before applying. The `index` hashes are carried over from the original patch,
so the post-image hashes of the two test files no longer match exactly.

diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py
index b05f3f6e..012fbc52 100644
--- a/temporalio/worker/_activity.py
+++ b/temporalio/worker/_activity.py
@@ -351,11 +351,15 @@ async def _run_activity(
                     ):
                         # Downgrade log level to DEBUG for BENIGN application errors.
                         temporalio.activity.logger.debug(
-                            "Completing activity as failed", exc_info=True
+                            "Completing activity as failed",
+                            exc_info=True,
+                            extra={"__temporal_error_identifier": "ActivityFailure"},
                         )
                     else:
                         temporalio.activity.logger.warning(
-                            "Completing activity as failed", exc_info=True
+                            "Completing activity as failed",
+                            exc_info=True,
+                            extra={"__temporal_error_identifier": "ActivityFailure"},
                         )
                     await self._data_converter.encode_failure(
                         err, completion.result.failed.failure
diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py
index 3d66b811..db910d88 100644
--- a/temporalio/worker/_workflow_instance.py
+++ b/temporalio/worker/_workflow_instance.py
@@ -447,7 +447,10 @@ def activate(
             logger.warning(
                 f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
                 exc_info=activation_err,
-                extra={"temporal_workflow": self._info._logger_details()},
+                extra={
+                    "temporal_workflow": self._info._logger_details(),
+                    "__temporal_error_identifier": "WorkflowTaskFailure",
+                },
             )
             # Set completion failure
             self._current_completion.failed.failure.SetInParent()
diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py
index 70ba8cfd..9d06da73 100644
--- a/tests/worker/test_activity.py
+++ b/tests/worker/test_activity.py
@@ -1379,3 +1379,40 @@ def assert_activity_application_error(
     ret = assert_activity_error(err)
     assert isinstance(ret, ApplicationError)
     return ret
+
+
+class CustomLogHandler(logging.Handler):
+    def __init__(self):
+        super().__init__()
+        self._trace_identifiers = 0
+
+    def emit(self, record: logging.LogRecord) -> None:
+        if (
+            hasattr(record, "__temporal_error_identifier")
+            and getattr(record, "__temporal_error_identifier") == "ActivityFailure"
+        ):
+            assert record.msg.startswith("Completing activity as failed")
+            self._trace_identifiers += 1
+        return None
+
+
+async def test_activity_failure_trace_identifier(
+    client: Client, worker: ExternalWorker
+):
+    @activity.defn
+    async def raise_error():
+        raise RuntimeError("oh no!")
+
+    handler = CustomLogHandler()
+    activity.logger.base_logger.addHandler(handler)
+
+    try:
+        with pytest.raises(WorkflowFailureError) as err:
+            await _execute_workflow_with_activity(client, worker, raise_error)
+        assert (
+            str(assert_activity_application_error(err.value)) == "RuntimeError: oh no!"
+        )
+        assert handler._trace_identifiers == 1
+
+    finally:
+        activity.logger.base_logger.removeHandler(handler)
diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py
index 0fae6cd7..f8ad5647 100644
--- a/tests/worker/test_workflow.py
+++ b/tests/worker/test_workflow.py
@@ -1961,8 +1961,13 @@ def logs_captured(self, *loggers: logging.Logger):
                 l.setLevel(prev_levels[i])
 
     def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
+        return self.find(lambda l: l.message.startswith(starts_with))
+
+    def find(
+        self, pred: Callable[[logging.LogRecord], bool]
+    ) -> Optional[logging.LogRecord]:
         for record in cast(List[logging.LogRecord], self.log_queue.queue):
-            if record.message.startswith(starts_with):
+            if pred(record):
                 return record
         return None
 
@@ -2058,6 +2063,7 @@ async def run(self) -> None:
         if not task_fail_once_workflow_has_failed:
             task_fail_once_workflow_has_failed = True
             raise RuntimeError("Intentional workflow task failure")
+        task_fail_once_workflow_has_failed = False
 
         # Execute activity that will fail once
         await workflow.execute_activity(
@@ -7975,6 +7981,33 @@ async def test_quick_activity_swallows_cancellation(client: Client):
         temporalio.worker._workflow_instance._raise_on_cancelling_completed_activity_override = False
 
 
+async def test_workflow_logging_trace_identifier(client: Client):
+    with LogCapturer().logs_captured(
+        temporalio.worker._workflow_instance.logger
+    ) as capturer:
+        async with new_worker(
+            client,
+            TaskFailOnceWorkflow,
+            activities=[task_fail_once_activity],
+        ) as worker:
+            await client.execute_workflow(
+                TaskFailOnceWorkflow.run,
+                id="workflow_failure_trace_identifier",
+                task_queue=worker.task_queue,
+            )
+
+        def workflow_failure(l: logging.LogRecord):
+            if (
+                hasattr(l, "__temporal_error_identifier")
+                and getattr(l, "__temporal_error_identifier") == "WorkflowTaskFailure"
+            ):
+                assert l.msg.startswith("Failed activation on workflow")
+                return True
+            return False
+
+        assert capturer.find(workflow_failure) is not None
+
+
 @activity.defn
 def use_in_workflow() -> bool:
     return workflow.in_workflow()