Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMP] Multi-node high-availability jobrunner #607

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
@@ -132,6 +132,23 @@ Configuration
.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.

* Deploying in high availability mode or odoo.sh:

When deploying queue_job on multiple nodes or on odoo.sh, on top of the configuration
parameters mentioned above you need to also set the env variable
ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or via config parameter as such:

.. code-block:: ini

(...)
[queue_job]
high_availability = 1


> :warning: **Warning:** Failure to enable the high_availability flag on odoo.sh could
constitute a breach of Acceptable Use Policy. Always enable this flag via the odoo.conf file for odoo.sh
deployment

Usage
=====

@@ -571,6 +588,12 @@ Known issues / Roadmap
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement *before starting Odoo*:

* When deployed in high_availability mode the allocated databases for the
jobrunners must be identical. If the databases are different and overlap
i.e jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3 then either
DB1 or DB3 will not proccess jobs because there can be only one leader per
sets of databases.

.. code-block:: sql

update queue_job set state='pending' where state in ('started', 'enqueued')
@@ -631,6 +654,8 @@ Contributors
* Souheil Bejaoui <souheil.bejaoui@acsone.eu>
* Eric Antones <eantones@nuobit.com>
* Simone Orsi <simone.orsi@camptocamp.com>
* Paul Catinean <pca@pledra.com>
* Ruchir Shukla <ruchir@bizzappdev.com>

Maintainers
~~~~~~~~~~~
92 changes: 82 additions & 10 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
@@ -145,6 +145,7 @@
import selectors
import threading
import time
import uuid
from contextlib import closing, contextmanager

import psycopg2
@@ -159,6 +160,7 @@

SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5
LEADER_CHECK_DELAY = 10

_logger = logging.getLogger(__name__)

@@ -192,7 +194,7 @@ def _odoo_now():
return _datetime_to_epoch(dt)


def _connection_info_for(db_name):
def _connection_info_for(db_name, jobrunner_ha_uuid=None):
db_or_uri, connection_info = odoo.sql_db.connection_info_for(db_name)

for p in ("host", "port", "user", "password"):
@@ -202,6 +204,8 @@ def _connection_info_for(db_name):

if cfg:
connection_info[p] = cfg
if jobrunner_ha_uuid:
connection_info["application_name"] = "jobrunner_%s" % jobrunner_ha_uuid

return connection_info

@@ -260,14 +264,13 @@ def urlopen():


class Database(object):
def __init__(self, db_name):
def __init__(self, db_name, jobrunner_ha_uuid=None):
self.db_name = db_name
connection_info = _connection_info_for(db_name)
self.jobrunner_ha_uuid = jobrunner_ha_uuid
connection_info = _connection_info_for(db_name, self.jobrunner_ha_uuid)
self.conn = psycopg2.connect(**connection_info)
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.has_queue_job = self._has_queue_job()
if self.has_queue_job:
self._initialize()

def close(self):
# pylint: disable=except-pass
@@ -280,6 +283,41 @@ def close(self):
pass
self.conn = None

def _check_leader(self, jobrunner_db_names):
"""Check if the linked jobrunner is the leader of all jobrunner_db_names"""
if not self.jobrunner_ha_uuid:
return False

with closing(self.conn.cursor()) as cr:
cr.execute(
"""
SELECT substring(application_name FROM 'jobrunner_(.*)')
FROM pg_stat_activity
WHERE application_name LIKE 'jobrunner_%%' AND
datname IN %s
ORDER BY backend_start, datname
LIMIT 1;""",
(jobrunner_db_names,),
)
res = cr.fetchone()
leader_uuid = res[0] if res else ""
if leader_uuid != self.jobrunner_ha_uuid:
_logger.debug(
"jobrunner %s: not leader of db(s) [ %s ]. leader: %s. sleeping %s sec.",
self.jobrunner_ha_uuid,
", ".join(jobrunner_db_names),
leader_uuid,
LEADER_CHECK_DELAY,
)
return False

