Skip to content

[IMP] queue_job_cron_jobrunner: channel #750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: 14.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 90 additions & 21 deletions queue_job_cron_jobrunner/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

import logging
import traceback
from collections import defaultdict
from datetime import datetime
from io import StringIO

import psutil
from psycopg2 import OperationalError

from odoo import _, api, fields, models, tools
Expand All @@ -19,6 +21,7 @@
RetryableJobError,
)
from odoo.addons.queue_job.job import Job
from odoo.addons.queue_job.jobrunner import QueueJobRunner

_logger = logging.getLogger(__name__)

Expand All @@ -27,47 +30,86 @@
_inherit = "queue.job"

@api.model
def _acquire_one_job(self, commit=False):
    """Acquire the next runnable job and mark it as started.

    Pending jobs are considered in priority / creation-date order; a job
    whose channel has already reached its configured capacity (counting
    jobs currently in the ``started`` state) is skipped.  The selected
    row stays locked (``FOR NO KEY UPDATE``) until the transaction ends.

    :param commit: when True, commit right after the state change so the
        "started" state becomes visible to other transactions.
    :returns: a ``queue.job`` record, empty recordset when nothing runnable
    """
    # TODO: This method should also respect channel priority,
    # rather than just ordering jobs by priority and creation date.
    self.flush()  # push pending ORM writes before querying with raw SQL
    runner = QueueJobRunner.from_environ_or_config()
    self.env.cr.execute(
        """
        SELECT id
        FROM queue_job
        WHERE state = 'pending'
        AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
        ORDER BY priority, date_created
        FOR NO KEY UPDATE
        """
    )
    rows = self.env.cr.fetchall()

    # Count running jobs per channel to detect saturated channels.
    channels = defaultdict(int)
    for queue_job in self.search([("state", "=", "started")]):
        if not queue_job.channel:
            continue
        channels[queue_job.channel] += 1
    channels_without_capacity = set()
    for channel_str, running in channels.items():
        channel = runner.channel_manager.get_channel_by_name(
            channel_str, autocreate=True
        )
        if channel.capacity and channel.capacity <= running:
            channels_without_capacity.add(channel_str)
    channels_without_capacity.discard(
        "root"
    )  # root must be disabled to avoid normal jobrunner
    _logger.info(
        "_acquire_one_job channels_without_capacity %s",
        channels_without_capacity,
    )

    # Take the first job whose channel still has capacity.
    result = self.browse()
    for row in rows:
        queue_job = self.browse(row[0])
        if queue_job.channel and queue_job.channel in channels_without_capacity:
            continue
        job = Job._load_from_db_record(queue_job)
        job.set_started()
        job.store()
        _logger.info(
            "_acquire_one_job queue.job %s[channel=%s,uuid=%s] started",
            row[0],
            job.channel,
            job.uuid,
        )
        result = queue_job
        break
    self.flush()
    if commit:  # pragma: no cover
        self.env.cr.commit()  # pylint: disable=invalid-commit
    return result

def _process(self, commit=False):
"""Process the job"""
self.ensure_one()
job = Job._load_from_db_record(self)
# Set it as started
job.set_started()
job.store()
_logger.debug("%s started", job.uuid)
# TODO: Commit the state change so that the state can be read from the UI
# while the job is processing. However, doing this will release the
# lock on the db, so we need to find another way.
# if commit:
# self.flush()
# self.env.cr.commit()

# Actual processing
try:
try:
with self.env.cr.savepoint():
_logger.info(
"perform %s[channel=%s,uuid=%s]",
self.id,
self.channel,
self.uuid,
)
job.perform()
_logger.info(
"performed %s[channel=%s,uuid=%s]",
self.id,
self.channel,
self.uuid,
)
job.set_done()
job.store()
except OperationalError as err:
Expand All @@ -87,20 +129,28 @@
msg = _("Job interrupted and set to Done: nothing to do.")
job.set_done(msg)
job.store()
_logger.info(

Check warning on line 132 in queue_job_cron_jobrunner/models/queue_job.py

View check run for this annotation

Codecov / codecov/patch

queue_job_cron_jobrunner/models/queue_job.py#L132

Added line #L132 was not covered by tests
"interrupted %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
)

