4545import java .util .TreeMap ;
4646import java .util .concurrent .ConcurrentLinkedQueue ;
4747import java .util .concurrent .CountDownLatch ;
48+ import java .util .concurrent .Future ;
4849import java .util .concurrent .TimeUnit ;
4950import java .util .concurrent .atomic .AtomicReference ;
5051
@@ -115,6 +116,9 @@ public final class MemcachedConnection extends SpyObject {
115116 private final DelayedSwitchoverGroups delayedSwitchoverGroups =
116117 new DelayedSwitchoverGroups (DELAYED_SWITCHOVER_TIMEOUT_MILLISECONDS );
117118 /* ENABLE_REPLICATION end */
119+ private final HashRingUpdateService hashUpdateService = new HashRingUpdateService ();
120+
121+ private Future <Boolean > hashUpdateResult ;
118122
119123 /**
120124 * Construct a memcached connection.
@@ -313,7 +317,7 @@ public void handleIO() throws IOException {
313317 }
314318 }
315319
316- private void handleNodesToRemove (final List <MemcachedNode > nodesToRemove ) {
320+ void handleNodesToRemove (final List <MemcachedNode > nodesToRemove ) {
317321 for (MemcachedNode node : nodesToRemove ) {
318322 getLogger ().info ("old memcached node removed %s" , node );
319323 reconnectQueue .remove (node );
@@ -340,10 +344,9 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
340344 }
341345 }
342346
343- private void updateConnections (List <InetSocketAddress > addrs ) throws IOException {
344- List <MemcachedNode > attachNodes = new ArrayList <MemcachedNode >();
345- List <MemcachedNode > removeNodes = new ArrayList <MemcachedNode >();
346-
347+ private void getUpdateNodes (List <InetSocketAddress > addrs ,
348+ List <MemcachedNode > attachNodes ,
349+ List <MemcachedNode > removeNodes ) throws IOException {
347350 for (MemcachedNode node : locator .getAll ()) {
348351 if (addrs .contains (node .getSocketAddress ())) {
349352 addrs .remove (node .getSocketAddress ());
@@ -356,12 +359,6 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
356359 for (SocketAddress sa : addrs ) {
357360 attachNodes .add (attachMemcachedNode (sa ));
358361 }
359-
360- // Update the hash.
361- locator .update (attachNodes , removeNodes );
362-
363- // Remove the unavailable nodes.
364- handleNodesToRemove (removeNodes );
365362 }
366363
367364 /* ENABLE_REPLICATION if */
@@ -704,7 +701,12 @@ void handleCacheNodesChange() throws IOException {
704701 return ;
705702 }
706703 /* ENABLE_REPLICATION end */
707- updateConnections (AddrUtil .getAddresses (cacheList ));
704+ List <MemcachedNode > attachNodes = new ArrayList <MemcachedNode >();
705+ List <MemcachedNode > removeNodes = new ArrayList <MemcachedNode >();
706+ getUpdateNodes (AddrUtil .getAddresses (cacheList ), attachNodes , removeNodes );
707+ // Update the hash.
708+ CacheListUpdateTask task = new CacheListUpdateTask (this , attachNodes , removeNodes );
709+ hashUpdateResult = hashUpdateService .updateHashes (task );
708710 }
709711 /* ENABLE_MIGRATION if */
710712 if (arcusMigrEnabled && alterList != null ) {
@@ -725,6 +727,16 @@ void handleCacheNodesChange() throws IOException {
725727 /* ENABLE_MIGRATION end */
726728 }
727729
730+ // Called By MemcachedConnectionTest.
731+ boolean getHashUpdateResult () {
732+ try {
733+ hashUpdateResult .get ();
734+ } catch (Exception e ) {
735+ return false ;
736+ }
737+ return true ;
738+ }
739+
728740 // Called by CacheManger to add the memcached server group.
729741 public void setCacheNodesChange (String addrs ) {
730742 String old = cacheNodesChange .getAndSet (addrs );
@@ -1533,6 +1545,7 @@ public void shutdown() throws IOException {
15331545 }
15341546 }
15351547 selector .close ();
1548+ hashUpdateService .shutdown ();
15361549 getLogger ().debug ("Shut down selector %s" , selector );
15371550 }
15381551
0 commit comments