_logger.info(
"jobrunner %s is now the leader of db(s) [ %s ]",
self.jobrunner_ha_uuid,
", ".join(jobrunner_db_names),
)
return True

def _has_queue_job(self):
with closing(self.conn.cursor()) as cr:
cr.execute(
@@ -353,6 +391,7 @@ def __init__(
user=None,
password=None,
channel_config_string=None,
high_availability=None,
):
self.scheme = scheme
self.host = host
@@ -363,6 +402,10 @@ def __init__(
if channel_config_string is None:
channel_config_string = _channels()
self.channel_manager.simple_configure(channel_config_string)
self.uuid = False
if high_availability:
self.uuid = str(uuid.uuid4())
_logger.info("jobrunner %s initialized in HA mode", self.uuid)
self.db_by_name = {}
self._stop = False
self._stop_pipe = os.pipe()
@@ -388,12 +431,18 @@ def from_environ_or_config(cls):
password = os.environ.get(
"ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD"
) or queue_job_config.get("http_auth_password")
if "ODOO_QUEUE_JOB_HIGH_AVAILABILITY" in os.environ:
high_availability = str(os.environ["ODOO_QUEUE_JOB_HIGH_AVAILABILITY"])
else:
high_availability = str(queue_job_config.get("high_availability"))
high_availability = high_availability.lower() in ("true", "1", "t")
runner = cls(
scheme=scheme or "http",
host=host or "localhost",
port=port or 8069,
user=user,
password=password,
high_availability=high_availability,
)
return runner

@@ -421,15 +470,20 @@ def close_databases(self, remove_jobs=True):
_logger.warning("error closing database %s", db_name, exc_info=True)
self.db_by_name = {}

def initialize_runner(self):
"""Listen for db notifications and load existing jobs into memory"""
for db in self.db_by_name.values():
db._initialize()
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
self.channel_manager.notify(db.db_name, *job_data)
_logger.info("queue job runner ready for db %s", db.db_name)

def initialize_databases(self):
for db_name in self.get_db_names():
db = Database(db_name)
db = Database(db_name, self.uuid)
if db.has_queue_job:
self.db_by_name[db_name] = db
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)

def run_jobs(self):
now = _odoo_now()
@@ -511,6 +565,17 @@ def stop(self):
# wakeup the select() in wait_notification
os.write(self._stop_pipe[1], b".")

def check_db_leader(self):
"""Check if the current jobrunner is the leader for all configured databases"""
jobrunner_db_names = tuple(self.db_by_name.keys())

if not jobrunner_db_names:
return False

# Use the first db connection to for leadership check
db_obj = self.db_by_name[jobrunner_db_names[0]]
return db_obj._check_leader(jobrunner_db_names)

def run(self):
_logger.info("starting")
while not self._stop:
@@ -520,6 +585,13 @@ def run(self):
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
while not self._stop and self.uuid:
leader = self.check_db_leader()
if leader:
break
time.sleep(LEADER_CHECK_DELAY)
continue
self.initialize_runner()
_logger.info("database connections ready")
# inner loop does the normal processing
while not self._stop:
17 changes: 17 additions & 0 deletions queue_job/readme/CONFIGURE.rst
Original file line number Diff line number Diff line change
@@ -41,3 +41,20 @@

.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.

* Deploying in high availability mode or odoo.sh:

When deploying queue_job on multiple nodes or on odoo.sh, on top of the configuration
parameters mentioned above you need to also set the env variable
ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or via config parameter as such:

.. code-block:: ini

(...)
[queue_job]
high_availability = 1


> :warning: **Warning:** Failure to enable the high_availability flag on odoo.sh could
constitute a breach of Acceptable Use Policy. Always enable this flag via the odoo.conf file for odoo.sh
deployment
2 changes: 2 additions & 0 deletions queue_job/readme/CONTRIBUTORS.rst
Original file line number Diff line number Diff line change
@@ -10,3 +10,5 @@
* Souheil Bejaoui <souheil.bejaoui@acsone.eu>
* Eric Antones <eantones@nuobit.com>
* Simone Orsi <simone.orsi@camptocamp.com>
* Paul Catinean <pca@pledra.com>
* Ruchir Shukla <ruchir@bizzappdev.com>
6 changes: 6 additions & 0 deletions queue_job/readme/ROADMAP.rst
Original file line number Diff line number Diff line change
@@ -13,6 +13,12 @@
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement *before starting Odoo*:

* When deployed in high_availability mode the allocated databases for the
jobrunners must be identical. If the databases are different and overlap
i.e jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3 then either
DB1 or DB3 will not proccess jobs because there can be only one leader per
sets of databases.

.. code-block:: sql

update queue_job set state='pending' where state in ('started', 'enqueued')
21 changes: 21 additions & 0 deletions queue_job/static/description/index.html
Original file line number Diff line number Diff line change
@@ -492,6 +492,20 @@ <h1><a class="toc-backref" href="#toc-entry-2">Configuration</a></h1>
of running Odoo is obviously not for production purposes.</td></tr>
</tbody>
</table>
<ul class="simple">
<li>Deploying in high availability mode or odoo.sh:</li>
</ul>
<p>When deploying queue_job on multiple nodes or on odoo.sh, on top of the configuration
parameters mentioned above you need to also set the env variable
ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or via config parameter as such:</p>
<pre class="code ini literal-block">
<span class="na">(...)</span><span class="w">
</span><span class="k">[queue_job]</span><span class="w">
</span><span class="na">high_availability</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">1</span>
</pre>
<p>&gt; :warning: <strong>Warning:</strong> Failure to enable the high_availability flag on odoo.sh could
constitute a breach of Acceptable Use Policy. Always enable this flag via the odoo.conf file for odoo.sh
deployment</p>
</div>
<div class="section" id="usage">
<h1><a class="toc-backref" href="#toc-entry-3">Usage</a></h1>
@@ -871,6 +885,11 @@ <h1><a class="toc-backref" href="#toc-entry-11">Known issues / Roadmap</a></h1>
therefore fill the running queue and prevent other jobs to start.
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement <em>before starting Odoo</em>:</li>
<li>When deployed in high_availability mode the allocated databases for the
jobrunners must be identical. If the databases are different and overlap
i.e jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3 then either
DB1 or DB3 will not proccess jobs because there can be only one leader per
sets of databases.</li>
</ul>
<pre class="code sql literal-block">
<span class="k">update</span><span class="w"> </span><span class="n">queue_job</span><span class="w"> </span><span class="k">set</span><span class="w"> </span><span class="k">state</span><span class="o">=</span><span class="s1">'pending'</span><span class="w"> </span><span class="k">where</span><span class="w"> </span><span class="k">state</span><span class="w"> </span><span class="k">in</span><span class="w"> </span><span class="p">(</span><span class="s1">'started'</span><span class="p">,</span><span class="w"> </span><span class="s1">'enqueued'</span><span class="p">)</span>
@@ -930,6 +949,8 @@ <h2><a class="toc-backref" href="#toc-entry-17">Contributors</a></h2>
<li>Souheil Bejaoui &lt;<a class="reference external" href="mailto:souheil.bejaoui&#64;acsone.eu">souheil.bejaoui&#64;acsone.eu</a>&gt;</li>
<li>Eric Antones &lt;<a class="reference external" href="mailto:eantones&#64;nuobit.com">eantones&#64;nuobit.com</a>&gt;</li>
<li>Simone Orsi &lt;<a class="reference external" href="mailto:simone.orsi&#64;camptocamp.com">simone.orsi&#64;camptocamp.com</a>&gt;</li>
<li>Paul Catinean &lt;<a class="reference external" href="mailto:pca&#64;pledra.com">pca&#64;pledra.com</a>&gt;</li>
<li>Ruchir Shukla &lt;<a class="reference external" href="mailto:ruchir&#64;bizzappdev.com">ruchir&#64;bizzappdev.com</a>&gt;</li>
</ul>
</div>
<div class="section" id="maintainers">