except RetryableJobError as err:
# delay the job later, requeue
job.postpone(result=str(err), seconds=5)
job.set_pending(reset_retry=False)
job.store()
_logger.debug("%s postponed", job)
_logger.info(

Check warning on line 141 in queue_job_cron_jobrunner/models/queue_job.py

View check run for this annotation

Codecov / codecov/patch

queue_job_cron_jobrunner/models/queue_job.py#L141

Added line #L141 was not covered by tests
"postponed %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
)

except (FailedJobError, Exception):
with StringIO() as buff:
traceback.print_exc(file=buff)
_logger.error(buff.getvalue())
job.set_failed(exc_info=buff.getvalue())
job.store()
_logger.info(
"failed %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
)

if commit: # pragma: no cover
self.env["base"].flush()
Expand All @@ -113,10 +163,11 @@
@api.model
def _job_runner(self, commit=True):
"""Short-lived job runner, triggered by async crons"""
job = self._acquire_one_job()
self._release_started_jobs(commit=commit)
job = self._acquire_one_job(commit=commit)
while job:
job._process(commit=commit)
job = self._acquire_one_job()
job = self._acquire_one_job(commit=commit)
# TODO: If limit_time_real_cron is reached before all the jobs are done,
# the worker will be killed abruptly.
# Ideally, find a way to know if we're close to reaching this limit,
Expand Down Expand Up @@ -166,6 +217,24 @@
if delayed_etas:
self._cron_trigger(at=list(delayed_etas))

@api.model
def _release_started_jobs(self, commit=False):
    """Requeue jobs left in ``started`` state by a dead worker.

    A job whose ``worker_pid`` does not match any live process can never
    finish (its worker was killed, e.g. when ``limit_time_real_cron`` was
    reached), so it is reset to ``pending`` to be picked up again.

    :param commit: when True, commit after releasing the jobs.
    """
    # psutil.pids() returns the live PID list directly, without building
    # a Process object per running process as process_iter() would.
    pids = psutil.pids()
    for record in self.search(
        [("state", "=", "started"), ("worker_pid", "not in", pids)]
    ):
        job = Job._load_from_db_record(record)
        job.set_pending()
        job.store()
        _logger.info(
            "release started job %s[channel=%s,uuid=%s]",
            record.id,
            record.channel,
            record.uuid,
        )
    if commit:  # pragma: no cover
        self.env.cr.commit()  # pylint: disable=invalid-commit

@api.model_create_multi
def create(self, vals_list):
# When jobs are created, also create the cron trigger
Expand Down
112 changes: 95 additions & 17 deletions queue_job_cron_jobrunner/tests/test_queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from datetime import datetime
from unittest import mock

from freezegun import freeze_time

from odoo import SUPERUSER_ID, api
from odoo.tests.common import TransactionCase
from odoo.tools import mute_logger

from odoo.addons.queue_job.jobrunner import QueueJobRunner


class TestQueueJob(TransactionCase):
def setUp(self):
Expand Down Expand Up @@ -56,7 +59,6 @@ def test_queue_job_cron_trigger_enqueue_dependencies(self):
# if the state is "waiting_dependencies", it means the "enqueue_waiting()"
# step has not been done when the parent job has been done
self.assertEqual(job_record_depends.state, "done", "Processed OK")
self.assertEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22))

@freeze_time("2022-02-22 22:22:22")
def test_concurrent_cron_access(self):
Expand All @@ -70,20 +72,7 @@ def test_concurrent_cron_access(self):
(self.cron.id,),
log_exceptions=False,
)

delayable = self.env["res.partner"].delayable().create({"name": "test"})
delayable2 = self.env["res.partner"].delayable().create({"name": "test2"})
delayable.on_done(delayable2)
delayable.delay()
job_record = delayable._generated_job.db_record()
job_record_depends = delayable2._generated_job.db_record()

self.env["queue.job"]._job_runner(commit=False)

self.assertEqual(job_record.state, "done", "Processed OK")
# if the state is "waiting_dependencies", it means the "enqueue_waiting()"
# step has not been done when the parent job has been done
self.assertEqual(job_record_depends.state, "done", "Processed OK")
self.env["res.partner"].delayable().create({"name": "test"})
self.assertNotEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22))

def test_acquire_one_job_use_priority(self):
Expand All @@ -98,7 +87,9 @@ def test_acquire_one_job_use_priority(self):
with freeze_time("2024-01-01 10:03:01"):
self.env["res.partner"].with_delay(priority=2).create({"name": "test"})

