Skip to content

Commit

Permalink
Merge pull request naver#8 from jhpark816/joon-dev
Browse files Browse the repository at this point in the history
Add asyncBopFindPositionWithGet API.
  • Loading branch information
hoonmin committed Dec 29, 2014
2 parents 41e83e7 + b97f9b2 commit cdf4988
Show file tree
Hide file tree
Showing 10 changed files with 935 additions and 0 deletions.
122 changes: 122 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import net.spy.memcached.collection.BTreeDelete;
import net.spy.memcached.collection.BTreeElement;
import net.spy.memcached.collection.BTreeFindPosition;
import net.spy.memcached.collection.BTreeFindPositionWithGet;
import net.spy.memcached.collection.BTreeGet;
import net.spy.memcached.collection.BTreeGetBulk;
import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey;
Expand Down Expand Up @@ -112,6 +113,7 @@
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
import net.spy.memcached.ops.BTreeGetByPositionOperation;
import net.spy.memcached.ops.BTreeSortMergeGetOperation;
Expand Down Expand Up @@ -2746,6 +2748,126 @@ public void gotData(int position) {
return rv;
}

@Override
public CollectionFuture<Map<Integer, Element<Object>>> asyncBopFindPositionWithGet(
String key, long longBKey, BTreeOrder order, int count) {
BTreeFindPositionWithGet<Object> get = new BTreeFindPositionWithGet<Object>(longBKey, order, count);
return asyncBopFindPositionWithGet(key, get, collectionTranscoder);
}

@Override
public <T> CollectionFuture<Map<Integer, Element<T>>> asyncBopFindPositionWithGet(
String key, long longBKey, BTreeOrder order, int count, Transcoder<T> tc) {
BTreeFindPositionWithGet<T> get = new BTreeFindPositionWithGet<T>(longBKey, order, count);
return asyncBopFindPositionWithGet(key, get, tc);
}

@Override
public CollectionFuture<Map<Integer, Element<Object>>> asyncBopFindPositionWithGet(
String key, byte[] byteArrayBKey, BTreeOrder order, int count) {
BTreeFindPositionWithGet<Object> get = new BTreeFindPositionWithGet<Object>(byteArrayBKey, order, count);
return asyncBopFindPositionWithGet(key, get, collectionTranscoder);
}

@Override
public <T> CollectionFuture<Map<Integer, Element<T>>> asyncBopFindPositionWithGet(
String key, byte[] byteArrayBKey, BTreeOrder order, int count, Transcoder<T> tc) {
BTreeFindPositionWithGet<T> get = new BTreeFindPositionWithGet<T>(byteArrayBKey, order, count);
return asyncBopFindPositionWithGet(key, get, tc);
}


/**
* Generic find position with get operation for b+tree items. Public methods for b+tree items call this method.
*
* @param k b+tree item's key
* @param get operation parameters (element key and so on)
* @param tc transcoder to serialize and unserialize value
* @return future holding the element's position
*/
private <T> CollectionFuture<Map<Integer, Element<T>>> asyncBopFindPositionWithGet(
final String k, final BTreeFindPositionWithGet<T> get, final Transcoder<T> tc) {
if (get.getOrder() == null) {
throw new IllegalArgumentException("BTreeOrder must not be null");
}
if (get.getCount() < 0 || get.getCount() > 100) {
throw new IllegalArgumentException("Count must be a value between 0 and 100.");
}

final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<Integer, Element<T>>> rv = new CollectionFuture<Map<Integer, Element<T>>>(
latch, operationTimeout);

Operation op = opFact.bopFindPositionWithGet(k, get, new BTreeFindPositionWithGetOperation.Callback() {

TreeMap<Integer, Element<T>> map = new TreeMap<Integer, Element<T>>();

public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
return;
}
if (cstatus.isSuccess()) {
rv.set(map, cstatus);
return;
}
switch (cstatus.getResponse()) {
case NOT_FOUND:
rv.set(null, cstatus);
if (getLogger().isDebugEnabled()) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
}
break;
case NOT_FOUND_ELEMENT:
rv.set(null, cstatus);
if (getLogger().isDebugEnabled()) {
getLogger().debug("Element(%s) not found : %s", k, cstatus);
}
break;
case UNREADABLE:
rv.set(null, cstatus);
if (getLogger().isDebugEnabled()) {
getLogger().debug("Collection(%s) is not readable : %s", k, cstatus);
}
break;
case BKEY_MISMATCH:
rv.set(null, cstatus);
if (getLogger().isDebugEnabled()) {
getLogger().debug("Collection(%s) has wrong bkey : %s(%s)",
k, cstatus, get.getBkeyObject().getType());
}
break;
case TYPE_MISMATCH:
rv.set(null, cstatus);
if (getLogger().isDebugEnabled()) {
getLogger().debug("Collection(%s) is not a B+Tree : %s", k, cstatus);
}
break;
default:
getLogger().warn("Unhandled state: " + status);
}
}

public void complete() {
latch.countDown();
}

public void gotData(String key, int flags, int pos, BKeyObject bkeyObject, byte[] eflag, byte[] data) {
assert key.equals(k) : "Wrong key returned";
Element<T> element = makeBTreeElement(key, flags, bkeyObject, eflag, data, tc);
if (element != null) {
map.put(pos, element);
}
}
});
rv.setOperation(op);
addOp(k, op);
return rv;
}

