diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 01e6a89015..2d74ea3fc1 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -29,7 +29,7 @@ }, "installable": True, "development_status": "Mature", - "maintainers": ["guewen"], + "maintainers": ["guewen", "sbidoul"], "post_init_hook": "post_init_hook", "post_load": "post_load", } diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index da0a21c701..4a039d8eba 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -27,15 +27,48 @@ class RunJobController(http.Controller): - def _try_perform_job(self, env, job): - """Try to perform the job.""" + @classmethod + def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: + """Acquire a job for execution. + + - make sure it is in ENQUEUED state + - mark it as STARTED and commit the state change + - acquire the job lock + + If successful, return the Job instance, otherwise return None. This + function may fail to acquire the job is not in the expected state or is + already locked by another worker. + """ + env.cr.execute( + "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " + "FOR NO KEY UPDATE SKIP LOCKED", + (job_uuid, ENQUEUED), + ) + if not env.cr.fetchone(): + _logger.warning( + "was requested to run job %s, but it does not exist, " + "or is not in state %s, or is being handled by another worker", + job_uuid, + ENQUEUED, + ) + return None + job = Job.load(env, job_uuid) + assert job and job.state == ENQUEUED job.set_started() job.store() env.cr.commit() - job.lock() + if not job.lock(): + _logger.warning( + "was requested to run job %s, but it could not be locked", + job_uuid, + ) + return None + return job + @classmethod + def _try_perform_job(cls, env, job): + """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) - job.perform() # Triggers any stored computed fields before calling 'set_done' # so that will be part of the 'exec_time' @@ -46,18 +79,20 @@ def _try_perform_job(self, env, job): env.cr.commit() _logger.debug("%s done", job) - def _enqueue_dependent_jobs(self, env, job): + @classmethod + def _enqueue_dependent_jobs(cls, env, job): tries = 0 while True: try: - job.enqueue_waiting() + with job.env.cr.savepoint(): + job.enqueue_waiting() except OperationalError as err: # Automatically retry the typical transaction serialization # errors if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE: - _logger.info( + _logger.error( "%s, maximum number of tries reached to update dependencies", errorcodes.lookup(err.pgcode), ) @@ -75,17 +110,8 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route( - "/queue_job/runjob", - type="http", - auth="none", - save_session=False, - readonly=False, - ) - def runjob(self, db, job_uuid, **kw): - http.request.session.db = db - env = http.request.env(user=SUPERUSER_ID) - + @classmethod + def _runjob(cls, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as new_cr: @@ -94,26 +120,9 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - # ensure the job to run is in the correct state and lock the record - env.cr.execute( - "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE", - (job_uuid, ENQUEUED), - ) - if not env.cr.fetchone(): - _logger.warning( - "was requested to run job %s, but it does not exist, " - "or is not in state %s", - job_uuid, - ENQUEUED, - ) - return "" - - job = Job.load(env, job_uuid) - assert job and job.state == ENQUEUED - try: try: - self._try_perform_job(env, job) + cls._try_perform_job(env, job) except OperationalError as err: # Automatically retry the typical transaction serialization # errors @@ -131,7 +140,6 @@ def retry_postpone(job, message, seconds=None): # traceback in the logs we should have the traceback when all # retries are exhausted env.cr.rollback() - return "" except (FailedJobError, Exception) as orig_exception: buff = StringIO() @@ -141,19 +149,18 @@ def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as new_cr: job.env = job.env(cr=new_cr) - vals = self._get_failure_values(job, traceback_txt, orig_exception) + vals = cls._get_failure_values(job, traceback_txt, orig_exception) job.set_failed(**vals) job.store() buff.close() raise _logger.debug("%s enqueue depends started", job) - self._enqueue_dependent_jobs(env, job) + cls._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - return "" - - def _get_failure_values(self, job, traceback_txt, orig_exception): + @classmethod + def _get_failure_values(cls, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ if hasattr(orig_exception, "__module__"): @@ -167,6 +174,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception): "exc_message": exc_message, } + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) + def runjob(self, db, job_uuid, **kw): + http.request.session.db = db + env = http.request.env(user=SUPERUSER_ID) + job = self._acquire_job(env, job_uuid) + if not job: + return "" + self._runjob(env, job) + return "" + # flake8: noqa: C901 @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( @@ -177,6 +200,7 @@ def create_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(http.request.env._("Access Denied")) @@ -187,6 +211,12 @@ def create_test_job( except (ValueError, TypeError): failure_rate = 0 + if job_duration is not None: + try: + job_duration = float(job_duration) + except (ValueError, TypeError): + job_duration = 0 + if not (0 <= failure_rate <= 1): raise BadRequest("failure_rate must be between 0 and 1") @@ -215,6 +245,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) if size > 1: @@ -225,6 +256,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) return "" @@ -236,6 +268,7 @@ def _create_single_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): delayed = ( http.request.env["queue.job"] @@ -245,7 +278,7 @@ def _create_single_test_job( channel=channel, description=description, ) - ._test_job(failure_rate=failure_rate) + ._test_job(failure_rate=failure_rate, job_duration=job_duration) ) return f"job uuid: {delayed.db_record().uuid}" @@ -259,6 +292,7 @@ def _create_graph_test_jobs( channel=None, description="Test job", failure_rate=0, + job_duration=0, ): model = http.request.env["queue.job"] current_count = 0 @@ -281,7 +315,7 @@ def _create_graph_test_jobs( max_retries=max_retries, channel=channel, description=f"{description} #{current_count}", - )._test_job(failure_rate=failure_rate) + )._test_job(failure_rate=failure_rate, job_duration=job_duration) ) grouping = random.choice(possible_grouping_methods) diff --git a/queue_job/job.py b/queue_job/job.py index 48a7561553..3eca2d2661 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -222,7 +222,7 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} - def add_lock_record(self): + def add_lock_record(self) -> None: """ Create row in db to be locked while the job is being performed. """ @@ -242,13 +242,11 @@ def add_lock_record(self): [self.uuid], ) - def lock(self): - """ - Lock row of job that is being performed + def lock(self) -> bool: + """Lock row of job that is being performed. - If a job cannot be locked, - it means that the job wasn't started, - a RetryableJobError is thrown. + Return False if a job cannot be locked: it means that the job is not in + STARTED state or is already locked by another worker. """ self.env.cr.execute( """ @@ -264,18 +262,15 @@ def lock(self): queue_job WHERE uuid = %s - AND state='started' + AND state = %s ) - FOR UPDATE; + FOR NO KEY UPDATE SKIP LOCKED; """, - [self.uuid], + [self.uuid, STARTED], ) # 1 job should be locked - if 1 != len(self.env.cr.fetchall()): - raise RetryableJobError( - f"Trying to lock job that wasn't started, uuid: {self.uuid}" - ) + return bool(self.env.cr.fetchall()) @classmethod def _load_from_db_record(cls, job_db_record): diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index b8cfe979cc..681d03fadf 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -357,23 +357,26 @@ def _query_requeue_dead_jobs(self): ELSE exc_info END) WHERE - id in ( - SELECT - queue_job_id - FROM - queue_job_lock - WHERE - queue_job_id in ( - SELECT - id - FROM - queue_job - WHERE - state IN ('enqueued','started') - AND date_enqueued < - (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') - ) - FOR UPDATE SKIP LOCKED + state IN ('enqueued','started') + AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + AND ( + id in ( + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + FOR NO KEY UPDATE SKIP LOCKED + ) + OR NOT EXISTS ( + SELECT + 1 + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + ) ) RETURNING uuid """ @@ -396,6 +399,12 @@ def requeue_dead_jobs(self): However, when the Odoo server crashes or is otherwise force-stopped, running jobs are interrupted while the runner has no chance to know they have been aborted. + + This also handles orphaned jobs (enqueued but never started, no lock). + This edge case occurs when the runner marks a job as 'enqueued' + but the HTTP request to start the job never reaches the Odoo server + (e.g., due to server shutdown/crash between setting enqueued and + the controller receiving the request). """ with closing(self.conn.cursor()) as cr: diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index b1a5dcaf7b..20eeaf02b3 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -3,6 +3,7 @@ import logging import random +import time from datetime import datetime, timedelta from odoo import api, exceptions, fields, models @@ -17,6 +18,7 @@ from ..job import ( CANCELLED, DONE, + ENQUEUED, FAILED, PENDING, STARTED, @@ -101,6 +103,7 @@ class QueueJob(models.Model): date_done = fields.Datetime(readonly=True) exec_time = fields.Float( string="Execution Time (avg)", + readonly=True, aggregator="avg", help="Time required to execute this job in seconds. Average when grouped.", ) @@ -328,18 +331,26 @@ def _change_job_state(self, state, result=None): raise ValueError(msg) def button_done(self): + # If job was set to STARTED or CANCELLED, do not set it to DONE + states_from = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, FAILED) result = self.env._("Manually set to done by %s", self.env.user.name) - self._change_job_state(DONE, result=result) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(DONE, result=result) return True def button_cancelled(self): + # If job was set to DONE or WAIT_DEPENDENCIES, do not cancel it + states_from = (PENDING, ENQUEUED, FAILED) result = self.env._("Cancelled by %s", self.env.user.name) - self._change_job_state(CANCELLED, result=result) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(CANCELLED, result=result) return True def requeue(self): - jobs_to_requeue = self.filtered(lambda job_: job_.state != WAIT_DEPENDENCIES) - jobs_to_requeue._change_job_state(PENDING) + # If job is already in queue or started, do not requeue it + states_from = (FAILED, DONE, CANCELLED) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(PENDING) return True def _message_post_on_failure(self): @@ -447,7 +458,9 @@ def related_action_open_record(self): ) return action - def _test_job(self, failure_rate=0): + def _test_job(self, failure_rate=0, job_duration=0): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") + if job_duration: + time.sleep(job_duration) diff --git a/queue_job/readme/ROADMAP.md b/queue_job/readme/ROADMAP.md index a13be6beb3..df33142b88 100644 --- a/queue_job/readme/ROADMAP.md +++ b/queue_job/readme/ROADMAP.md @@ -1,17 +1,2 @@ - After creating a new database or installing `queue_job` on an existing database, Odoo must be restarted for the runner to detect it. -- When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - `started` or `enqueued` state after the Odoo server is halted. Since - the runner has no way to know if they are actually running or not, and - does not know for sure if it is safe to restart the jobs, it does not - attempt to restart them automatically. Such stale jobs therefore fill - the running queue and prevent other jobs to start. You must therefore - requeue them manually, either from the Jobs view, or by running the - following SQL statement *before starting Odoo*: - -``` sql -update queue_job set state='pending' where state in ('started', 'enqueued') -``` diff --git a/queue_job/tests/test_wizards.py b/queue_job/tests/test_wizards.py index 2ac162d313..7738836d2f 100644 --- a/queue_job/tests/test_wizards.py +++ b/queue_job/tests/test_wizards.py @@ -46,3 +46,60 @@ def test_03_done(self): wizard = self._wizard("queue.jobs.to.done") wizard.set_done() self.assertEqual(self.job.state, "done") + + def test_04_requeue_forbidden(self): + wizard = self._wizard("queue.requeue.job") + + # State WAIT_DEPENDENCIES is not requeued + self.job.state = "wait_dependencies" + wizard.requeue() + self.assertEqual(self.job.state, "wait_dependencies") + + # State PENDING, ENQUEUED or STARTED are ignored too + for test_state in ("pending", "enqueued", "started"): + self.job.state = test_state + wizard.requeue() + self.assertEqual(self.job.state, test_state) + + # States CANCELLED, DONE or FAILED will change status + self.job.state = "cancelled" + wizard.requeue() + self.assertEqual(self.job.state, "pending") + + def test_05_cancel_forbidden(self): + wizard = self._wizard("queue.jobs.to.cancelled") + + # State WAIT_DEPENDENCIES is not cancelled + self.job.state = "wait_dependencies" + wizard.set_cancelled() + self.assertEqual(self.job.state, "wait_dependencies") + + # State DONE is not cancelled + self.job.state = "done" + wizard.set_cancelled() + self.assertEqual(self.job.state, "done") + + # State PENDING, ENQUEUED or FAILED will be cancelled + for test_state in ("pending", "enqueued"): + self.job.state = test_state + wizard.set_cancelled() + self.assertEqual(self.job.state, "cancelled") + + def test_06_done_forbidden(self): + wizard = self._wizard("queue.jobs.to.done") + + # State STARTED is not set DONE manually + self.job.state = "started" + wizard.set_done() + self.assertEqual(self.job.state, "started") + + # State CANCELLED is not cancelled + self.job.state = "cancelled" + wizard.set_done() + self.assertEqual(self.job.state, "cancelled") + + # State WAIT_DEPENDENCIES, PENDING, ENQUEUED or FAILED will be set to DONE + for test_state in ("wait_dependencies", "pending", "enqueued"): + self.job.state = test_state + wizard.set_done() + self.assertEqual(self.job.state, "done") diff --git a/queue_job/wizards/queue_jobs_to_cancelled.py b/queue_job/wizards/queue_jobs_to_cancelled.py index 9e73374ebd..bb9f831576 100644 --- a/queue_job/wizards/queue_jobs_to_cancelled.py +++ b/queue_job/wizards/queue_jobs_to_cancelled.py @@ -10,8 +10,8 @@ class SetJobsToCancelled(models.TransientModel): _description = "Cancel all selected jobs" def set_cancelled(self): - jobs = self.job_ids.filtered( - lambda x: x.state in ("pending", "failed", "enqueued") - ) + # Only jobs with state PENDING, FAILED, ENQUEUED + # will change to CANCELLED + jobs = self.job_ids jobs.button_cancelled() return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_jobs_to_done.py b/queue_job/wizards/queue_jobs_to_done.py index ff1366ffed..caf8129213 100644 --- a/queue_job/wizards/queue_jobs_to_done.py +++ b/queue_job/wizards/queue_jobs_to_done.py @@ -10,6 +10,8 @@ class SetJobsToDone(models.TransientModel): _description = "Set all selected jobs to done" def set_done(self): + # Only jobs with state WAIT_DEPENDENCIES, PENDING, ENQUEUED or FAILED + # will change to DONE jobs = self.job_ids jobs.button_done() return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_requeue_job.py b/queue_job/wizards/queue_requeue_job.py index 67d2ffcbdc..a88256300f 100644 --- a/queue_job/wizards/queue_requeue_job.py +++ b/queue_job/wizards/queue_requeue_job.py @@ -20,6 +20,7 @@ def _default_job_ids(self): ) def requeue(self): + # Only jobs with state FAILED, DONE or CANCELLED will change to PENDING jobs = self.job_ids jobs.requeue() return {"type": "ir.actions.act_window_close"} diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index 3cf7243aa7..98b8a0d485 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -15,5 +15,6 @@ "security/ir.model.access.csv", "data/queue_job_test_job.xml", ], + "maintainers": ["sbidoul"], "installable": True, } diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 62347148e5..0cfacebdf3 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,3 +1,4 @@ +from . import test_acquire_job from . import test_autovacuum from . import test_delayable from . import test_dependencies diff --git a/test_queue_job/tests/common.py b/test_queue_job/tests/common.py index 335c072625..d3173a2198 100644 --- a/test_queue_job/tests/common.py +++ b/test_queue_job/tests/common.py @@ -20,3 +20,13 @@ def _create_job(self): stored = Job.db_records_from_uuids(self.env, [test_job.uuid]) self.assertEqual(len(stored), 1) return stored + + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search([("uuid", "=", uuid)], limit=1) + self.assertTrue( + job, + f"Demo data queue job {uuid!r} should be loaded in order " + "to make this test work", + ) + return job diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py new file mode 100644 index 0000000000..3f0c92a2be --- /dev/null +++ b/test_queue_job/tests/test_acquire_job.py @@ -0,0 +1,51 @@ +# Copyright 2026 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import logging +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.queue_job.controllers.main import RunJobController + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def test_acquire_enqueued_job(self): + job_record = self._get_demo_job(uuid="test_enqueued_job") + self.assertFalse( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)], + ), + "A job lock record should not exist at this point", + ) + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: + job = RunJobController._acquire_job(self.env, job_uuid="test_enqueued_job") + mock_commit.assert_called_once() + self.assertIsNotNone(job) + self.assertEqual(job.uuid, "test_enqueued_job") + self.assertEqual(job.state, "started") + self.assertTrue( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)] + ), + "A job lock record should exist at this point", + ) + + def test_acquire_started_job(self): + with ( + mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit, + self.assertLogs(level=logging.WARNING) as logs, + ): + job = RunJobController._acquire_job(self.env, "test_started_job") + mock_commit.assert_not_called() + self.assertIsNone(job) + self.assertIn( + "was requested to run job test_started_job, but it does not exist", + logs.output[0], + ) diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index a6328fed76..a267c43c87 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -13,23 +13,6 @@ @tagged("post_install", "-at_install") class TestRequeueDeadJob(JobCommonCase): - def _get_demo_job(self, uuid): - # job created during load of demo data - job = self.env["queue.job"].search( - [ - ("uuid", "=", uuid), - ], - limit=1, - ) - - self.assertTrue( - job, - f"Demo data queue job {uuid} should be loaded in order" - " to make this tests work", - ) - - return job - def get_locks(self, uuid, cr=None): """ Retrieve lock rows @@ -52,7 +35,7 @@ def get_locks(self, uuid, cr=None): WHERE uuid = %s ) - FOR UPDATE SKIP LOCKED + FOR NO KEY UPDATE SKIP LOCKED """, [uuid], ) @@ -99,3 +82,19 @@ def test_requeue_dead_jobs(self): uuids_requeued = self.env.cr.fetchall() self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) + + def test_requeue_orphaned_jobs(self): + queue_job = self._get_demo_job("test_enqueued_job") + job_obj = Job.load(self.env, queue_job.uuid) + + # Only enqueued job, don't set it to started to simulate the scenario + # that system shutdown before job is starting + job_obj.set_enqueued() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() + + # job is now picked up by the requeue query (which includes orphaned jobs) + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + uuids_requeued = self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued)