Skip to content

Commit

Permalink
boost
Browse files Browse the repository at this point in the history
  • Loading branch information
domdinicola committed Jan 2, 2025
1 parent f2649e0 commit 4eb3369
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 0 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ dependencies = [
"paramiko",
"phonenumbers",
"psycopg[binary]",
"python-redis-lock[django]",
"sentry-sdk",
"social-auth-app-django",
"social-auth-core",
Expand Down
18 changes: 18 additions & 0 deletions src/hope_payment_gateway/apps/gateway/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from adminactions.export import base_export
from adminfilters.autocomplete import AutoCompleteFilter
from adminfilters.mixin import AdminFiltersMixin
from django_celery_boost.admin import CeleryTaskModelAdmin
from jsoneditor.forms import JSONEditor
from viewflow.fsm import TransitionNotAllowed

Expand All @@ -37,6 +38,7 @@
moneygram_update_status,
)
from hope_payment_gateway.apps.gateway.models import (
AsyncJob,
DeliveryMechanism,
ExportTemplate,
FinancialServiceProvider,
Expand Down Expand Up @@ -502,3 +504,19 @@ class DeliveryMechanismAdmin(ExtraButtonsMixin, admin.ModelAdmin):
class ExportTemplateAdmin(ExtraButtonsMixin, admin.ModelAdmin):
list_display = ("fsp", "config_key", "delivery_mechanism")
search_fields = ("config_key", "delivery_mechanism__name")


@admin.register(AsyncJob)
class AsyncJobAdmin(AdminFiltersMixin, CeleryTaskModelAdmin, admin.ModelAdmin):
list_display = ("type", "verbose_status", "owner")
autocomplete_fields = ("owner", "content_type")
list_filter = (
("owner", AutoCompleteFilter),
"type",
# FailedFilter,
)

def get_readonly_fields(self, request: "HttpRequest", obj: "Optional[AsyncJob]" = None):
if obj:
return ("owner", "local_status", "type", "action", "sentry_id")
return super().get_readonly_fields(request, obj)
96 changes: 96 additions & 0 deletions src/hope_payment_gateway/apps/gateway/migrations/0027_asyncjob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Generated by Django 5.1.4 on 2024-12-17 17:58

import concurrency.fields
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("gateway", "0026_alter_paymentinstruction_status"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]

operations = [
migrations.CreateModel(
name="AsyncJob",
fields=[
("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("version", concurrency.fields.AutoIncVersionField(default=0, help_text="record revision number")),
(
"curr_async_result_id",
models.CharField(
blank=True,
editable=False,
help_text="Current (active) AsyncResult is",
max_length=36,
null=True,
),
),
(
"last_async_result_id",
models.CharField(
blank=True, editable=False, help_text="Latest executed AsyncResult is", max_length=36, null=True
),
),
("datetime_created", models.DateTimeField(auto_now_add=True, help_text="Creation date and time")),
(
"datetime_queued",
models.DateTimeField(
blank=True, help_text="Queueing date and time", null=True, verbose_name="Queued At"
),
),
(
"repeatable",
models.BooleanField(
blank=True, default=False, help_text="Indicate if the job can be repeated as-is"
),
),
("celery_history", models.JSONField(blank=True, default=dict, editable=False)),
("local_status", models.CharField(blank=True, default="", editable=False, max_length=100, null=True)),
(
"group_key",
models.CharField(
blank=True,
editable=False,
help_text="Tasks with the same group key will not run in parallel",
max_length=255,
null=True,
),
),
(
"type",
models.CharField(
choices=[("FQN", "Operation"), ("ACTION", "Action"), ("TASK", "Task")], max_length=50
),
),
("config", models.JSONField(blank=True, default=dict)),
("action", models.CharField(blank=True, max_length=500, null=True)),
("description", models.CharField(blank=True, max_length=255, null=True)),
("sentry_id", models.CharField(blank=True, max_length=255, null=True)),
(
"instruction",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="jobs",
to="gateway.paymentinstruction",
),
),
(
"owner",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="%(app_label)s_%(class)s_jobs",
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"permissions": (("debug_job", "Can debug background jobs"),),
},
),
]
62 changes: 62 additions & 0 deletions src/hope_payment_gateway/apps/gateway/models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import csv

from django.apps import apps
from django.db import models
from django.utils.module_loading import import_string
from django.utils.translation import gettext_lazy as _

import sentry_sdk
from adminactions.api import delimiters, quotes
from django_celery_boost.models import CeleryTaskModel
from model_utils.models import TimeStampedModel
from strategy_field.fields import StrategyField

Expand Down Expand Up @@ -196,3 +200,61 @@ class Meta:

def __str__(self) -> str:
return f"{self.fsp} / {self.config_key}"

class AsyncJob(CeleryTaskModel, models.Model):
class JobType(models.TextChoices):
STANDARD_TASK = "FQN", "Operation"
ADMIN_ACTION = "ACTION", "Action"
JOB_TASK = "TASK", "Task"

type = models.CharField(max_length=50, choices=JobType.choices)
instruction = models.ForeignKey(PaymentInstruction, related_name="jobs", on_delete=models.CASCADE)
config = models.JSONField(default=dict, blank=True)
action = models.CharField(max_length=500, blank=True, null=True)
description = models.CharField(max_length=255, blank=True, null=True)
sentry_id = models.CharField(max_length=255, blank=True, null=True)

celery_task_name = "hope_payment_gateway.apps.gateway.tasks.sync_job_task"

def __str__(self):
return self.description or f"Background Job #{self.pk}"

class Meta:
permissions = (("debug_job", "Can debug background jobs"),)

@property
def queue_position(self) -> int:
try:
return super().queue_position
except Exception:
return 0

@property
def started(self) -> str:
try:
return self.task_info["started_at"]
except Exception:
return "="

def execute(self):
sid = None
try:
func = import_string(self.action)
match self.type:
case AsyncJob.JobType.FQN:
return func(**self.config)
case AsyncJob.JobType.ACTION:
model = apps.get_model(self.config["model_name"])
qs = model.objects.all()
if self.config["pks"] != "__all__":
qs = qs.filter(pk__in=self.config["pks"])
return func(qs, **self.config.get("kwargs", {}))
case AsyncJob.JobType.TASK:
return func(self)
except Exception as e:
sid = sentry_sdk.capture_exception(e)
raise e
finally:
if sid:
self.sentry_id = sid
self.save(update_fields=["sentry_id"])
62 changes: 62 additions & 0 deletions src/hope_payment_gateway/apps/gateway/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import contextlib
import logging
from typing import TYPE_CHECKING, Any

from django.core.cache import cache

import sentry_sdk
from redis_lock import Lock

from hope_payment_gateway.apps.gateway.models import AsyncJob
from hope_payment_gateway.celery import app

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
from redis_lock.django_cache import RedisCache

cache: RedisCache


@contextlib.contextmanager
def lock_job(job: AsyncJob) -> Lock:
lock = None
if job.group_key:
lock_key = f"lock:{job.group_key}"
# Get a lock with a 60-second lifetime but keep renewing it automatically
# to ensure the lock is held for as long as the Python process is running.
lock = cache.lock(lock_key, 60, auto_renewal=True)
yield lock.__enter__()
else:
yield
if lock:
lock.release()


@app.task()
def sync_job_task(pk: int, version: int) -> dict[str, Any]:
try:
job: AsyncJob = AsyncJob.objects.select_related("program", "program__country_office", "owner").get(
pk=pk, version=version
)
except AsyncJob.DoesNotExist as e:
sentry_sdk.capture_exception(e)
raise e

with lock_job(job):
try:
scope = sentry_sdk.get_current_scope()
sentry_sdk.set_tag("business_area", job.program.country_office.slug)
sentry_sdk.set_tag("project", job.program.name)
sentry_sdk.set_user = {"id": job.owner.pk, "email": job.owner.email}
return job.execute()
except Exception:
# error is logged in job.execute
raise
finally:
scope.clear()


@app.task()
def removed_expired_jobs(**kwargs):
AsyncJob.objects.filter(**kwargs).delete()
32 changes: 32 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4eb3369

Please sign in to comment.