Skip to content

Commit

Permalink
fix issues with pending tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
jefer94 committed Feb 7, 2025
1 parent 3ce53a5 commit 80beece
Show file tree
Hide file tree
Showing 17 changed files with 356 additions and 354 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/v1.10.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# v1.10.0

- Add `priority` to the `TaskManager` model.
- TaskManager doesn't reattempt `mark_task_as_pending`, `mark_task_as_paused`, `mark_task_as_cancelled` and `mark_task_as_reversed` tasks anymore.
- `task_manager` command kills `PENDING` tasks before reattempting them.
51 changes: 26 additions & 25 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,47 @@ repo_url: https://github.com/breatheco-de/celery-task-manager-django-plugin
edit_uri: ""

theme:
name: 'material'
palette:
- scheme: 'default'
media: '(prefers-color-scheme: light)'
toggle:
icon: 'material/lightbulb'
name: "Switch to dark mode"
- scheme: 'slate'
media: '(prefers-color-scheme: dark)'
primary: 'blue'
toggle:
icon: 'material/lightbulb-outline'
name: 'Switch to light mode'
features:
- navigation.sections
name: "material"
palette:
- scheme: "default"
media: "(prefers-color-scheme: light)"
toggle:
icon: "material/lightbulb"
name: "Switch to dark mode"
- scheme: "slate"
media: "(prefers-color-scheme: dark)"
primary: "blue"
toggle:
icon: "material/lightbulb-outline"
name: "Switch to light mode"
features:
- navigation.sections

# this defines the order of the menu
nav:
- "index.md"
- Getting started:
- "index.md"
- Getting started:
- "getting-started/installation.md"
- "getting-started/set-up.md"
- "getting-started/task.md"
- "getting-started/schedule-tasks.md"
- "getting-started/emisors.md"
- Changelog:
- Changelog:
- "changelog/v1.10.0.md"
- "changelog/v1.8.0.md"
- "changelog/v1.7.0.md"
- "changelog/v1.6.0.md"
- "changelog/v1.5.0.md"

plugins:
- search
- search

markdown_extensions:
- pymdownx.highlight:
anchor_linenums: true
- pymdownx.inlinehilite
- pymdownx.snippets
- pymdownx.superfences
- pymdownx.highlight:
anchor_linenums: true
- pymdownx.inlinehilite
- pymdownx.snippets
- pymdownx.superfences

extra_css:
- css/custom.css
- css/custom.css
2 changes: 1 addition & 1 deletion src/task_manager/__about__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2024-present jefer94 <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0-or-later
__version__ = "1.9.0"
__version__ = "1.10.0"
10 changes: 10 additions & 0 deletions src/task_manager/core/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from datetime import datetime, timezone
from enum import Enum

__all__ = ["DuplicationPolicy"]


class DuplicationPolicy(Enum):
    """Closed set of duplication-policy names; each member's value is its own name as a string.

    NOTE(review): what each policy does when a duplicated task is detected is
    not visible from this definition -- confirm against the code that consumes it.
    """

    # presumably: do not schedule the duplicated task -- TODO confirm
    SKIP = "SKIP"
    # presumably: the new task replaces the existing one -- TODO confirm
    OVERRIDE = "OVERRIDE"
    # presumably: schedule the new task alongside the existing one -- TODO confirm
    APPEND = "APPEND"
19 changes: 18 additions & 1 deletion src/task_manager/core/decorators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import functools
import importlib
import inspect
import logging
import traceback
from datetime import datetime
Expand All @@ -14,6 +13,9 @@
from .exceptions import AbortTask, ProgrammingError, RetryTask
from .settings import settings

# from task_manager.core.constants import DuplicationPolicy


__all__ = ["Task"]

logger = logging.getLogger(__name__)
Expand All @@ -31,6 +33,7 @@ class CircuitBreakerError(Exception):
class TaskManager:
current_page: Optional[int]
total_pages: Optional[int]
priority: Optional[int]
attempts: int

task_module: str
Expand All @@ -56,6 +59,13 @@ class TaskManager:


class Task(object):
priority: int
bind: bool
fallback: Optional[Callable]
reverse: Optional[Callable]
transaction_id: Optional[Any]
is_transaction: bool
# duplication_policy: Optional[str]

def __init__(self, *args, **kwargs):
self.transaction_id = None
Expand All @@ -64,6 +74,12 @@ def __init__(self, *args, **kwargs):
self.reverse = kwargs.pop("reverse", None)
self.bind = kwargs.get("bind", False)
self.priority = kwargs.pop("priority", settings["DEFAULT"])
# self.duplication_policy = kwargs.pop("duplication_policy", settings["DUPLICATION_POLICY"])
# if self.duplication_policy is not None and self.duplication_policy not in [x.value for x in DuplicationPolicy]:
# raise ValueError(
# f"Invalid duplication policy: {self.duplication_policy}, must be one of: {', '.join([x.value for x in DuplicationPolicy])}"
# )

