diff --git a/tasktiger/_internal.py b/tasktiger/_internal.py index 05fcb4fc..602f994f 100644 --- a/tasktiger/_internal.py +++ b/tasktiger/_internal.py @@ -36,7 +36,11 @@ # Global task context. We store this globally (and not on the TaskTiger # instance) for consistent results just in case the user has multiple TaskTiger # instances. -g = {'current_task_is_batch': None, 'current_tasks': None} +g = { + 'current_task_is_batch': None, + 'current_tasks': None, + 'current_batch_task': None, +} # from rq def import_attribute(name): diff --git a/tasktiger/logging.py b/tasktiger/logging.py index ef082657..dd8442ca 100644 --- a/tasktiger/logging.py +++ b/tasktiger/logging.py @@ -8,7 +8,22 @@ def tasktiger_processor(logger, method_name, event_dict): Inject the current task id for non-batch tasks. """ - if g['current_tasks'] is not None and not g['current_task_is_batch']: + if not g['current_task_is_batch'] and g['current_tasks'] is not None: event_dict['task_id'] = g['current_tasks'][0].id + elif g['current_task_is_batch'] and g['current_batch_task'] is not None: + event_dict['task_id'] = g['current_batch_task'].id return event_dict + + +def batch_param_iterator(params): + """ + Helper to set current batch task. + + This helper should be used in conjunction with tasktiger_processor + to facilitate logging of task ids. + """ + for i, p in enumerate(params): + g['current_batch_task'] = g['current_tasks'][i] + yield p + g['current_batch_task'] = None diff --git a/tasktiger/task.py b/tasktiger/task.py index d5b5709c..7b073ea7 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -304,6 +304,7 @@ def execute(self): finally: g['current_task_is_batch'] = None g['current_tasks'] = None + g['current_batch_task'] = None def delay(self, when=None, max_queue_size=None): tiger = self.tiger diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 558f710b..93a5725e 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -364,6 +364,7 @@ def _execute_forked(self, tasks, log): is_batch_func = getattr(func, '_task_batch', False) g['current_task_is_batch'] = is_batch_func + g['current_batch_task'] = None with WorkerContextManagerStack( self.config['CHILD_CONTEXT_MANAGERS'] diff --git a/tests/test_logging.py b/tests/test_logging.py index e5919b2a..f30f21b4 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -1,9 +1,10 @@ from __future__ import absolute_import +import pytest import structlog -from tasktiger import TaskTiger, Worker -from tasktiger.logging import tasktiger_processor +from tasktiger import TaskTiger, Worker, g +from tasktiger.logging import tasktiger_processor, batch_param_iterator from .test_base import BaseTestCase from .utils import get_tiger, get_redis @@ -20,44 +21,66 @@ def logging_task(): assert log[1]["task_id"] == tiger.current_task.id +@tiger.task(batch=True) +def logging_batch_task(params): + for i, p in enumerate(batch_param_iterator(params)): + log = logger.info("simple batch task") + # Confirm tasktiger_processor injected task id + assert log[1]["task_id"] == g['current_tasks'][i].id + + +@pytest.fixture +def structlog_processor(): + # Use ReturnLogger for testing + structlog.configure( + processors=[tasktiger_processor], + context_class=dict, + logger_factory=structlog.ReturnLoggerFactory(), + wrapper_class=structlog.stdlib.BoundLogger, + cache_logger_on_first_use=True, + ) + yield + structlog.configure( + processors=[ + structlog.stdlib.add_log_level, + structlog.stdlib.filter_by_level, + structlog.processors.TimeStamper(fmt="iso", utc=True), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer(), + ], + context_class=dict, + logger_factory=structlog.ReturnLoggerFactory(), + wrapper_class=structlog.stdlib.BoundLogger, + cache_logger_on_first_use=True, + ) + + class TestLogging(BaseTestCase): """Test logging.""" - def test_structlog_processor(self): - try: - # Use ReturnLogger for testing - structlog.configure( - processors=[tasktiger_processor], - context_class=dict, - logger_factory=structlog.ReturnLoggerFactory(), - wrapper_class=structlog.stdlib.BoundLogger, - cache_logger_on_first_use=True, - ) - - # Run a simple task. Logging output is verified in - # the task. - self.tiger.delay(logging_task) - queues = self._ensure_queues(queued={"default": 1}) - task = queues["queued"]["default"][0] - assert task["func"] == "tests.test_logging:logging_task" - Worker(self.tiger).run(once=True) - self._ensure_queues(queued={"default": 0}) - assert not self.conn.exists("t:task:%s" % task["id"]) - finally: - structlog.configure( - processors=[ - structlog.stdlib.add_log_level, - structlog.stdlib.filter_by_level, - structlog.processors.TimeStamper(fmt="iso", utc=True), - structlog.processors.StackInfoRenderer(), - structlog.processors.format_exc_info, - structlog.processors.JSONRenderer(), - ], - context_class=dict, - logger_factory=structlog.ReturnLoggerFactory(), - wrapper_class=structlog.stdlib.BoundLogger, - cache_logger_on_first_use=True, - ) + def test_structlog_processor(self, structlog_processor): + + # Logging output is verified in the task. + self.tiger.delay(logging_task) + queues = self._ensure_queues(queued={"default": 1}) + task = queues["queued"]["default"][0] + assert task["func"] == "tests.test_logging:logging_task" + Worker(self.tiger).run(once=True) + self._ensure_queues(queued={"default": 0}) + assert not self.conn.exists("t:task:%s" % task["id"]) + + def test_structlog_processor_batch_task(self, structlog_processor): + + # Logging output is verified in the task. + self.tiger.delay(logging_batch_task, args=('1',)) + self.tiger.delay(logging_batch_task, args=('2',)) + queues = self._ensure_queues(queued={"default": 2}) + task = queues["queued"]["default"][0] + assert task["func"] == "tests.test_logging:logging_batch_task" + Worker(self.tiger).run(once=True) + self._ensure_queues(queued={"default": 0}) + assert not self.conn.exists("t:task:%s" % task["id"]) class TestSetupStructlog(BaseTestCase):