@@ -356,7 +356,6 @@ def find_group_node(self, group, hkey):
356
356
group_nodes .sort ()
357
357
358
358
idx = bisect .bisect_right (group_nodes , hkey ) - 1
359
- # logger.debug("IDX %d" % idx, hkey, kv_nodes, len(kv_nodes))
360
359
nuuid = group_nodes [idx ]
361
360
return nuuid
362
361
@@ -686,6 +685,18 @@ def set_suspect(self, suspect):
686
685
self .stack_suspect_broadcast (node )
687
686
688
687
688
+ # Define a function that will wait 10s to let the others nodes know that we did leave
689
+ # and then ask for a clean stop of the daemon
690
+ @staticmethod
691
+ def _bailout_after_leave ():
692
+ wait_time = 10
693
+ logger .info ('Waiting out %s seconds before exiting as we are set in leave state' % wait_time )
694
+ time .sleep (10 )
695
+ logger .info ('Exiting from a self leave message' )
696
+ # Will set self.interrupted = True to every thread that loop
697
+ stopper .do_stop ('Exiting from a leave massage' )
698
+
699
+
689
700
# Someone ask us about a leave node, so believe it
690
701
# Leave node are about all states, so we don't filter by current state
691
702
# if the incarnation is ok, we believe it
@@ -739,19 +750,7 @@ def set_leave(self, leaved, force=False):
739
750
self ._set_myself_atomic_property ('state' , state )
740
751
self .increase_incarnation_and_broadcast ()
741
752
742
-
743
- # Define a function that will wait 10s to let the others nodes know that we did leave
744
- # and then ask for a clean stop of the daemon
745
- def bailout_after_leave (self ):
746
- wait_time = 10
747
- logger .info ('Waiting out %s seconds before exiting as we are set in leave state' % wait_time )
748
- time .sleep (10 )
749
- logger .info ('Exiting from a self leave message' )
750
- # Will set self.interrupted = True to every thread that loop
751
- stopper .do_stop ('Exiting from a leave massage' )
752
-
753
-
754
- threader .create_and_launch (bailout_after_leave , args = (self ,), name = 'Exiting agent after set to leave' , part = 'agent' )
753
+ threader .create_and_launch (self ._bailout_after_leave , name = 'Exiting agent after set to leave' , part = 'agent' )
755
754
return
756
755
757
756
logger .info ('LEAVING: The node %s is leaving' % node ['name' ])
@@ -1026,12 +1025,6 @@ def ping_another_nodes(self):
1026
1025
# but talk to us
1027
1026
# also exclude leave node, because thay said they are not here anymore ^^
1028
1027
def ping_another (self ):
1029
- # Only launch one parallel ping in the same time, max2 if we have thread
1030
- # that mess up with this flag :)
1031
- # if self.ping_another_in_progress:
1032
- # return
1033
- # self.ping_another_in_progress = True
1034
-
1035
1028
possible_nodes = self .__get_valid_nodes_to_ping ()
1036
1029
1037
1030
# first previously deads
@@ -1048,7 +1041,6 @@ def ping_another(self):
1048
1041
other = random .choice (possible_nodes )
1049
1042
self .__do_ping (other )
1050
1043
# Ok we did finish to ping another
1051
- # self.ping_another_in_progress = False
1052
1044
1053
1045
1054
1046
# Launch a ping to another node and if fail set it as suspect
@@ -1061,7 +1053,6 @@ def __do_ping(self, other):
1061
1053
if zonemgr .is_top_zone_from (self .zone , other_zone_name ):
1062
1054
ping_zone = self .zone
1063
1055
ping_payload = {'type' : PACKET_TYPES .PING , 'seqno' : 0 , 'node' : other ['uuid' ], 'from_zone' : self .zone , 'from' : self .uuid }
1064
- # print "PREPARE PING", ping_payload, other
1065
1056
message = jsoner .dumps (ping_payload )
1066
1057
encrypter = libstore .get_encrypter ()
1067
1058
enc_message = encrypter .encrypt (message , dest_zone_name = ping_zone )
@@ -1090,7 +1081,6 @@ def __do_ping(self, other):
1090
1081
self .set_suspect (other )
1091
1082
except (socket .timeout , socket .gaierror ) as exp :
1092
1083
logger .info ("PING: error joining the other node %s:%s : %s. Switching to a indirect ping mode." % (addr , port , exp ))
1093
- # with self.nodes_lock:
1094
1084
possible_relays = [n for n in self .nodes .values () if
1095
1085
n ['uuid' ] != self .uuid
1096
1086
and n != other
@@ -1222,7 +1212,6 @@ def do_indirect_ping(self, tgt, _from, addr):
1222
1212
if zonemgr .is_top_zone_from (self .zone , nfrom_zone ):
1223
1213
nfrom_zone = self .zone
1224
1214
enc_ret_msg = encrypter .encrypt (ret_msg , dest_zone_name = nfrom_zone )
1225
- # sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
1226
1215
sock .sendto (enc_ret_msg , addr )
1227
1216
sock .close ()
1228
1217
except (socket .timeout , socket .gaierror ) as exp :
@@ -1601,16 +1590,13 @@ def get_nodes_for_push_pull_response(self, other_node_zone):
1601
1590
# set the node as dead, and broadcast the information to everyone
1602
1591
def look_at_deads (self ):
1603
1592
# suspect a node for 5 * log(n+1) * interval
1604
- # with self.nodes_lock:
1605
1593
node_scale = math .ceil (math .log10 (float (len (self .nodes ) + 1 )))
1606
1594
probe_interval = 1
1607
1595
suspicion_mult = 5
1608
1596
suspect_timeout = suspicion_mult * node_scale * probe_interval
1609
1597
leave_timeout = suspect_timeout * 30 # something like 300s
1610
1598
1611
- # print "SUSPECT timeout", suspect_timeout
1612
1599
now = int (time .time ())
1613
- # with self.nodes_lock:
1614
1600
for node in self .nodes .values ():
1615
1601
# Only look at suspect nodes of course...
1616
1602
if node ['state' ] != NODE_STATES .SUSPECT :
@@ -1688,9 +1674,6 @@ def create_leave_msg(self, node):
1688
1674
return r
1689
1675
1690
1676
1691
- # def create_new_ts_msg(self, key):
1692
- # return {'type': '/ts/new', 'from': self.uuid, 'key': key}
1693
-
1694
1677
def stack_alive_broadcast (self , node ):
1695
1678
msg = self .create_alive_msg (node )
1696
1679
# Node messages are before all others
@@ -1710,12 +1693,6 @@ def stack_event_broadcast(self, payload, prioritary=False):
1710
1693
return
1711
1694
1712
1695
1713
- # def stack_new_ts_broadcast(self, key):
1714
- # msg = self.create_new_ts_msg(key)
1715
- # b = {'send': 0, 'msg': msg, 'groups': 'ts'}
1716
- # broadcaster.append(b)
1717
- # return
1718
-
1719
1696
def stack_suspect_broadcast (self , node ):
1720
1697
msg = self .create_suspect_msg (node )
1721
1698
# Node messages are before all others
@@ -1829,7 +1806,6 @@ def get_name():
1829
1806
1830
1807
@http_export ('/agent/leave/:nuuid' , protected = True )
1831
1808
def set_node_leave (nuuid ):
1832
- # with self.nodes_lock:
1833
1809
node = self .nodes .get (nuuid , None )
1834
1810
if node is None :
1835
1811
logger .error ('Asking us to set as leave the node %s but we cannot find it' % (nuuid ))
@@ -1841,8 +1817,6 @@ def set_node_leave(nuuid):
1841
1817
@http_export ('/agent/members' )
1842
1818
def agent_members ():
1843
1819
response .content_type = 'application/json'
1844
- # with self.nodes_lock:
1845
- # nodes = copy.copy(self.nodes)
1846
1820
return self .nodes
1847
1821
1848
1822
0 commit comments