self.assertEqual(self.env["queue.job"]._acquire_one_job(), job.db_record())
self.assertEqual(
self.env["queue.job"]._acquire_one_job(commit=False), job.db_record()
)

def test_acquire_one_job_consume_the_oldest_first(self):
with freeze_time("2024-01-01 10:01:01"):
Expand All @@ -112,4 +103,91 @@ def test_acquire_one_job_consume_the_oldest_first(self):
with freeze_time("2024-01-01 10:03:01"):
self.env["res.partner"].with_delay(priority=30).create({"name": "test"})

self.assertEqual(self.env["queue.job"]._acquire_one_job(), job.db_record())
self.assertEqual(
self.env["queue.job"]._acquire_one_job(commit=False), job.db_record()
)

def test_acquire_one_job_starts_job(self):
    """Acquiring a job returns it and flips its state to "started"."""
    delayed = self.env["res.partner"].with_delay(priority=1).create({"name": "test"})
    record = delayed.db_record()

    acquired = self.env["queue.job"]._acquire_one_job(commit=False)

    self.assertEqual(acquired, record)
    self.assertEqual(record.state, "started")

def test_acquire_one_job_do_not_overload_channel(self):
    """A channel at capacity stops further acquisitions on that channel."""
    runner = QueueJobRunner.from_environ_or_config()
    runner.channel_manager.get_channel_by_name(
        "root.foobar", autocreate=True
    ).capacity = 2
    # Three jobs on a channel that can only run two at a time.
    jobs = [
        self.env["res.partner"]
        .with_delay(channel="root.foobar")
        .create({"name": name})
        for name in ("test1", "test2", "test3")
    ]

    with mock.patch.object(
        QueueJobRunner, "from_environ_or_config", return_value=runner
    ):
        acquired = [
            self.env["queue.job"]._acquire_one_job(commit=False) for _ in range(3)
        ]

    self.assertEqual(acquired[0], jobs[0].db_record())
    self.assertEqual(acquired[1], jobs[1].db_record())
    # The third acquisition yields an empty recordset: channel is full.
    self.assertEqual(acquired[2], self.env["queue.job"].browse())

def test_acquire_one_job_root_capacity_ignored(self):
    """The root channel's capacity never blocks acquisition."""
    runner = QueueJobRunner.from_environ_or_config()
    runner.channel_manager.get_channel_by_name("root", autocreate=True).capacity = 0
    jobs = [
        self.env["res.partner"].with_delay(channel="root").create({"name": name})
        for name in ("test1", "test2", "test3")
    ]

    with mock.patch.object(
        QueueJobRunner, "from_environ_or_config", return_value=runner
    ):
        acquired = [
            self.env["queue.job"]._acquire_one_job(commit=False) for _ in range(3)
        ]

    # All three jobs are acquired despite root's zero capacity.
    for got, expected in zip(acquired, jobs):
        self.assertEqual(got, expected.db_record())

@freeze_time("2022-02-22 22:22:22")
def test_queue_job_creation_create_change_next_call(self):
    """Creating a delayed job reschedules the runner cron's nextcall."""
    frozen_now = datetime(2022, 2, 22, 22, 22, 22)
    self.cron.nextcall = datetime(2021, 1, 21, 21, 21, 21)

    self.env["res.partner"].with_delay().create({"name": "test"})

    self.assertNotEqual(self.cron.nextcall, frozen_now)

def test_release_started_jobs(self):
    """Only jobs whose worker process is gone are reset to pending."""

    def start_job():
        # Create a delayed job and move it to the "started" state.
        job = self.env["res.partner"].with_delay().create({"name": "test"})
        job.set_started()
        job.store()
        return job

    alive = start_job()
    alive_pid = alive.db_record().worker_pid
    orphan = start_job()
    orphan.db_record().worker_pid = -1  # -1 can never be a live PID

    self.env["queue.job"]._release_started_jobs(commit=False)

    # The orphaned job is requeued and its worker_pid cleared...
    self.assertEqual(orphan.db_record().state, "pending")
    self.assertEqual(orphan.db_record().worker_pid, 0)
    # ...while the job with a live worker is left untouched.
    self.assertEqual(alive.db_record().state, "started")
    self.assertEqual(alive.db_record().worker_pid, alive_pid)