Skip to content

Commit

Permalink
Added ENABLE_REPLICATION tag to identify replication code.
Browse files Browse the repository at this point in the history
  • Loading branch information
jhpark816 committed Nov 18, 2014
1 parent 3341cde commit 0172246
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
## ENABLE_REPLICATION

This is an experimental Java client for Arcus memcached with
replication support. Replication takes place at the server side using
a master-slave approach. It is transparent to the client. There are
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
<showDeprecations>true</showDeprecations>
<!-- <compilerArgument>-Xlint:unchecked</compilerArgument> -->
</configuration>
<!-- ENABLE_REPLICATION start -->
<executions>
<execution>
<id>default-testCompile</id>
Expand All @@ -101,6 +102,7 @@
</goals>
</execution>
</executions>
<!-- ENABLE_REPLICATION end -->
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/net/spy/memcached/Arcus17KetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* ENABLE_REPLICATION start */
package net.spy.memcached;

import java.io.IOException;
Expand Down Expand Up @@ -76,3 +77,4 @@ public void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode>
}
}
}
/* ENABLE_REPLICATION end */
2 changes: 2 additions & 0 deletions src/main/java/net/spy/memcached/Arcus17NodeAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* ENABLE_REPLICATION start */
package net.spy.memcached;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -137,3 +138,4 @@ public static Arcus17NodeAddress parseNodeName(String node) throws Exception {
return Arcus17NodeAddress.create(group, master, ipport);
}
}
/* ENABLE_REPLICATION end */
59 changes: 59 additions & 0 deletions src/main/java/net/spy/memcached/CacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@

