Skip to content

Commit df65924

Browse files
authored
cluster shutdown should handle waiting requests quit and shutdown client properly. (#65)
Cleanup noisy near-cache logs
1 parent 827a91a commit df65924

File tree

9 files changed

+68
-17
lines changed

9 files changed

+68
-17
lines changed

hazelcast/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ def start(self):
191191
Starts sending periodic HeartBeat operations.
192192
"""
193193
def _heartbeat():
194+
if not self._client.lifecycle.is_live:
195+
return
194196
self._heartbeat()
195197
self._heartbeat_timer = self._client.reactor.add_timer(self._heartbeat_interval, _heartbeat)
196198

hazelcast/exception.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ class AuthenticationError(HazelcastError):
2727
pass
2828

2929

30+
class HazelcastClientNotActiveException(ValueError):
31+
"""
32+
A ValueError which is raised when Hazelcast client is not active during an invocation.
33+
"""
34+
pass
35+
36+
3037
@retryable
3138
class HazelcastInstanceNotActiveError(HazelcastError):
3239
"""

hazelcast/invocation.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from Queue import Queue
55
import functools
66
from hazelcast.exception import create_exception, HazelcastInstanceNotActiveError, is_retryable_error, TimeoutError, \
7-
AuthenticationError, TargetDisconnectedError
7+
AuthenticationError, TargetDisconnectedError, HazelcastClientNotActiveException
88
from hazelcast.future import Future
99
from hazelcast.lifecycle import LIFECYCLE_STATE_CONNECTED
1010
from hazelcast.protocol.client_message import LISTENER_FLAG
@@ -154,7 +154,6 @@ def invoke_smart(self, invocation, ignore_heartbeat=False):
154154
else: # send to random address
155155
addr = self._client.load_balancer.next_address()
156156
self._send_to_address(invocation, addr)
157-
158157
return invocation.future
159158

160159
def invoke_non_smart(self, invocation, ignore_heartbeat=False):
@@ -251,9 +250,11 @@ def _handle_event(self, invocation, message):
251250
self.logger.warn("Error handling event %s", message, exc_info=True)
252251

253252
def _handle_exception(self, invocation, error, traceback=None):
253+
if not self._client.lifecycle.is_live:
254+
invocation.set_exception(HazelcastClientNotActiveException(error.message), traceback)
255+
return
254256
if self.logger.isEnabledFor(logging.DEBUG):
255-
self.logger.debug("Got exception for request %s: %s: %s", invocation.request,
256-
type(error).__name__, error)
257+
self.logger.debug("Got exception for request %s: %s: %s", invocation.request, type(error).__name__, error)
257258
if isinstance(error, (AuthenticationError, IOError, HazelcastInstanceNotActiveError)):
258259
if self._try_retry(invocation):
259260
return
@@ -272,7 +273,6 @@ def _try_retry(self, invocation):
272273
return False
273274

274275
invoke_func = functools.partial(self.invoke, invocation)
275-
self.logger.debug("Rescheduling request %s to be retried in %s seconds", invocation.request,
276-
RETRY_WAIT_TIME_IN_SECONDS)
276+
self.logger.debug("Rescheduling request %s to be retried in %s seconds", invocation.request, RETRY_WAIT_TIME_IN_SECONDS)
277277
self._client.reactor.add_timer(RETRY_WAIT_TIME_IN_SECONDS, invoke_func)
278278
return True

hazelcast/near_cache.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,13 +159,12 @@ def _do_eviction_if_required(self):
159159

160160
if len(new_eviction_samples) == len(new_eviction_samples_cleaned): # did any item expired or do we need to evict
161161
try:
162-
self.logger.debug("Evicting key:{}".format(self._eviction_candidates[0].key))
163162
self.__delitem__(self._eviction_candidates[0].key)
164163
self._evicted_count += 1
165164
del self._eviction_candidates[0]
166165
except KeyError:
167166
# key may be evicted previously so just ignore it
168-
self.logger.debug("Trying to evict but key:{} already expired.".format(self._eviction_candidates[0].key))
167+
pass
169168

170169
def _find_new_random_samples(self):
171170
records = self.values() # has random order because of dict hash
@@ -206,12 +205,11 @@ def get_statistics(self):
206205

207206
def _clean_expired_record(self, key):
208207
try:
209-
self.logger.debug("Expiring key:{}".format(key))
210208
self.__delitem__(key)
211209
self._expired_count += 1
212210
except KeyError:
213211
# key may be evicted previously so just ignore it
214-
self.logger.debug("Trying to expire but key:{} already expired.".format(key))
212+
pass
215213

216214
def __repr__(self):
217215
return "NearCache[len:{}, evicted:{}]".format(self.__len__(), self._evicted_count)

hazelcast/partition.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def get_partition_owner(self, partition_id):
5151
"""
5252
if partition_id not in self.partitions:
5353
self._do_refresh()
54-
return self.partitions[partition_id]
54+
return self.partitions.get(partition_id, None)
5555

5656
def get_partition_id(self, key):
5757
"""
@@ -62,6 +62,8 @@ def get_partition_id(self, key):
6262
"""
6363
data = self._client.serialization_service.to_data(key)
6464
count = self.get_partition_count()
65+
if count <= 0:
66+
return 0
6567
return hash_to_index(data.get_partition_hash(), count)
6668

6769
def get_partition_count(self):
@@ -76,7 +78,7 @@ def get_partition_count(self):
7678

7779
def _get_partition_count_blocking(self):
7880
event = threading.Event()
79-
while not self.partitions:
81+
while not event.isSet():
8082
self._do_refresh(callback=lambda: event.set())
8183
event.wait(timeout=1)
8284

@@ -86,6 +88,8 @@ def _do_refresh(self, callback=None):
8688
connection = self._client.connection_manager.get_connection(address)
8789
if connection is None:
8890
self.logger.debug("Could not update partition thread as owner connection is not available.")
91+
if callback:
92+
callback()
8993
return
9094
request = client_get_partitions_codec.encode_request()
9195

hazelcast/reactor.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ def _loop(self):
4444
self.logger.exception("Error in Reactor Thread")
4545
# TODO: shutdown client
4646
return
47-
self.logger.debug("Reactor Thread exited.")
47+
self.logger.debug("Reactor Thread exited. %s" % self._timers.qsize())
48+
self._cleanup_all_timers()
4849

4950
def _check_timers(self):
5051
now = time.time()
@@ -71,6 +72,9 @@ def add_timer(self, delay, callback):
7172
return self.add_timer_absolute(delay + time.time(), callback)
7273

7374
def shutdown(self):
75+
if not self._is_live:
76+
return
77+
self._is_live = False
7478
for connection in self._map.values():
7579
try:
7680
connection.close(HazelcastError("Client is shutting down"))
@@ -80,7 +84,6 @@ def shutdown(self):
8084
else:
8185
raise
8286
self._map.clear()
83-
self._is_live = False
8487
self._thread.join()
8588

8689
def new_connection(self, address, connect_timeout, socket_options, connection_closed_callback, message_callback):
@@ -89,10 +92,19 @@ def new_connection(self, address, connect_timeout, socket_options, connection_cl
8992

9093
def _cleanup_timer(self, timer):
9194
try:
95+
self.logger.debug("Cancel timer %s" % timer)
9296
self._timers.queue.remove((timer.end, timer))
9397
except ValueError:
9498
pass
9599

100+
def _cleanup_all_timers(self):
101+
while not self._timers.empty():
102+
try:
103+
_, timer = self._timers.get_nowait()
104+
timer.timer_ended_cb()
105+
except Empty:
106+
return
107+
96108

97109
class AsyncoreConnection(Connection, asyncore.dispatcher):
98110
sent_protocol_bytes = False

run-tests.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ else
66
USER=""
77
fi
88

9-
HAZELCAST_VERSION="3.8.1-SNAPSHOT"
9+
HAZELCAST_VERSION="3.8.2-SNAPSHOT"
1010

1111
HAZELCAST_RC_VERSION="0.2-SNAPSHOT"
1212
SNAPSHOT_REPO="https://oss.sonatype.org/content/repositories/snapshots"
@@ -17,7 +17,7 @@ mvn dependency:get -DrepoUrl=${SNAPSHOT_REPO} -Dartifact=com.hazelcast:hazelcast
1717

1818
pip install -r test-requirements.txt ${USER}
1919

20-
nohup java -cp hazelcast-remote-controller-${HAZELCAST_RC_VERSION}.jar:hazelcast-${HAZELCAST_VERSION}.jar com.hazelcast.remotecontroller.Main>rc_stdout.log 2>rc_stderr.log &
20+
java -cp hazelcast-remote-controller-${HAZELCAST_RC_VERSION}.jar:hazelcast-${HAZELCAST_VERSION}.jar com.hazelcast.remotecontroller.Main>rc_stdout.log 2>rc_stderr.log &
2121

2222
sleep 15
2323

start-rc.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ else
66
USER=""
77
fi
88

9-
HAZELCAST_VERSION="3.8.1-SNAPSHOT"
9+
HAZELCAST_VERSION="3.8.2-SNAPSHOT"
1010

1111
HAZELCAST_RC_VERSION="0.2-SNAPSHOT"
1212
SNAPSHOT_REPO="https://oss.sonatype.org/content/repositories/snapshots"

tests/shutdown_test.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from hazelcast import ClientConfig
2+
from hazelcast.exception import HazelcastClientNotActiveException
3+
from tests.base import HazelcastTestCase
4+
from tests.util import configure_logging
5+
6+
7+
class ShutdownTest(HazelcastTestCase):
8+
rc = None
9+
10+
def setUp(self):
11+
configure_logging()
12+
self.rc = self.create_rc()
13+
self.cluster = self.create_cluster(self.rc)
14+
15+
def tearDown(self):
16+
self.shutdown_all_clients()
17+
self.rc.exit()
18+
19+
def test_shutdown_not_hang_on_member_closed(self):
20+
config = ClientConfig()
21+
member = self.cluster.start_member()
22+
client = self.create_client(config)
23+
my_map = client.get_map("test")
24+
my_map.put("key", "value").result()
25+
member.shutdown()
26+
with self.assertRaises(HazelcastClientNotActiveException):
27+
while True:
28+
my_map.get("key").result()

0 commit comments

Comments
 (0)