kwargs["priority"] = settings["SCHEDULER"]

if self.fallback and not callable(self.fallback):
Expand Down Expand Up @@ -170,6 +186,7 @@ def wrapper(*args, **kwargs):
current_page=page,
total_pages=total_pages,
last_run=last_run,
priority=self.priority,
)

kwargs["task_manager_id"] = x.id
Expand Down
2 changes: 2 additions & 0 deletions src/task_manager/core/settings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
from datetime import timedelta

# importing signals to register them
from . import signals # noqa
from .exceptions import ProgrammingError

# NOTE(review): this binds a module-level variable named `__init__`; Python only
# consults `__all__` when resolving `from module import *`, so this list does not
# restrict the public API. Confirm whether `__all__` was intended and that both
# listed names are actually defined in this module before renaming.
__init__ = ["set_settings", "get_setting"]
Expand Down
18 changes: 18 additions & 0 deletions src/task_manager/core/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from celery import current_app

# `after_task_publish` is available in celery 3.1+
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish

# when using celery versions older than 4.0, use body instead of headers


@after_task_publish.connect
def update_sent_state(sender=None, headers=None, body=None, **kwargs):
    """Record the custom ``SENT`` state for a task right after it is published.

    Connected to Celery's ``after_task_publish`` signal. The task id is read
    from ``headers`` (celery 4.0+) and, when it is not there, from ``body``
    (older celery versions). If no id can be found the handler does nothing
    instead of raising inside the publisher.

    Args:
        sender: Name of the task that was published.
        headers: Message headers; expected to contain the ``id`` key.
        body: Message body; only used as a fallback source of ``id``.
        **kwargs: Remaining signal arguments, ignored.
    """

    def _id_from(source):
        # `source` may be None, a mapping, or a non-mapping payload (e.g. a tuple),
        # so tolerate every failed lookup instead of assuming a dict.
        try:
            return source["id"]
        except (TypeError, KeyError, IndexError):
            return None

    task_id = _id_from(headers) or _id_from(body)
    if not task_id:
        # nothing to store the state under
        return

    # the task may not exist if sent using `send_task` which
    # sends tasks by name, so fall back to the default result backend
    # if that is the case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend

    backend.store_result(task_id, None, "SENT")
1 change: 1 addition & 0 deletions src/task_manager/django/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class TaskManager(models.Model):
status = models.CharField(max_length=20, choices=TASK_STATUS, default=PENDING)
status_message = models.TextField(blank=True, null=True, max_length=255)
task_id = models.CharField(max_length=36, default="", blank=True)
priority = models.IntegerField(default=None, blank=True, null=True)

killed = models.BooleanField(default=False)
fixed = models.BooleanField(default=False, help_text="True if any inconsistence was fixed")
Expand Down
83 changes: 37 additions & 46 deletions src/task_manager/django/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# https://docs.celeryq.dev/en/stable/reference/celery.result.html#celery.result.AsyncResult.status

import importlib
import logging
from datetime import timedelta
from typing import Any

from celery import shared_task
from django.utils import timezone
from celery.result import AsyncResult

from task_manager.core.exceptions import RetryTask
from task_manager.core.settings import get_setting
Expand All @@ -15,7 +16,6 @@

logger = logging.getLogger(__name__)

TOLERANCE = 10

PRIORITY = get_setting("TASK_MANAGER")

Expand All @@ -34,15 +34,24 @@ def mark_task_as_cancelled(task_manager_id):
logger.warning(f"TaskManager {task_manager_id} was already DONE")
return

pending_task = AsyncResult(x.task_id)
if pending_task.status == "STARTED":
logger.warning(f"TaskManager {task_manager_id} is being executed, skipping")
return

x.status = "CANCELLED"
x.killed = pending_task.status == "SENT"

pending_task.revoke(terminate=True)

x.save()

logger.info(f"TaskManager {task_manager_id} is being marked as CANCELLED")
logger.info(f"TaskManager {task_manager_id} marked as CANCELLED")


# do not use our own task decorator
@shared_task(bind=False, priority=PRIORITY)
def mark_task_as_reversed(task_manager_id, *, attempts=0, force=False):
def mark_task_as_reversed(task_manager_id, *, force=False):
logger.info(f"Running mark_task_as_reversed for {task_manager_id}")