public class CacheManager extends SpyThread implements Watcher,
CacheMonitor.CacheMonitorListener {
/* ENABLE_REPLICATION start */
private static final String CACHE_LIST_PATH = "/arcus/cache_list/";

private static final String CLIENT_INFO_PATH = "/arcus/client_list/";

private static final String CACHE_1_7_LIST_PATH = "/arcus_1_7/cache_list/";

private static final String CLIENT_1_7_INFO_PATH = "/arcus_1_7/client_list/";
/* ENABLE_REPLICATION else */
//public static final String CACHE_LIST_PATH = "/arcus/cache_list/";

//public static final String CLIENT_INFO_PATH = "/arcus/client_list/";
/* ENABLE_REPLICATION end */

private static final int SESSION_TIMEOUT = 15000;

Expand Down Expand Up @@ -80,7 +86,9 @@ public class CacheManager extends SpyThread implements Watcher,

private CountDownLatch zkInitLatch;

/* ENABLE_REPLICATION start */
private boolean arcus17 = false;
/* ENABLE_REPLICATION end */

public CacheManager(String hostPort, String serviceCode,
ConnectionFactoryBuilder cfb, CountDownLatch clientInitLatch, int poolSize,
Expand Down Expand Up @@ -114,6 +122,8 @@ private void initZooKeeperClient() {
try {
zkInitLatch.await(ZK_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);


/* ENABLE_REPLICATION start */
// Check /arcus_1_7/cache_list/{svc} first
// If it exists, the service code belongs to a 1.7 cluster
if (zk.exists(CACHE_1_7_LIST_PATH + serviceCode, false) != null) {
Expand All @@ -130,6 +140,14 @@ else if (zk.exists(CACHE_LIST_PATH + serviceCode, false) != null) {
"Service code not found. (" + serviceCode + ")");
throw new NotExistsServiceCodeException(serviceCode);
}
/* ENABLE_REPLICATION else */
//if (zk.exists(CacheManager.CACHE_LIST_PATH + serviceCode, false) == null) {
// getLogger().fatal(
// "Service code not found. (" + serviceCode + ")");
// throw new NotExistsServiceCodeException(serviceCode);
//}
/* ENABLE_REPLICATION end */


String path = getClientInfo();
if (path.isEmpty()) {
Expand Down Expand Up @@ -160,8 +178,12 @@ else if (zk.exists(CACHE_LIST_PATH + serviceCode, false) != null) {
"Can't initialize Arcus client.", e);
}

/* ENABLE_REPLICATION start */
String cachePath = arcus17 ? CACHE_1_7_LIST_PATH : CACHE_LIST_PATH;
cacheMonitor = new CacheMonitor(zk, cachePath, serviceCode, this);
/* ENABLE_REPLICATION else */
//cacheMonitor = new CacheMonitor(zk, serviceCode, this);
/* ENABLE_REPLICATION end */
} catch (IOException e) {
throw new InitializeClientException(
"Can't initialize Arcus client.", e);
Expand All @@ -177,6 +199,7 @@ private String getClientInfo() {

// create the ephemeral znode
// "/arcus/client_list/{service_code}/{client hostname}_{ip address}_{pool size}_java_{client version}_{YYYYMMDDHHIISS}_{zk session id}"
/* ENABLE_REPLICATION start */
if (arcus17)
path = CLIENT_1_7_INFO_PATH; // /arcus_1_7/client_list/...
else
Expand All @@ -189,6 +212,16 @@ private String getClientInfo() {
+ ArcusClient.VERSION + "_"
+ simpleDateFormat.format(currentTime) + "_"
+ zk.getSessionId();
/* ENABLE_REPLICATION else */
//path = CLIENT_INFO_PATH + serviceCode + "/"
// + InetAddress.getLocalHost().getHostName() + "_"
// + InetAddress.getLocalHost().getHostAddress() + "_"
// + this.poolSize
// + "_java_"
// + ArcusClient.VERSION + "_"
// + simpleDateFormat.format(currentTime) + "_"
// + zk.getSessionId();
/* ENABLE_REPLICATION end */

} catch (UnknownHostException e) {
return null;
Expand Down Expand Up @@ -266,6 +299,7 @@ public void closing() {
* new children node list
*/
public void commandNodeChange(List<String> children) {
/* ENABLE_REPLICATION start */
// children is the current list of znodes in the cache_list directory
// Arcus 1.6 and 1.7 use different znode names.
//
Expand All @@ -276,8 +310,10 @@ public void commandNodeChange(List<String> children) {
// Arcus 1.7
// Znode names are group^{M,S}^ip:port-hostname. Concat all names separated
// by commas. Arcus17NodeAddress turns these names into Arcus17NodeAddress.
/* ENABLE_REPLICATION end */

String addrs = "";
/* ENABLE_REPLICATION start */
if (arcus17) {
for (int i = 0; i < children.size(); i++) {
if (i > 0)
Expand All @@ -295,6 +331,16 @@ public void commandNodeChange(List<String> children) {
}
}
}
/* ENABLE_REPLICATION else */
//for (int i = 0; i < children.size(); i++) {
// String[] temp = children.get(i).split("-");
// if (i != 0) {
// addrs = addrs + "," + temp[0];
// } else {
// addrs = temp[0];
// }
//}
/* ENABLE_REPLICATION end */

if (client == null) {
createArcusClient(addrs);
Expand All @@ -315,6 +361,7 @@ public void commandNodeChange(List<String> children) {
* current available Memcached Addresses
*/
private void createArcusClient(String addrs) {
/* ENABLE_REPLICATION start */
List<InetSocketAddress> socketList;
int count;
if (arcus17) {
Expand All @@ -339,6 +386,11 @@ private void createArcusClient(String addrs) {
}

final CountDownLatch latch = new CountDownLatch(count);
/* ENABLE_REPLICATION else */
//List<InetSocketAddress> socketList = AddrUtil.getAddresses(addrs);

//final CountDownLatch latch = new CountDownLatch(socketList.size());
/* ENABLE_REPLICATION end */
final ConnectionObserver observer = new ConnectionObserver() {

@Override
Expand All @@ -356,10 +408,17 @@ public void connectionEstablished(SocketAddress sa,
cfb.setInitialObservers(Collections.singleton(observer));

int _awaitTime = 0;
/* ENABLE_REPLICATION start */
if (waitTimeForConnect == 0)
_awaitTime = 50 * count;
else
_awaitTime = waitTimeForConnect;
/* ENABLE_REPLICATION else */
//if (waitTimeForConnect == 0)
// _awaitTime = 50 * socketList.size();
//else
// _awaitTime = waitTimeForConnect;
/* ENABLE_REPLICATION end */

client = new ArcusClient[poolSize];
for (int i = 0; i < poolSize; i++) {
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/net/spy/memcached/CacheMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public class CacheMonitor extends SpyObject implements Watcher,

ZooKeeper zk;

/* ENABLE_REPLICATION start */
String cacheListPath;
/* ENABLE_REPLICATION end */

String serviceCode;

Expand All @@ -54,8 +56,10 @@ public class CacheMonitor extends SpyObject implements Watcher,
*/
public static final String FAKE_SERVER_NODE = "0.0.0.0:23456";

/* ENABLE_REPLICATION start */
/* We only use this for demo, to show more readable node names when the cache list changes. */
boolean demoPrintClusterDiff = false;
/* ENABLE_REPLICATION end */

/**
* Constructor
Expand All @@ -67,6 +71,7 @@ public class CacheMonitor extends SpyObject implements Watcher,
* @param listener
* Callback listener
*/
/* ENABLE_REPLICATION start */
public CacheMonitor(ZooKeeper zk, String cacheListPath, String serviceCode,
CacheMonitorListener listener) {
this.zk = zk;
Expand All @@ -82,6 +87,20 @@ public CacheMonitor(ZooKeeper zk, String cacheListPath, String serviceCode,
// Returning list would be processed in processResult().
asyncGetCacheList();
}
/* ENABLE_REPLICATION else */
//public CacheMonitor(ZooKeeper zk, String serviceCode,
// CacheMonitorListener listener) {
// this.zk = zk;
// this.serviceCode = serviceCode;
// this.listener = listener;

// getLogger().info("Initializing the CacheMonitor.");

// // Get the cache list from the Arcus admin asynchronously.
// // Returning list would be processed in processResult().
// asyncGetCacheList();
//}
/* ENABLE_REPLICATION end */

/**
* Other classes use the CacheMonitor by implementing this method
Expand Down Expand Up @@ -161,10 +180,18 @@ public void processResult(int rc, String path, Object ctx,
*/
void asyncGetCacheList() {
if (getLogger().isDebugEnabled()) {
/* ENABLE_REPLICATION start */
getLogger().debug("Set a new watch on " + (cacheListPath + serviceCode));
/* ENABLE_REPLICATION else */
//getLogger().debug("Set a new watch on " + (CacheManager.CACHE_LIST_PATH + serviceCode));
/* ENABLE_REPLICATION end */
}

/* ENABLE_REPLICATION start */
zk.getChildren(cacheListPath + serviceCode, true, this, null);
/* ENABLE_REPLICATION else */
//zk.getChildren(CacheManager.CACHE_LIST_PATH + serviceCode, true, this, null);
/* ENABLE_REPLICATION end */
}

/**
Expand All @@ -181,6 +208,7 @@ void commandNodeChange(List<String> children) {

if (!children.equals(prevChildren)) {
getLogger().warn("Cache list has been changed : From=" + prevChildren + ", To=" + children + ", " + getInfo());
/* ENABLE_REPLICATION start */
if (demoPrintClusterDiff) {
// Assume 1.7 cluster
System.out.println("\nCLUSTER CHANGE\n---PREVIOUS---");
Expand Down Expand Up @@ -209,6 +237,7 @@ void commandNodeChange(List<String> children) {
}
System.out.println("");
}
/* ENABLE_REPLICATION end */
}

// Store the current children.
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
* limitations under the License.
*/
package net.spy.memcached;

/* ENABLE_REPLICATION start */
import java.io.IOException;
import java.net.InetSocketAddress;
/* ENABLE_REPLICATION end */
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -77,11 +78,13 @@ public class ConnectionFactoryBuilder {

private String frontCacheName = "ArcusFrontCache_" + this.hashCode();

/* ENABLE_REPLICATION start */
private boolean arcus17 = false;

public void setArcus17(boolean b) {
arcus17 = b;
}
/* ENABLE_REPLICATION end */

/**
* Set the operation queue factory.
Expand Down Expand Up @@ -321,13 +324,15 @@ public ConnectionFactoryBuilder setMaxSMGetKeyChunkSize(int size) {
public ConnectionFactory build() {
return new DefaultConnectionFactory() {

/* ENABLE_REPLICATION start */
@Override
public MemcachedConnection createConnection(List<InetSocketAddress> addrs)
throws IOException {
MemcachedConnection c = super.createConnection(addrs);
c.setArcus17(arcus17);
return c;
}
/* ENABLE_REPLICATION end */

@Override
public BlockingQueue<Operation> createOperationQueue() {
Expand Down Expand Up @@ -357,6 +362,7 @@ public NodeLocator createLocator(List<MemcachedNode> nodes) {
case CONSISTENT:
return new KetamaNodeLocator(nodes, getHashAlg());
case ARCUSCONSISTENT:
/* ENABLE_REPLICATION start */
if (arcus17) {
// Arcus 1.7
// This locator uses Arcus17KetamaNodeLocatorConfiguration
Expand All @@ -368,6 +374,9 @@ public NodeLocator createLocator(List<MemcachedNode> nodes) {
// Arcus 1.6
return new ArcusKetamaNodeLocator(nodes, getHashAlg());
}
/* ENABLE_REPLICATION else */
//return new ArcusKetamaNodeLocator(nodes, getHashAlg());
/* ENABLE_REPLICATION end */
default: throw new IllegalStateException(
"Unhandled locator type: " + locator);
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,9 @@ public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
validateKey(key);
final MemcachedNode primaryNode=locator.getPrimary(key);
MemcachedNode node=null;
/* ENABLE_REPLICATION start */
// FIXME. Support FailureMode. See MemcachedConnection.addOperation.
/* ENABLE_REPLICATION end */
if(primaryNode.isActive()) {
node=primaryNode;
} else {
Expand Down
Loading

0 comments on commit 0172246

Please sign in to comment.