Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle SystemExit gracefully in two phase commit mode #21

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ CHANGES
2.0.2 (unreleased)
------------------

- Nothing changed yet.
- Handle SystemExit gracefully if raised while PJDataManager is
joining the transaction in two-phase-commit mode.


2.0.1 (2020-10-13)
Expand Down
14 changes: 14 additions & 0 deletions src/pjpersist/datamanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,20 @@ def _begin(self, transaction):
except psycopg2.Error as e:
check_for_disconnect(e, 'PJDataManager._begin')
raise
except BaseException as exc:
# There's a corner case when SystemExit is raised while
# tpc_begin is still being processed. Because we don't
# know if that operation completed, we need to check
# transaction status so we can clean up correctly.
if self._conn.status == psycopg2.extensions.STATUS_BEGIN:
self._tpc_activated = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what this has to do with SystemExit.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If SystemExit is raised at just the right moment, the _tpc_activated flag is not set (because of the exception) even though tpc_begin has just completed. So when PJDataManager tries to clean up, it calls _conn.rollback() - an invalid call for a connection in status STATUS_BEGIN - and it fails with ProgrammingError.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SystemExit is just an exception we encountered in production, when a celery worker is being shut down while still trying to process some database task.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean you think this happens when SystemExit is raised right between tpc_begin() and _tpc_activated = True?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, this makes sense, but the code additionally needs:

  1. Comment, explaining why this code is there
  2. Logging message on warning level when we don't set _tpc_activated to True.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kedder re-arranged the code a bit and added logging. Hope it reads better now.

else:
LOG.warning(
"PJDataManager._begin: tried to call connection.tpc_begin,"
" but it did not change connection status to STATUS_BEGIN,"
" failing with a generic exception %s."
" Not setting TPC mode on PJDataManager.", exc)
raise
self._tpc_activated = True

def _log_rw_stats(self):
Expand Down
69 changes: 69 additions & 0 deletions src/pjpersist/tests/test_datamanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,74 @@ def cursor(self, abort=False):
transaction.commit()


class ConnectionProxy:
    """Stand-in for a connection object that allows per-attribute overrides.

    Keyword arguments given to the constructor are stored on the proxy
    itself, so they shadow same-named attributes of the wrapped object.
    Any other attribute read, and every attribute write, is forwarded to
    the wrapped object.
    """

    # Class-level default; resolves lookups of ``__conn__`` when the
    # instance dict has no such entry.
    __conn__ = None

    def __init__(self, conn, **kw):
        # Populate the instance dict directly: a plain attribute assignment
        # would go through ``__setattr__`` and land on the wrapped object.
        state = self.__dict__
        state['__conn__'] = conn
        state.update(kw)

    def __getattr__(self, name):
        # Only invoked when regular lookup fails, i.e. for names that were
        # not supplied as constructor keywords.
        wrapped = self.__conn__
        return getattr(wrapped, name)

    def __setattr__(self, name, value):
        # Writes never touch the proxy; they go to the wrapped object.
        wrapped = self.__conn__
        return setattr(wrapped, name, value)

class TwoPhaseCommitTestCase(testing.PJTestCase):
    """Verify SystemExit propagates out of PJDataManager construction.

    The tests run with ``PJ_TWO_PHASE_COMMIT_ENABLED`` patched to True and
    replace ``tpc_begin`` on the connection with variants that raise
    ``SystemExit``.
    """

    def setUp(self):
        super(TwoPhaseCommitTestCase, self).setUp()

        # get rid of the previous transaction
        transaction.abort()

        self.patches = [
            mock.patch(
                "pjpersist.datamanager.PJ_TWO_PHASE_COMMIT_ENABLED", True),
            mock.patch(
                "pjpersist.datamanager."
                "CALL_TPC_PREPARE_ON_NO_WRITE_TRANSACTION", False),
            mock.patch(
                "pjpersist.datamanager.LOG_READ_WRITE_TRANSACTION", True),
        ]
        for patcher in self.patches:
            patcher.start()

        self.conn = testing.getConnection(testing.DBNAME)

    def tearDown(self):
        # Undo the module-level patches before the base class tears down.
        for patcher in self.patches:
            patcher.stop()

        super(TwoPhaseCommitTestCase, self).tearDown()

    def _begin_then_exit(self, tx):
        # Let the real tpc_begin run, then raise SystemExit right after it.
        self.conn.tpc_begin(tx)
        raise SystemExit(-241)

    def _construct_expecting_system_exit(self, **overrides):
        # Build the data manager on a proxied connection and require that
        # SystemExit escapes from the constructor.
        proxy = ConnectionProxy(self.conn, **overrides)
        with self.assertRaises(SystemExit):
            datamanager.PJDataManager(proxy)

    def test_system_exit_while_joining_transaction_on_tpc_begin(self):
        self._construct_expecting_system_exit(
            tpc_begin=self._begin_then_exit)

    def test_system_exit_while_joining_transaction_before_tpc_begin(self):
        def exit_immediately(tx):
            # SystemExit is raised without calling the real tpc_begin.
            raise SystemExit(-241)

        self._construct_expecting_system_exit(tpc_begin=exit_immediately)

    def test_system_exit_while_joining_transaction_fail_rollback(self):
        # NOTE(review): tpc_rollback is defined but never handed to the
        # proxy, so only tpc_begin is overridden here - confirm whether
        # it was meant to be passed as tpc_rollback=tpc_rollback.
        def tpc_rollback(tx):
            raise psycopg2.OperationalError('boom')

        self._construct_expecting_system_exit(
            tpc_begin=self._begin_then_exit)


def test_suite():
dtsuite = doctest.DocTestSuite(
setUp=testing.setUp, tearDown=testing.tearDown,
Expand All @@ -1622,4 +1690,5 @@ def test_suite():
unittest.makeSuite(QueryLoggingTestCase),
unittest.makeSuite(TransactionOptionsTestCase),
unittest.makeSuite(DirtyTestCase),
unittest.makeSuite(TwoPhaseCommitTestCase),
))