x = TaskManager.objects.filter(id=task_manager_id).first()
Expand All @@ -54,30 +63,25 @@ def mark_task_as_reversed(task_manager_id, *, attempts=0, force=False):
logger.warning(f"TaskManager {task_manager_id} does not have a reverse function")
return

if not force and (
x.status != "DONE"
and not x.last_run < timezone.now() - timedelta(minutes=TOLERANCE)
and not x.killed
and attempts < 10
):
logger.warning(f"TaskManager {task_manager_id} was not killed, scheduling to run it again")

x.status = "CANCELLED"
x.save()

mark_task_as_reversed.apply_async(
args=(task_manager_id,), kwargs={"attempts": attempts + 1}, eta=timezone.now() + timedelta(seconds=30)
if not force and x.status != "DONE":
logger.warning(
f"TaskManager {task_manager_id} is '{x.status}', skipping, you could use force=True to reverse it"
)
return

if force:
pending_task = AsyncResult(x.task_id)
pending_task.revoke(terminate=True)

x.killed = False
x.status = "REVERSED"
x.save()

module = importlib.import_module(x.reverse_module)
function = getattr(module, x.reverse_name)
function(*x.arguments["args"], **x.arguments["kwargs"])

logger.info(f"TaskManager {task_manager_id} is being marked as REVERSED")
logger.info(f"TaskManager {task_manager_id} marked as REVERSED")


# do not use our own task decorator
Expand All @@ -95,14 +99,15 @@ def mark_task_as_paused(task_manager_id):
return

x.status = "PAUSED"

x.save()

logger.info(f"TaskManager {task_manager_id} is being marked as PAUSED")
logger.info(f"TaskManager {task_manager_id} marked as PAUSED")


# do not use our own task decorator
@shared_task(bind=False, priority=PRIORITY)
def mark_task_as_pending(task_manager_id, *, attempts=0, force=False, last_run=None):
def mark_task_as_pending(task_manager_id, *, force=False):
logger.info(f"Running mark_task_as_pending for {task_manager_id}")

x = TaskManager.objects.filter(id=task_manager_id).first()
Expand All @@ -111,37 +116,23 @@ def mark_task_as_pending(task_manager_id, *, attempts=0, force=False, last_run=N
return

if x.status in ["DONE", "CANCELLED", "REVERSED"]:
logger.warning(f"TaskManager {task_manager_id} was already DONE")
logger.warning(f"TaskManager {task_manager_id} is already DONE")
return

if last_run and last_run != x.last_run:
logger.warning(f"TaskManager {task_manager_id} is already running")
pending_task = AsyncResult(x.task_id)
if force is False and pending_task.status == "SENT":
logger.warning(f"TaskManager {task_manager_id} scheduled, skipping")
return

tolerance = TOLERANCE
if x.status == "SCHEDULED":
tolerance *= 3

if (
force is False
and not x.last_run < timezone.now() - timedelta(minutes=tolerance)
and not x.killed
and attempts < 10
):
logger.warning(f"TaskManager {task_manager_id} was not killed, scheduling to run it again")

mark_task_as_pending.apply_async(
args=(task_manager_id,),
kwargs={
"attempts": attempts + 1,
"last_run": last_run or x.last_run,
},
eta=timezone.now() + timedelta(seconds=30),
)
if pending_task.status == "STARTED":
logger.warning(f"TaskManager {task_manager_id} is being executed, skipping")
return

x.status = "PENDING"
x.killed = False
x.killed = pending_task.status == "SENT"

pending_task.revoke(terminate=True)

x.save()

module = importlib.import_module(x.task_module)
Expand All @@ -156,7 +147,7 @@ def mark_task_as_pending(task_manager_id, *, attempts=0, force=False, last_run=N
},
)

logger.info(f"TaskManager {task_manager_id} is being marked as PENDING")
logger.info(f"TaskManager {task_manager_id} marked as PENDING")


# do not use our own task decorator
Expand Down
7 changes: 7 additions & 0 deletions src/task_manager/management/commands/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any

from celery import Task
from celery.result import AsyncResult

# from breathecode.notify.actions import send_email_message
from django.core.management.base import BaseCommand
Expand Down Expand Up @@ -30,6 +31,7 @@
]

LIMITS = {
"extra_small": 20,
"small": 100,
"medium": 1000,
}
Expand Down Expand Up @@ -177,6 +179,11 @@ def rerun_pending_tasks(self):
ids = task_managers[a:b].values_list("id", flat=True)

for id in ids:
pending_task = AsyncResult(id)
if pending_task.status == "SENT":
continue

pending_task.revoke(terminate=True)
tasks.mark_task_as_pending.delay(id, force=True)

if ids:
Expand Down
Loading

0 comments on commit 80beece

Please sign in to comment.