From a5a1d2b031a8d1afc204367051741586c5f729b6 Mon Sep 17 00:00:00 2001 From: oliviarla Date: Wed, 22 Jan 2025 11:18:45 +0900 Subject: [PATCH] INTERNAL: add PipeOperationImpl to reduce duplicate code --- .../CollectionBulkInsertOperationImpl.java | 152 ++--------- .../CollectionPipedExistOperationImpl.java | 143 ++--------- .../CollectionPipedInsertOperationImpl.java | 185 ++------------ .../CollectionPipedUpdateOperationImpl.java | 157 +----------- .../protocol/ascii/PipeOperationImpl.java | 235 ++++++++++++++++++ 5 files changed, 296 insertions(+), 576 deletions(-) create mode 100644 src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java index f4d454505..8ec744545 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java @@ -17,156 +17,48 @@ */ package net.spy.memcached.protocol.ascii; -import java.nio.ByteBuffer; -import java.util.Collection; - import net.spy.memcached.collection.CollectionBulkInsert; -import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.APIType; import net.spy.memcached.ops.CollectionBulkInsertOperation; -import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; /** * Operation to store collection data in a memcached server. */ -public final class CollectionBulkInsertOperationImpl extends OperationImpl +public final class CollectionBulkInsertOperationImpl extends PipeOperationImpl implements CollectionBulkInsertOperation { - private static final OperationStatus STORE_CANCELED = new CollectionOperationStatus( - false, "collection canceled", CollectionResponse.CANCELED); - - private static final OperationStatus END = new CollectionOperationStatus( - true, "END", CollectionResponse.END); - private static final OperationStatus FAILED_END = new CollectionOperationStatus( - false, "END", CollectionResponse.END); - - private static final OperationStatus CREATED_STORED = new CollectionOperationStatus( - true, "CREATED_STORED", CollectionResponse.CREATED_STORED); - private static final OperationStatus STORED = new CollectionOperationStatus( - true, "STORED", CollectionResponse.STORED); - private static final OperationStatus NOT_FOUND = new CollectionOperationStatus( - false, "NOT_FOUND", CollectionResponse.NOT_FOUND); - private static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus( - false, "ELEMENT_EXISTS", CollectionResponse.ELEMENT_EXISTS); - private static final OperationStatus OVERFLOWED = new CollectionOperationStatus( - false, "OVERFLOWED", CollectionResponse.OVERFLOWED); - private static final OperationStatus OUT_OF_RANGE = new CollectionOperationStatus( - false, "OUT_OF_RANGE", CollectionResponse.OUT_OF_RANGE); - private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( - false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); - private static final OperationStatus BKEY_MISMATCH = new CollectionOperationStatus( - false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH); - - private final CollectionBulkInsert insert; private final CollectionBulkInsertOperation.Callback cb; - private int count; - private int index = 0; - private boolean successAll = true; - public CollectionBulkInsertOperationImpl(CollectionBulkInsert insert, OperationCallback cb) { - super(cb); - this.insert = insert; + super(insert.getKeyList(), insert, cb); this.cb = (Callback) cb; - if (this.insert instanceof CollectionBulkInsert.ListBulkInsert) { + if (insert instanceof CollectionBulkInsert.ListBulkInsert) { setAPIType(APIType.LOP_INSERT); - } else if (this.insert instanceof CollectionBulkInsert.SetBulkInsert) { + } else if (insert instanceof CollectionBulkInsert.SetBulkInsert) { setAPIType(APIType.SOP_INSERT); - } else if (this.insert instanceof CollectionBulkInsert.MapBulkInsert) { + } else if (insert instanceof CollectionBulkInsert.MapBulkInsert) { setAPIType(APIType.MOP_INSERT); - } else if (this.insert instanceof CollectionBulkInsert.BTreeBulkInsert) { + } else if (insert instanceof CollectionBulkInsert.BTreeBulkInsert) { setAPIType(APIType.BOP_INSERT); } setOperationType(OperationType.WRITE); } @Override - public void handleLine(String line) { - assert getState() == OperationState.READING - : "Read ``" + line + "'' when in " + getState() + " state"; - /* ENABLE_REPLICATION if */ - if (hasSwitchedOver(line)) { - this.insert.setNextOpIndex(index); - prepareSwitchover(line); - return; - } - /* ENABLE_REPLICATION end */ - /* ENABLE_MIGRATION if */ - if (hasNotMyKey(line)) { - addRedirectMultiKeyOperation(line, insert.getKey(index)); - if (insert.isNotPiped()) { - transitionState(OperationState.REDIRECT); - } else { - index++; - } - return; - } - /* ENABLE_MIGRATION end */ - if (insert.isNotPiped()) { - OperationStatus status = matchStatus(line, STORED, CREATED_STORED, - NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, - TYPE_MISMATCH, BKEY_MISMATCH); - if (!status.isSuccess()) { - cb.gotStatus(insert.getKey(index), status); - successAll = false; - } - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - return; - } - - /* - RESPONSE \r\n - \r\n - [ ... ] - \r\n - END|PIPE_ERROR \r\n - */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { - /* ENABLE_MIGRATION if */ - if (needRedirect()) { - transitionState(OperationState.REDIRECT); - return; - } - /* ENABLE_MIGRATION end */ - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - } else if (line.startsWith("RESPONSE ")) { - getLogger().debug("Got line %s", line); - - // TODO server should be fixed - line = line.replace(" ", " "); - line = line.replace(" ", " "); - - String[] stuff = line.split(" "); - assert "RESPONSE".equals(stuff[0]); - count = Integer.parseInt(stuff[1]); - } else { - OperationStatus status = matchStatus(line, STORED, CREATED_STORED, - NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, - TYPE_MISMATCH, BKEY_MISMATCH); - - if (!status.isSuccess()) { - cb.gotStatus(insert.getKey(index), status); - successAll = false; - } - - index++; - } + protected OperationStatus matchStatus(String line) { + return matchStatus(line, STORED, CREATED_STORED, + NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, + TYPE_MISMATCH, BKEY_MISMATCH); } @Override - public void initialize() { - ByteBuffer buffer = insert.getAsciiCommand(); - setBuffer(buffer); - - if (getLogger().isDebugEnabled()) { - getLogger().debug("Request in ascii protocol: %s", - (new String(buffer.array())).replace("\r\n", "\\r\\n")); + protected void gotStatus(Object statusKey, OperationStatus status) { + if (!status.isSuccess()) { + successAll = false; + cb.gotStatus((String) statusKey, status); } } @@ -175,12 +67,8 @@ protected void wasCancelled() { getCallback().receivedStatus(STORE_CANCELED); } - public Collection getKeys() { - return insert.getKeyList(); - } - public CollectionBulkInsert getInsert() { - return insert; + return (CollectionBulkInsert) getCollectionPipe(); } @Override @@ -188,14 +76,4 @@ public boolean isBulkOperation() { return true; } - @Override - public boolean isPipeOperation() { - return true; - } - - @Override - public boolean isIdempotentOperation() { - return !(insert instanceof CollectionBulkInsert.ListBulkInsert); - } - } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedExistOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedExistOperationImpl.java index e82e9ca48..fc8e947d9 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedExistOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedExistOperationImpl.java @@ -17,137 +17,47 @@ */ package net.spy.memcached.protocol.ascii; -import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Collections; -import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.collection.SetPipedExist; import net.spy.memcached.ops.APIType; -import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.CollectionPipedExistOperation; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; -public final class CollectionPipedExistOperationImpl extends OperationImpl implements +public final class CollectionPipedExistOperationImpl extends PipeOperationImpl implements CollectionPipedExistOperation { - private static final OperationStatus EXIST_CANCELED = new CollectionOperationStatus( - false, "collection canceled", CollectionResponse.CANCELED); - - private static final OperationStatus EXIST = new CollectionOperationStatus( - true, "EXIST", CollectionResponse.EXIST); - private static final OperationStatus NOT_EXIST = new CollectionOperationStatus( - true, "NOT_EXIST", CollectionResponse.NOT_EXIST); - - private static final OperationStatus END = new CollectionOperationStatus( - true, "END", CollectionResponse.END); - private static final OperationStatus FAILED_END = new CollectionOperationStatus( - false, "END", CollectionResponse.END); - - private static final OperationStatus NOT_FOUND = new CollectionOperationStatus( - false, "NOT_FOUND", CollectionResponse.NOT_FOUND); - private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( - false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); - private static final OperationStatus UNREADABLE = new CollectionOperationStatus( - false, "UNREADABLE", CollectionResponse.UNREADABLE); - - private final String key; - private final SetPipedExist setPipedExist; private final CollectionPipedExistOperation.Callback cb; - private int count; - private int index = 0; - private boolean successAll = true; - public CollectionPipedExistOperationImpl(String key, SetPipedExist collectionExist, OperationCallback cb) { - super(cb); - this.key = key; - this.setPipedExist = collectionExist; + super(Collections.singletonList(key), collectionExist, cb); this.cb = (Callback) cb; - if (this.setPipedExist instanceof SetPipedExist) { - setAPIType(APIType.SOP_EXIST); - } + setAPIType(APIType.SOP_EXIST); setOperationType(OperationType.READ); } @Override - public void handleLine(String line) { - assert getState() == OperationState.READING : "Read ``" + line - + "'' when in " + getState() + " state"; - - /* ENABLE_MIGRATION if */ - if (hasNotMyKey(line)) { - // Only one NOT_MY_KEY is provided in response of single key piped operation when redirection. - addRedirectSingleKeyOperation(line, key); - if (setPipedExist.isNotPiped()) { - transitionState(OperationState.REDIRECT); - } else { - setPipedExist.setNextOpIndex(index); - } - return; - } - /* ENABLE_MIGRATION end */ - - if (setPipedExist.isNotPiped()) { - OperationStatus status = matchStatus(line, EXIST, NOT_EXIST, - NOT_FOUND, TYPE_MISMATCH, UNREADABLE); - cb.gotStatus(index, status); - cb.receivedStatus(status.isSuccess() ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - return; - } - - /* - RESPONSE \r\n - \r\n - [ ... ] - \r\n - END|PIPE_ERROR \r\n - */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { - /* ENABLE_MIGRATION if */ - if (needRedirect()) { - transitionState(OperationState.REDIRECT); - return; - } - /* ENABLE_MIGRATION end */ - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - } else if (line.startsWith("RESPONSE ")) { - getLogger().debug("Got line %s", line); - - // TODO server should be fixed - line = line.replace(" ", " "); - line = line.replace(" ", " "); - - String[] stuff = line.split(" "); - assert "RESPONSE".equals(stuff[0]); - count = Integer.parseInt(stuff[1]); - } else { - OperationStatus status = matchStatus(line, EXIST, NOT_EXIST, - NOT_FOUND, TYPE_MISMATCH, UNREADABLE); - - if (!status.isSuccess()) { - successAll = false; - } - cb.gotStatus(index, status); - index++; - } + protected boolean handleSwitchover(String line) { + // SOP EXIST command could be done in replica node. + // So, no need to handle switchover. + return false; } @Override - public void initialize() { - ByteBuffer buffer = setPipedExist.getAsciiCommand(); - setBuffer(buffer); + protected OperationStatus matchStatus(String line) { + return matchStatus(line, EXIST, NOT_EXIST, + NOT_FOUND, TYPE_MISMATCH, UNREADABLE); + } - if (getLogger().isDebugEnabled()) { - getLogger().debug("Request in ascii protocol: %s", - (new String(buffer.array())).replace("\r\n", "\\r\\n")); + @Override + protected void gotStatus(Object statusKey, OperationStatus status) { + if (!getCollectionPipe().isNotPiped() && !status.isSuccess()) { + successAll = false; } + cb.gotStatus((int) statusKey, status); } @Override @@ -155,27 +65,8 @@ protected void wasCancelled() { getCallback().receivedStatus(EXIST_CANCELED); } - public Collection getKeys() { - return Collections.singleton(key); - } - public SetPipedExist getExist() { - return setPipedExist; - } - - @Override - public boolean isBulkOperation() { - return false; - } - - @Override - public boolean isPipeOperation() { - return true; - } - - @Override - public boolean isIdempotentOperation() { - return true; + return (SetPipedExist) getCollectionPipe(); } } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java index b52fd4fb3..5856fa1bd 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java @@ -17,186 +17,53 @@ */ package net.spy.memcached.protocol.ascii; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Collections; import net.spy.memcached.collection.CollectionPipedInsert; -import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.APIType; -import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.CollectionPipedInsertOperation; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationErrorType; -import net.spy.memcached.ops.OperationException; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; /** * Operation to store collection data in a memcached server. */ -public final class CollectionPipedInsertOperationImpl extends OperationImpl +public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl implements CollectionPipedInsertOperation { - private static final OperationStatus STORE_CANCELED = new CollectionOperationStatus( - false, "collection canceled", CollectionResponse.CANCELED); - - private static final OperationStatus END = new CollectionOperationStatus( - true, "END", CollectionResponse.END); - private static final OperationStatus FAILED_END = new CollectionOperationStatus( - false, "END", CollectionResponse.END); - - private static final OperationStatus CREATED_STORED = new CollectionOperationStatus( - true, "CREATED_STORED", CollectionResponse.CREATED_STORED); - private static final OperationStatus STORED = new CollectionOperationStatus( - true, "STORED", CollectionResponse.STORED); - private static final OperationStatus NOT_FOUND = new CollectionOperationStatus( - false, "NOT_FOUND", CollectionResponse.NOT_FOUND); - private static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus( - false, "ELEMENT_EXISTS", CollectionResponse.ELEMENT_EXISTS); - private static final OperationStatus OVERFLOWED = new CollectionOperationStatus( - false, "OVERFLOWED", CollectionResponse.OVERFLOWED); - private static final OperationStatus OUT_OF_RANGE = new CollectionOperationStatus( - false, "OUT_OF_RANGE", CollectionResponse.OUT_OF_RANGE); - private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( - false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); - private static final OperationStatus BKEY_MISMATCH = new CollectionOperationStatus( - false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH); - - private final String key; - private final CollectionPipedInsert insert; private final CollectionPipedInsertOperation.Callback cb; - private int count; - private int index = 0; - private boolean successAll = true; - private boolean readUntilLastLine = false; - public CollectionPipedInsertOperationImpl(String key, CollectionPipedInsert insert, OperationCallback cb) { - super(cb); - this.key = key; - this.insert = insert; + super(Collections.singletonList(key), insert, cb); this.cb = (Callback) cb; - if (this.insert instanceof CollectionPipedInsert.ListPipedInsert) { + if (insert instanceof CollectionPipedInsert.ListPipedInsert) { setAPIType(APIType.LOP_INSERT); - } else if (this.insert instanceof CollectionPipedInsert.SetPipedInsert) { + } else if (insert instanceof CollectionPipedInsert.SetPipedInsert) { setAPIType(APIType.SOP_INSERT); - } else if (this.insert instanceof CollectionPipedInsert.MapPipedInsert) { + } else if (insert instanceof CollectionPipedInsert.MapPipedInsert) { setAPIType(APIType.MOP_INSERT); - } else if (this.insert instanceof CollectionPipedInsert.BTreePipedInsert) { + } else if (insert instanceof CollectionPipedInsert.BTreePipedInsert) { setAPIType(APIType.BOP_INSERT); - } else if (this.insert instanceof CollectionPipedInsert.ByteArraysBTreePipedInsert) { + } else if (insert instanceof CollectionPipedInsert.ByteArraysBTreePipedInsert) { setAPIType(APIType.BOP_INSERT); } setOperationType(OperationType.WRITE); } @Override - public void handleLine(String line) { - assert getState() == OperationState.READING - : "Read ``" + line + "'' when in " + getState() + " state"; - - /* ENABLE_REPLICATION if */ - if (hasSwitchedOver(line)) { - this.insert.setNextOpIndex(index); - prepareSwitchover(line); - return; - } - /* ENABLE_REPLICATION end */ - - /* ENABLE_MIGRATION if */ - if (hasNotMyKey(line)) { - // Only one NOT_MY_KEY is provided in response of single key piped operation when redirection. - addRedirectSingleKeyOperation(line, key); - if (insert.isNotPiped()) { - transitionState(OperationState.REDIRECT); - } else { - insert.setNextOpIndex(index); - } - return; - } - /* ENABLE_MIGRATION end */ - - if (insert.isNotPiped()) { - OperationStatus status = matchStatus(line, STORED, CREATED_STORED, - NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, - TYPE_MISMATCH, BKEY_MISMATCH); - if (status.isSuccess()) { - cb.receivedStatus((successAll) ? END : FAILED_END); - } else { - cb.gotStatus(index, status); - cb.receivedStatus(FAILED_END); - } - transitionState(OperationState.COMPLETE); - return; - } - - /* - RESPONSE \r\n - \r\n - [ ... ] - \r\n - END|PIPE_ERROR \r\n - */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { - /* ENABLE_MIGRATION if */ - if (needRedirect()) { - transitionState(OperationState.REDIRECT); - return; - } - /* ENABLE_MIGRATION end */ - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - } else if (line.startsWith("RESPONSE ")) { - getLogger().debug("Got line %s", line); - - // TODO server should be fixed - line = line.replace(" ", " "); - line = line.replace(" ", " "); - - String[] stuff = line.split(" "); - assert "RESPONSE".equals(stuff[0]); - count = Integer.parseInt(stuff[1]); - readUntilLastLine = true; - } else { - OperationStatus status = matchStatus(line, STORED, CREATED_STORED, - NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, - TYPE_MISMATCH, BKEY_MISMATCH); - - if (!status.isSuccess()) { - cb.gotStatus(index, status); - successAll = false; - } - index++; - } - } - - @Override - protected void handleError(OperationErrorType eType, String line) throws IOException { - if (!readUntilLastLine) { - // this case means that error message came without 'RESPONSE '. - // so it doesn't need to read 'PIPE_ERROR'. - super.handleError(eType, line); - } else { - // this case means that error message came after 'RESPONSE '. - // so it needs to read 'PIPE_ERROR'. - getLogger().error("Error: %s by %s", line, this); - exception = new OperationException(eType, line + " @ " + getHandlingNode().getNodeName()); - } + protected OperationStatus matchStatus(String line) { + return matchStatus(line, STORED, CREATED_STORED, + NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, + TYPE_MISMATCH, BKEY_MISMATCH); } @Override - public void initialize() { - ByteBuffer buffer = insert.getAsciiCommand(); - setBuffer(buffer); - readUntilLastLine = false; - - if (getLogger().isDebugEnabled()) { - getLogger().debug("Request in ascii protocol: %s", - (new String(buffer.array())).replace("\r\n", "\\r\\n")); + protected void gotStatus(Object statusKey, OperationStatus status) { + if (!status.isSuccess()) { + successAll = false; + cb.gotStatus((int) statusKey, status); } } @@ -205,27 +72,7 @@ protected void wasCancelled() { getCallback().receivedStatus(STORE_CANCELED); } - public Collection getKeys() { - return Collections.singleton(key); - } - public CollectionPipedInsert getInsert() { - return insert; - } - - @Override - public boolean isBulkOperation() { - return false; - } - - @Override - public boolean isPipeOperation() { - return true; + return (CollectionPipedInsert) getCollectionPipe(); } - - @Override - public boolean isIdempotentOperation() { - return !(insert instanceof CollectionPipedInsert.ListPipedInsert); - } - } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java index a9859097d..6731ec03d 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java @@ -17,160 +17,49 @@ */ package net.spy.memcached.protocol.ascii; -import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Collections; import net.spy.memcached.collection.CollectionPipedUpdate; import net.spy.memcached.collection.CollectionPipedUpdate.BTreePipedUpdate; import net.spy.memcached.collection.CollectionPipedUpdate.MapPipedUpdate; -import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.APIType; -import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.CollectionPipedUpdateOperation; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; /** * Operation to update collection data in a memcached server. */ -public final class CollectionPipedUpdateOperationImpl extends OperationImpl implements +public final class CollectionPipedUpdateOperationImpl extends PipeOperationImpl implements CollectionPipedUpdateOperation { - private static final OperationStatus STORE_CANCELED = new CollectionOperationStatus( - false, "collection canceled", CollectionResponse.CANCELED); - - private static final OperationStatus END = new CollectionOperationStatus( - true, "END", CollectionResponse.END); - private static final OperationStatus FAILED_END = new CollectionOperationStatus( - false, "END", CollectionResponse.END); - - private static final OperationStatus UPDATED = new CollectionOperationStatus( - true, "UPDATED", CollectionResponse.UPDATED); - private static final OperationStatus NOT_FOUND = new CollectionOperationStatus( - false, "NOT_FOUND", CollectionResponse.NOT_FOUND); - private static final OperationStatus NOT_FOUND_ELEMENT = new CollectionOperationStatus( - false, "NOT_FOUND_ELEMENT", CollectionResponse.NOT_FOUND_ELEMENT); - private static final OperationStatus NOTHING_TO_UPDATE = new CollectionOperationStatus( - false, "NOTHING_TO_UPDATE", CollectionResponse.NOTHING_TO_UPDATE); - private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( - false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); - private static final OperationStatus BKEY_MISMATCH = new CollectionOperationStatus( - false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH); - private static final OperationStatus EFLAG_MISMATCH = new CollectionOperationStatus( - false, "EFLAG_MISMATCH", CollectionResponse.EFLAG_MISMATCH); - - private final String key; - private final CollectionPipedUpdate update; private final CollectionPipedUpdateOperation.Callback cb; - private int count; - private int index = 0; - private boolean successAll = true; - public CollectionPipedUpdateOperationImpl(String key, CollectionPipedUpdate update, OperationCallback cb) { - super(cb); - this.key = key; - this.update = update; + super(Collections.singletonList(key), update, cb); this.cb = (Callback) cb; - if (this.update instanceof BTreePipedUpdate) { + if (update instanceof BTreePipedUpdate) { setAPIType(APIType.BOP_UPDATE); - } else if (this.update instanceof MapPipedUpdate) { + } else if (update instanceof MapPipedUpdate) { setAPIType(APIType.MOP_UPDATE); } setOperationType(OperationType.WRITE); } @Override - public void handleLine(String line) { - assert getState() == OperationState.READING : "Read ``" + line - + "'' when in " + getState() + " state"; - - /* ENABLE_REPLICATION if */ - if (hasSwitchedOver(line)) { - this.update.setNextOpIndex(index); - prepareSwitchover(line); - return; - } - /* ENABLE_REPLICATION end */ - /* ENABLE_MIGRATION if */ - if (hasNotMyKey(line)) { - // Only one NOT_MY_KEY is provided in response of single key piped operation when redirection. - addRedirectSingleKeyOperation(line, key); - if (update.isNotPiped()) { - transitionState(OperationState.REDIRECT); - } else { - update.setNextOpIndex(index); - } - return; - } - /* ENABLE_MIGRATION end */ - - if (update.isNotPiped()) { - OperationStatus status = matchStatus(line, UPDATED, NOT_FOUND, - NOT_FOUND_ELEMENT, NOTHING_TO_UPDATE, TYPE_MISMATCH, - BKEY_MISMATCH, EFLAG_MISMATCH); - if (status.isSuccess()) { - cb.receivedStatus((successAll) ? END : FAILED_END); - } else { - cb.gotStatus(index, status); - cb.receivedStatus(FAILED_END); - } - transitionState(OperationState.COMPLETE); - return; - } - - /* - RESPONSE \r\n - \r\n - [ ... ] - \r\n - END|PIPE_ERROR \r\n - */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { - /* ENABLE_MIGRATION if */ - if (needRedirect()) { - transitionState(OperationState.REDIRECT); - return; - } - /* ENABLE_MIGRATION end */ - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - } else if (line.startsWith("RESPONSE ")) { - getLogger().debug("Got line %s", line); - - // TODO server should be fixed - line = line.replace(" ", " "); - line = line.replace(" ", " "); - - String[] stuff = line.split(" "); - assert "RESPONSE".equals(stuff[0]); - count = Integer.parseInt(stuff[1]); - } else { - OperationStatus status = matchStatus(line, UPDATED, NOT_FOUND, - NOT_FOUND_ELEMENT, NOTHING_TO_UPDATE, TYPE_MISMATCH, - BKEY_MISMATCH, EFLAG_MISMATCH); - - if (!status.isSuccess()) { - cb.gotStatus(index, status); - successAll = false; - } - index++; - } + protected OperationStatus matchStatus(String line) { + return matchStatus(line, UPDATED, NOT_FOUND, + NOT_FOUND_ELEMENT, NOTHING_TO_UPDATE, TYPE_MISMATCH, + BKEY_MISMATCH, EFLAG_MISMATCH); } @Override - public void initialize() { - ByteBuffer buffer = update.getAsciiCommand(); - setBuffer(buffer); - - if (getLogger().isDebugEnabled()) { - getLogger().debug( - "Request in ascii protocol: %s", - (new String(buffer.array())).replace("\r\n", "\\r\\n")); + protected void gotStatus(Object statusKey, OperationStatus status) { + if (!status.isSuccess()) { + successAll = false; + cb.gotStatus((int) statusKey, status); } } @@ -179,27 +68,7 @@ protected void wasCancelled() { getCallback().receivedStatus(STORE_CANCELED); } - public Collection getKeys() { - return Collections.singleton(key); - } - public CollectionPipedUpdate getUpdate() { - return update; + return (CollectionPipedUpdate) getCollectionPipe(); } - - @Override - public boolean isBulkOperation() { - return false; - } - - @Override - public boolean isPipeOperation() { - return true; - } - - @Override - public boolean isIdempotentOperation() { - return true; - } - } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java new file mode 100644 index 000000000..fee5d0690 --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java @@ -0,0 +1,235 @@ +package net.spy.memcached.protocol.ascii; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import net.spy.memcached.collection.CollectionBulkInsert; +import net.spy.memcached.collection.CollectionPipe; +import net.spy.memcached.collection.CollectionPipedInsert; +import net.spy.memcached.collection.CollectionResponse; +import net.spy.memcached.ops.CollectionOperationStatus; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationErrorType; +import net.spy.memcached.ops.OperationException; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; + +public abstract class PipeOperationImpl extends OperationImpl { + + protected static final OperationStatus STORE_CANCELED = new CollectionOperationStatus( + false, "collection canceled", CollectionResponse.CANCELED); + + protected static final OperationStatus END = new CollectionOperationStatus( + true, "END", CollectionResponse.END); + protected static final OperationStatus FAILED_END = new CollectionOperationStatus( + false, "END", CollectionResponse.END); + + protected static final OperationStatus CREATED_STORED = new CollectionOperationStatus( + true, "CREATED_STORED", CollectionResponse.CREATED_STORED); + protected static final OperationStatus STORED = new CollectionOperationStatus( + true, "STORED", CollectionResponse.STORED); + protected static final OperationStatus UPDATED = new CollectionOperationStatus( + true, "UPDATED", CollectionResponse.UPDATED); + protected static final OperationStatus NOT_FOUND = new CollectionOperationStatus( + false, "NOT_FOUND", CollectionResponse.NOT_FOUND); + protected static final OperationStatus NOT_FOUND_ELEMENT = new CollectionOperationStatus( + false, "NOT_FOUND_ELEMENT", CollectionResponse.NOT_FOUND_ELEMENT); + protected static final OperationStatus NOTHING_TO_UPDATE = new CollectionOperationStatus( + false, "NOTHING_TO_UPDATE", CollectionResponse.NOTHING_TO_UPDATE); + protected static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus( + false, "ELEMENT_EXISTS", CollectionResponse.ELEMENT_EXISTS); + protected static final OperationStatus OVERFLOWED = new CollectionOperationStatus( + false, "OVERFLOWED", CollectionResponse.OVERFLOWED); + protected static final OperationStatus OUT_OF_RANGE = new CollectionOperationStatus( + false, "OUT_OF_RANGE", CollectionResponse.OUT_OF_RANGE); + protected static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( + false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); + protected static final OperationStatus BKEY_MISMATCH = new CollectionOperationStatus( + false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH); + protected static final OperationStatus EFLAG_MISMATCH = new CollectionOperationStatus( + false, "EFLAG_MISMATCH", CollectionResponse.EFLAG_MISMATCH); + + protected static final OperationStatus EXIST_CANCELED = new CollectionOperationStatus( + false, "collection canceled", CollectionResponse.CANCELED); + protected static final OperationStatus EXIST = new CollectionOperationStatus( + true, "EXIST", CollectionResponse.EXIST); + protected static final OperationStatus NOT_EXIST = new CollectionOperationStatus( + true, "NOT_EXIST", CollectionResponse.NOT_EXIST); + protected static final OperationStatus UNREADABLE = new CollectionOperationStatus( + false, "UNREADABLE", CollectionResponse.UNREADABLE); + + protected boolean successAll = true; + + private final CollectionPipe collectionPipe; + private final List keys; + private final boolean isSingleKey; + private final boolean isIdempotent; + + private int index = 0; + private boolean readUntilLastLine = false; + + protected PipeOperationImpl(List keys, CollectionPipe collectionPipe, + OperationCallback cb) { + super(cb); + if (keys == null || keys.isEmpty()) { + throw new IllegalArgumentException("No keys provided"); + } + this.keys = keys; + this.isSingleKey = keys.size() == 1; + this.collectionPipe = collectionPipe; + this.isIdempotent = !(collectionPipe instanceof CollectionPipedInsert.ListPipedInsert || + collectionPipe instanceof CollectionBulkInsert.ListBulkInsert); + } + + @Override + public void handleLine(String line) { + assert getState() == OperationState.READING + : "Read ``" + line + "'' when in " + getState() + " state"; + + /* ENABLE_REPLICATION if */ + if (handleSwitchover(line)) { + return; + } + /* ENABLE_REPLICATION end */ + + /* ENABLE_MIGRATION if */ + if (hasNotMyKey(line)) { + if (isSingleKey) { + // Only one NOT_MY_KEY is provided in response of + // single key piped operation when redirection. + addRedirectSingleKeyOperation(line, keys.get(0)); + } else { + addRedirectMultiKeyOperation(line, keys.get(index)); + } + if (collectionPipe.isNotPiped()) { + transitionState(OperationState.REDIRECT); + } else { + collectionPipe.setNextOpIndex(index); + } + return; + } + /* ENABLE_MIGRATION end */ + + if (collectionPipe.isNotPiped()) { + OperationStatus status = matchStatus(line); + + gotStatus(isBulkOperation() ? keys.get(index) : index, status); + + getCallback().receivedStatus((successAll) ? END : FAILED_END); + transitionState(OperationState.COMPLETE); + return; + } + + /* + RESPONSE \r\n + \r\n + [ ... ] + \r\n + END|PIPE_ERROR \r\n + */ + if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { + /* ENABLE_MIGRATION if */ + if (needRedirect()) { + transitionState(OperationState.REDIRECT); + return; + } + /* ENABLE_MIGRATION end */ + getCallback().receivedStatus((successAll) ? END : FAILED_END); + transitionState(OperationState.COMPLETE); + } else if (line.startsWith("RESPONSE ")) { + getLogger().debug("Got line %s", line); + + // TODO server should be fixed + line = line.replace(" ", " "); + line = line.replace(" ", " "); + + String[] stuff = line.split(" "); + assert "RESPONSE".equals(stuff[0]); + readUntilLastLine = true; + } else { + OperationStatus status = matchStatus(line); + gotStatus(isBulkOperation() ? keys.get(index) : index, status); + + index++; + } + } + + protected boolean handleSwitchover(String line) { + /* ENABLE_REPLICATION if */ + if (hasSwitchedOver(line)) { + this.collectionPipe.setNextOpIndex(index); + prepareSwitchover(line); + return true; + } + return false; + /* ENABLE_REPLICATION end */ + } + + @Override + protected void handleError(OperationErrorType eType, String line) throws IOException { + if (!readUntilLastLine) { + // this case means that error message came without 'RESPONSE '. + // so it doesn't need to read 'PIPE_ERROR'. + super.handleError(eType, line); + } else { + // this case means that error message came after 'RESPONSE '. + // so it needs to read 'PIPE_ERROR'. + getLogger().error("Error: %s by %s", line, this); + exception = new OperationException(eType, line + " @ " + getHandlingNode().getNodeName()); + } + } + + /** + * call matchStatus() method with proper statuses in the child class + * + * @param line line that is read from the server + * @return status that is matched with the line + */ + protected abstract OperationStatus matchStatus(String line); + + /** + * call gotStatus() method of each callback class in the child class + * + * @param statusKey could be key or index + * @param status status of each command + */ + protected abstract void gotStatus(Object statusKey, OperationStatus status); + + @Override + public void initialize() { + ByteBuffer buffer = collectionPipe.getAsciiCommand(); + setBuffer(buffer); + readUntilLastLine = false; + + if (getLogger().isDebugEnabled()) { + getLogger().debug("Request in ascii protocol: %s", + (new String(buffer.array())).replace("\r\n", "\\r\\n")); + } + } + + public Collection getKeys() { + return Collections.unmodifiableList(keys); + } + + public CollectionPipe getCollectionPipe() { + return collectionPipe; + } + + @Override + public boolean isBulkOperation() { + return false; + } + + @Override + public final boolean isPipeOperation() { + return true; + } + + public boolean isIdempotentOperation() { + return isIdempotent; + } + +}