@Override
public BTreeStoreAndGetFuture<Boolean, Object> asyncBopInsertAndGetTrimmed(
String key, long bkey, byte[] eFlag, Object value,
Expand Down
50 changes: 50 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClientIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,56 @@ public CollectionFuture<Integer> asyncBopFindPosition(
public CollectionFuture<Integer> asyncBopFindPosition(
String key, byte[] byteArrayBKey, BTreeOrder order);

/**
* Get the position, element, and neighbor elements of given bkey in b+tree.
*
* @param key b+tree item's key
* @param longBKey element's bkey (long type bkey)
* @param order ascending/descending order
* @param count number of elements requested in both direction
* @return future holding the element's position
*/
public CollectionFuture<Map<Integer, Element<Object>>> asyncBopFindPositionWithGet(
String key, long longBKey, BTreeOrder order, int count);

/**
* Get the position, element, and neighbor elements of given bkey in b+tree.
*
* @param key b+tree item's key
* @param longBKey element's bkey (long type bkey)
* @param order ascending/descending order
* @param count number of elements requested in both direction
* @param tc a transcoder to decode returned values
* @return future holding the element's position
*/
public <T> CollectionFuture<Map<Integer, Element<T>>> asyncBopFindPositionWithGet(
String key, long longBKey, BTreeOrder order, int count, Transcoder<T> tc);

/**
* Get the position, element, and neighbor elements of given bkey in b+tree.
*
* @param key b+tree item's key
* @param byteArrayBKey element's bkey (byte-array type bkey)
* @param order ascending/descending order
* @param count number of elements requested in both direction
* @return future holding the element's position
*/
public CollectionFuture<Map<Integer, Element<Object>>> asyncBopFindPositionWithGet(
String key, byte[] byteArrayBKey, BTreeOrder order, int count);

/**
* Get the position, element, and neighbor elements of given bkey in b+tree.
*
* @param key b+tree item's key
* @param byteArrayBKey element's bkey (byte-array type bkey)
* @param order ascending/descending order
* @param count number of elements requested in both direction
* @param tc a transcoder to decode returned values
* @return future holding the element's position
*/
public <T> CollectionFuture<Map<Integer, Element<T>>> asyncBopFindPositionWithGet(
String key, byte[] byteArrayBKey, BTreeOrder order, int count, Transcoder<T> tc);

/**
* Insert an element into b+tree and also get the "trimmed" element if any.
*
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,30 @@ public CollectionFuture<Integer> asyncBopFindPosition(String key,
return this.getClient().asyncBopFindPosition(key, byteArrayBKey, order);
}

@Override
public CollectionFuture<Map<Integer, Element<Object>>> asyncBopFindPositionWithGet(
String key, long longBKey, BTreeOrder order, int count) {
return this.getClient().asyncBopFindPositionWithGet(key, longBKey, order, count);
}

@Override
public <T> CollectionFuture<Map<Integer, Element<T>>> asyncBopFindPositionWithGet(
String key, long longBKey, BTreeOrder order, int count, Transcoder<T> tc) {
return this.getClient().asyncBopFindPositionWithGet(key, longBKey, order, count, tc);
}

@Override
public CollectionFuture<Map<Integer, Element<Object>>> asyncBopFindPositionWithGet(
String key, byte[] byteArrayBKey, BTreeOrder order, int count) {
return this.getClient().asyncBopFindPositionWithGet(key, byteArrayBKey, order, count);
}

@Override
public <T> CollectionFuture<Map<Integer, Element<T>>> asyncBopFindPositionWithGet(
String key, byte[] byteArrayBKey, BTreeOrder order, int count, Transcoder<T> tc) {
return this.getClient().asyncBopFindPositionWithGet(key, byteArrayBKey, order, count, tc);
}

@Override
public BTreeStoreAndGetFuture<Boolean, Object> asyncBopInsertAndGetTrimmed(
String key, long bkey, byte[] eFlag, Object value,
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/net/spy/memcached/OperationFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import net.spy.memcached.collection.Attributes;
import net.spy.memcached.collection.BTreeFindPosition;
import net.spy.memcached.collection.BTreeFindPositionWithGet;
import net.spy.memcached.collection.BTreeGetBulk;
import net.spy.memcached.collection.BTreeGetByPosition;
import net.spy.memcached.collection.BTreeSMGet;
Expand All @@ -41,6 +42,7 @@
import net.spy.memcached.collection.CollectionUpdate;
import net.spy.memcached.collection.SetPipedExist;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
import net.spy.memcached.ops.BTreeGetByPositionOperation;
import net.spy.memcached.ops.BTreeSortMergeGetOperation;
Expand Down Expand Up @@ -480,6 +482,16 @@ CollectionBulkStoreOperation collectionBulkStore(List<String> key,
*/
BTreeFindPositionOperation bopFindPosition(String key, BTreeFindPosition get, OperationCallback cb);

/**
* Find-position-with-get operation for b+tree items.
*
* @param key b+tree item's key
* @param get operation parameters (element key and so on)
* @param cb the callback that will contain the results
* @return a new BTreeFindPositionWithGetOperation
*/
BTreeFindPositionWithGetOperation bopFindPositionWithGet(String key, BTreeFindPositionWithGet<?> get, OperationCallback cb);

/**
* Insert/upsert and get the trimmed element for b+tree items.
*
Expand Down
Loading

0 comments on commit cdf4988

Please sign in to comment.