Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch task log iterator #171

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion tasktiger/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
# Global task context. We store this globally (and not on the TaskTiger
# instance) for consistent results just in case the user has multiple TaskTiger
# instances.
g = {'current_task_is_batch': None, 'current_tasks': None}
g = {
'current_task_is_batch': None,
'current_tasks': None,
'current_batch_task': None,
}

# from rq
def import_attribute(name):
Expand Down
17 changes: 16 additions & 1 deletion tasktiger/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,22 @@ def tasktiger_processor(logger, method_name, event_dict):
Inject the current task id for non-batch tasks.
"""

if g['current_tasks'] is not None and not g['current_task_is_batch']:
if not g['current_task_is_batch'] and g['current_tasks'] is not None:
event_dict['task_id'] = g['current_tasks'][0].id
elif g['current_task_is_batch'] and g['current_batch_task'] is not None:
event_dict['task_id'] = g['current_batch_task'].id

return event_dict


def batch_param_iterator(params):
"""
Helper to set current batch task.

This helper should be used in conjunction with tasktiger_processor
to facilitate logging of task ids.
"""
for i, p in enumerate(params):
g['current_batch_task'] = g['current_tasks'][i]
yield p
g['current_batch_task'] = None
1 change: 1 addition & 0 deletions tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def execute(self):
finally:
g['current_task_is_batch'] = None
g['current_tasks'] = None
g['current_batch_task'] = None

def delay(self, when=None, max_queue_size=None):
tiger = self.tiger
Expand Down
1 change: 1 addition & 0 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ def _execute_forked(self, tasks, log):

is_batch_func = getattr(func, '_task_batch', False)
g['current_task_is_batch'] = is_batch_func
g['current_batch_task'] = None

with WorkerContextManagerStack(
self.config['CHILD_CONTEXT_MANAGERS']
Expand Down
97 changes: 60 additions & 37 deletions tests/test_logging.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import absolute_import

import pytest
import structlog

from tasktiger import TaskTiger, Worker
from tasktiger.logging import tasktiger_processor
from tasktiger import TaskTiger, Worker, g
from tasktiger.logging import tasktiger_processor, batch_param_iterator

from .test_base import BaseTestCase
from .utils import get_tiger, get_redis
Expand All @@ -20,44 +21,66 @@ def logging_task():
assert log[1]["task_id"] == tiger.current_task.id


@tiger.task(batch=True)
def logging_batch_task(params):
for i, p in enumerate(batch_param_iterator(params)):
log = logger.info("simple batch task")
# Confirm tasktiger_processor injected task id
assert log[1]["task_id"] == g['current_tasks'][i].id


@pytest.fixture
def structlog_processor():
# Use ReturnLogger for testing
structlog.configure(
processors=[tasktiger_processor],
context_class=dict,
logger_factory=structlog.ReturnLoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
yield
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.filter_by_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.ReturnLoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)


class TestLogging(BaseTestCase):
"""Test logging."""

def test_structlog_processor(self):
try:
# Use ReturnLogger for testing
structlog.configure(
processors=[tasktiger_processor],
context_class=dict,
logger_factory=structlog.ReturnLoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)

# Run a simple task. Logging output is verified in
# the task.
self.tiger.delay(logging_task)
queues = self._ensure_queues(queued={"default": 1})
task = queues["queued"]["default"][0]
assert task["func"] == "tests.test_logging:logging_task"
Worker(self.tiger).run(once=True)
self._ensure_queues(queued={"default": 0})
assert not self.conn.exists("t:task:%s" % task["id"])
finally:
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.filter_by_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.ReturnLoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
def test_structlog_processor(self, structlog_processor):

# Logging output is verified in the task.
self.tiger.delay(logging_task)
queues = self._ensure_queues(queued={"default": 1})
task = queues["queued"]["default"][0]
assert task["func"] == "tests.test_logging:logging_task"
Worker(self.tiger).run(once=True)
self._ensure_queues(queued={"default": 0})
assert not self.conn.exists("t:task:%s" % task["id"])

def test_structlog_processor_batch_task(self, structlog_processor):

# Logging output is verified in the task.
self.tiger.delay(logging_batch_task, args=('1',))
self.tiger.delay(logging_batch_task, args=('2',))
queues = self._ensure_queues(queued={"default": 2})
task = queues["queued"]["default"][0]
assert task["func"] == "tests.test_logging:logging_batch_task"
Worker(self.tiger).run(once=True)
self._ensure_queues(queued={"default": 0})
assert not self.conn.exists("t:task:%s" % task["id"])


class TestSetupStructlog(BaseTestCase):
Expand Down