Skip to content

Commit

Permalink
fix race condition in SupervisorProxyServer
Browse files Browse the repository at this point in the history
  • Loading branch information
julien6387 committed Jan 6, 2025
1 parent 8c9ab90 commit db46939
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 16 deletions.
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

## 0.19 (2025-xx-xx)

* Fix a random Python crash due to a race condition when restarting **Supvisors**.

* Fix the statistics compiler, so that it manages the **Supvisors** instances discovered.
*

* Refactoring of the **Supvisors** internal state machine.
The state `INITIALIZATION` has been renamed to `SYNCHRONIZATION`, and a new state `ELECTION` has been added
to get more stability in the *Master* election.
Expand Down
37 changes: 22 additions & 15 deletions supvisors/internal_com/supervisorproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ def run(self):
if self.status.identifier != self.local_identifier and self.status.has_active_state():
origin = self._get_origin(self.status.identifier)
message = NotificationHeaders.INSTANCE_FAILURE.value, None
# FIXME: using the proxy_server from another thread may cause issues when the proxies are being closed
self.supvisors.rpc_handler.proxy_server.push_notification((origin, message))
self.logger.debug('SupervisorProxyThread.run: exiting main loop'
f' for identifier={self.status.usage_identifier}')
Expand Down Expand Up @@ -372,6 +371,8 @@ def __init__(self, supvisors):
""" Initialization of the attributes. """
self.supvisors = supvisors
self.proxies: Dict[str, SupervisorProxyThread] = {}
# do not allow the creation of a new proxy when stop is requested
self.stop_event: threading.Event = threading.Event()

@property
def logger(self) -> Logger:
Expand Down Expand Up @@ -405,6 +406,9 @@ def get_proxy(self, identifier: str) -> Optional[SupervisorProxyThread]:

def stop(self):
""" Stop all the proxy threads. """
# prevent the creation of new threads, especially for INSTANCE_FAILURE notification
self.stop_event.set()
# stop all threads
self.logger.debug(f'SupervisorProxyServer.stop: {list(self.proxies.keys())}')
for proxy in self.proxies.values():
proxy.stop()
Expand All @@ -415,31 +419,34 @@ def push_request(self, identifier: str, message):
""" Send an XML-RPC request to a Supervisor proxy.
:param identifier: the identifier of the Supvisors instance to request.
:param message: the message to send.
:param message: the message to push.
:return: None.
"""
proxy = self.get_proxy(identifier)
if proxy:
proxy.push_message((InternalEventHeaders.REQUEST, (self.local_identifier, message)))
if not self.stop_event.is_set():
proxy = self.get_proxy(identifier)
if proxy:
proxy.push_message((InternalEventHeaders.REQUEST, (self.local_identifier, message)))

def push_publication(self, message):
""" Send a publication to all remote Supervisor proxies.
:param message: the message to send.
:param message: the message to publish.
:return: None.
"""
for identifier in self.supvisors.mapper.instances:
# No publication to self instance because the event has already been processed.
if identifier != self.local_identifier:
proxy = self.get_proxy(identifier)
if proxy:
proxy.push_message((InternalEventHeaders.PUBLICATION, (self.local_identifier, message)))
if not self.stop_event.is_set():
for identifier in self.supvisors.mapper.instances:
# No publication to self instance because the event has already been processed.
if identifier != self.local_identifier:
proxy = self.get_proxy(identifier)
if proxy:
proxy.push_message((InternalEventHeaders.PUBLICATION, (self.local_identifier, message)))

def push_notification(self, message):
""" Send a notification to the proxy of the local Supervisor instance.
:param message: the message to send.
:param message: the message to notify.
:return: None.
"""
proxy = self.get_proxy(self.local_identifier)
proxy.push_message((InternalEventHeaders.NOTIFICATION, message))
if not self.stop_event.is_set():
proxy = self.get_proxy(self.local_identifier)
proxy.push_message((InternalEventHeaders.NOTIFICATION, message))
11 changes: 11 additions & 0 deletions supvisors/tests/test_supervisorproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ def test_proxy_server_creation(supvisors_instance, proxy_server):
""" Test the SupervisorProxyServer creation. """
assert proxy_server.supvisors is supvisors_instance
assert proxy_server.proxies == {}
assert not proxy_server.stop_event.is_set()
assert proxy_server.local_identifier == supvisors_instance.mapper.local_identifier


Expand Down Expand Up @@ -675,6 +676,7 @@ def test_proxy_server_get_proxy(supvisors_instance, mocked_rpc, proxy_server):
assert proxy_server.proxies == {'10.0.0.2:25000': proxy_2bis}
# test stop
proxy_server.stop()
assert proxy_server.stop_event.is_set()
assert proxy_server.proxies == {'10.0.0.2:25000': proxy_2bis}
assert not proxy_2bis.is_alive()

Expand Down Expand Up @@ -703,8 +705,15 @@ def test_server_proxy_push_message(mocker, supvisors_instance, mocked_rpc, proxy
assert sorted(proxy_server.proxies.keys()) == ['10.0.0.1:25000']
# stop all and reset
proxy_server.stop()
assert proxy_server.stop_event.is_set()
proxy_server.proxies = {}
# stopped flag is set, so proxy creation is disabled
proxy_server.push_request('10.0.0.1:25000', {'message': 'test request'})
proxy_server.push_publication({'message': 'test publish'})
proxy_server.push_notification({'message': 'test discovery'})
assert not mocked_push.called
# test publish
proxy_server.stop_event.clear()
proxy_server.push_publication({'message': 'test publish'})
assert len(mocked_push.call_args_list) == 3
for called in mocked_push.call_args_list:
Expand All @@ -713,7 +722,9 @@ def test_server_proxy_push_message(mocker, supvisors_instance, mocked_rpc, proxy
mocked_push.reset_mock()
# stop all and reset
proxy_server.stop()
assert proxy_server.stop_event.is_set()
proxy_server.proxies = {}
proxy_server.stop_event.clear()
# test post_discovery
proxy_server.push_notification({'message': 'test discovery'})
assert mocked_push.call_args_list == [call((InternalEventHeaders.NOTIFICATION, {'message': 'test discovery'}))]
Expand Down

0 comments on commit db46939

Please sign in to comment.