From 845df106a6098d3476d114af198c664b79527fb6 Mon Sep 17 00:00:00 2001 From: oliviarla Date: Wed, 22 Jan 2025 11:18:45 +0900 Subject: [PATCH] INTERNAL: add PipeOperationImpl to reduce duplicate code --- .../CollectionBulkInsertOperationImpl.java | 161 +----------- .../CollectionPipedExistOperationImpl.java | 154 +----------- .../CollectionPipedInsertOperationImpl.java | 197 +-------------- .../CollectionPipedUpdateOperationImpl.java | 169 +------------ .../protocol/ascii/PipeOperationImpl.java | 232 ++++++++++++++++++ 5 files changed, 274 insertions(+), 639 deletions(-) create mode 100644 src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java index 06c6adf15..922fe91da 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java @@ -17,172 +17,43 @@ */ package net.spy.memcached.protocol.ascii; -import java.nio.ByteBuffer; -import java.util.Collection; - import net.spy.memcached.collection.CollectionBulkInsert; -import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.APIType; import net.spy.memcached.ops.CollectionBulkInsertOperation; -import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; -import net.spy.memcached.ops.PipedOperationCallback; /** * Operation to store collection data in a memcached server. */ -public final class CollectionBulkInsertOperationImpl extends OperationImpl +public final class CollectionBulkInsertOperationImpl extends PipeOperationImpl implements CollectionBulkInsertOperation { - private static final OperationStatus STORE_CANCELED = new CollectionOperationStatus( - false, "collection canceled", CollectionResponse.CANCELED); - - private static final OperationStatus END = new CollectionOperationStatus( - true, "END", CollectionResponse.END); - private static final OperationStatus FAILED_END = new CollectionOperationStatus( - false, "END", CollectionResponse.END); - - private static final OperationStatus CREATED_STORED = new CollectionOperationStatus( - true, "CREATED_STORED", CollectionResponse.CREATED_STORED); - private static final OperationStatus STORED = new CollectionOperationStatus( - true, "STORED", CollectionResponse.STORED); - private static final OperationStatus NOT_FOUND = new CollectionOperationStatus( - false, "NOT_FOUND", CollectionResponse.NOT_FOUND); - private static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus( - false, "ELEMENT_EXISTS", CollectionResponse.ELEMENT_EXISTS); - private static final OperationStatus OVERFLOWED = new CollectionOperationStatus( - false, "OVERFLOWED", CollectionResponse.OVERFLOWED); - private static final OperationStatus OUT_OF_RANGE = new CollectionOperationStatus( - false, "OUT_OF_RANGE", CollectionResponse.OUT_OF_RANGE); - private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( - false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); - private static final OperationStatus BKEY_MISMATCH = new CollectionOperationStatus( - false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH); - - private final CollectionBulkInsert insert; - private final PipedOperationCallback cb; - - private int count; - private int index = 0; - private boolean successAll = true; - public CollectionBulkInsertOperationImpl(CollectionBulkInsert insert, OperationCallback cb) { - super(cb); - this.insert = insert; - this.cb = (PipedOperationCallback) cb; - if (this.insert instanceof CollectionBulkInsert.ListBulkInsert) { + super(insert.getKeyList(), insert, cb); + if (insert instanceof CollectionBulkInsert.ListBulkInsert) { setAPIType(APIType.LOP_INSERT); - } else if (this.insert instanceof CollectionBulkInsert.SetBulkInsert) { + } else if (insert instanceof CollectionBulkInsert.SetBulkInsert) { setAPIType(APIType.SOP_INSERT); - } else if (this.insert instanceof CollectionBulkInsert.MapBulkInsert) { + } else if (insert instanceof CollectionBulkInsert.MapBulkInsert) { setAPIType(APIType.MOP_INSERT); - } else if (this.insert instanceof CollectionBulkInsert.BTreeBulkInsert) { + } else if (insert instanceof CollectionBulkInsert.BTreeBulkInsert) { setAPIType(APIType.BOP_INSERT); } setOperationType(OperationType.WRITE); } @Override - public void handleLine(String line) { - assert getState() == OperationState.READING - : "Read ``" + line + "'' when in " + getState() + " state"; - /* ENABLE_REPLICATION if */ - if (hasSwitchedOver(line)) { - this.insert.setNextOpIndex(index); - prepareSwitchover(line); - return; - } - /* ENABLE_REPLICATION end */ - /* ENABLE_MIGRATION if */ - if (hasNotMyKey(line)) { - addRedirectMultiKeyOperation(line, insert.getKey(index)); - if (insert.isNotPiped()) { - transitionState(OperationState.REDIRECT); - } else { - index++; - } - return; - } - /* ENABLE_MIGRATION end */ - if (insert.isNotPiped()) { - OperationStatus status = matchStatus(line, STORED, CREATED_STORED, - NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, - TYPE_MISMATCH, BKEY_MISMATCH); - if (!status.isSuccess()) { - successAll = false; - } - - cb.gotStatus(index, status); - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - return; - } - - /* - RESPONSE \r\n - \r\n - [ ... ] - \r\n - END|PIPE_ERROR \r\n - */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { - /* ENABLE_MIGRATION if */ - if (needRedirect()) { - transitionState(OperationState.REDIRECT); - return; - } - /* ENABLE_MIGRATION end */ - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - } else if (line.startsWith("RESPONSE ")) { - getLogger().debug("Got line %s", line); - - // TODO server should be fixed - line = line.replace(" ", " "); - line = line.replace(" ", " "); - - String[] stuff = line.split(" "); - assert "RESPONSE".equals(stuff[0]); - count = Integer.parseInt(stuff[1]); - } else { - OperationStatus status = matchStatus(line, STORED, CREATED_STORED, - NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, - TYPE_MISMATCH, BKEY_MISMATCH); - - if (!status.isSuccess()) { - successAll = false; - } - - cb.gotStatus(index, status); - index++; - } - } - - @Override - public void initialize() { - ByteBuffer buffer = insert.getAsciiCommand(); - setBuffer(buffer); - - if (getLogger().isDebugEnabled()) { - getLogger().debug("Request in ascii protocol: %s", - (new String(buffer.array())).replace("\r\n", "\\r\\n")); - } + protected OperationStatus checkStatus(String line) { + return matchStatus(line, STORED, CREATED_STORED, + NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, + TYPE_MISMATCH, BKEY_MISMATCH); } @Override - protected void wasCancelled() { - getCallback().receivedStatus(STORE_CANCELED); - } - - public Collection getKeys() { - return insert.getKeyList(); - } - public CollectionBulkInsert getInsert() { - return insert; + return (CollectionBulkInsert) getCollectionPipe(); } @Override @@ -190,14 +61,4 @@ public boolean isBulkOperation() { return true; } - @Override - public boolean isPipeOperation() { - return true; - } - - @Override - public boolean isIdempotentOperation() { - return !(insert instanceof CollectionBulkInsert.ListBulkInsert); - } - } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedExistOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedExistOperationImpl.java index 03d381a4c..56fd00fbc 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedExistOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedExistOperationImpl.java @@ -17,172 +17,34 @@ */ package net.spy.memcached.protocol.ascii; -import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Collections; -import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.collection.SetPipedExist; import net.spy.memcached.ops.APIType; -import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.CollectionPipedExistOperation; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; -import net.spy.memcached.ops.PipedOperationCallback; -public final class CollectionPipedExistOperationImpl extends OperationImpl implements +public final class CollectionPipedExistOperationImpl extends PipeOperationImpl implements CollectionPipedExistOperation { - private static final OperationStatus EXIST_CANCELED = new CollectionOperationStatus( - false, "collection canceled", CollectionResponse.CANCELED); - - private static final OperationStatus EXIST = new CollectionOperationStatus( - true, "EXIST", CollectionResponse.EXIST); - private static final OperationStatus NOT_EXIST = new CollectionOperationStatus( - true, "NOT_EXIST", CollectionResponse.NOT_EXIST); - - private static final OperationStatus END = new CollectionOperationStatus( - true, "END", CollectionResponse.END); - private static final OperationStatus FAILED_END = new CollectionOperationStatus( - false, "END", CollectionResponse.END); - - private static final OperationStatus NOT_FOUND = new CollectionOperationStatus( - false, "NOT_FOUND", CollectionResponse.NOT_FOUND); - private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( - false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); - private static final OperationStatus UNREADABLE = new CollectionOperationStatus( - false, "UNREADABLE", CollectionResponse.UNREADABLE); - - private final String key; - private final SetPipedExist setPipedExist; - private final PipedOperationCallback cb; - - private int count; - private int index = 0; - private boolean successAll = true; - public CollectionPipedExistOperationImpl(String key, - SetPipedExist collectionExist, - OperationCallback cb) { - super(cb); - this.key = key; - this.setPipedExist = collectionExist; - this.cb = (PipedOperationCallback) cb; - if (this.setPipedExist instanceof SetPipedExist) { - setAPIType(APIType.SOP_EXIST); - } + SetPipedExist collectionExist, OperationCallback cb) { + super(Collections.singletonList(key), collectionExist, cb); + setAPIType(APIType.SOP_EXIST); setOperationType(OperationType.READ); } @Override - public void handleLine(String line) { - assert getState() == OperationState.READING : "Read ``" + line - + "'' when in " + getState() + " state"; - - /* ENABLE_MIGRATION if */ - if (hasNotMyKey(line)) { - // Only one NOT_MY_KEY is provided in response of single key piped operation when redirection. - addRedirectSingleKeyOperation(line, key); - if (setPipedExist.isNotPiped()) { - transitionState(OperationState.REDIRECT); - } else { - setPipedExist.setNextOpIndex(index); - } - return; - } - /* ENABLE_MIGRATION end */ - - if (setPipedExist.isNotPiped()) { - OperationStatus status = matchStatus(line, EXIST, NOT_EXIST, - NOT_FOUND, TYPE_MISMATCH, UNREADABLE); - if (!status.isSuccess()) { - successAll = false; - } - - cb.gotStatus(index, status); - cb.receivedStatus(successAll ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - return; - } - - /* - RESPONSE \r\n - \r\n - [ ... ] - \r\n - END|PIPE_ERROR \r\n - */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { - /* ENABLE_MIGRATION if */ - if (needRedirect()) { - transitionState(OperationState.REDIRECT); - return; - } - /* ENABLE_MIGRATION end */ - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - } else if (line.startsWith("RESPONSE ")) { - getLogger().debug("Got line %s", line); - - // TODO server should be fixed - line = line.replace(" ", " "); - line = line.replace(" ", " "); - - String[] stuff = line.split(" "); - assert "RESPONSE".equals(stuff[0]); - count = Integer.parseInt(stuff[1]); - } else { - OperationStatus status = matchStatus(line, EXIST, NOT_EXIST, - NOT_FOUND, TYPE_MISMATCH, UNREADABLE); - - if (!status.isSuccess()) { - successAll = false; - } - - cb.gotStatus(index, status); - index++; - } + protected OperationStatus checkStatus(String line) { + return matchStatus(line, EXIST, NOT_EXIST, + NOT_FOUND, TYPE_MISMATCH, UNREADABLE); } @Override - public void initialize() { - ByteBuffer buffer = setPipedExist.getAsciiCommand(); - setBuffer(buffer); - - if (getLogger().isDebugEnabled()) { - getLogger().debug("Request in ascii protocol: %s", - (new String(buffer.array())).replace("\r\n", "\\r\\n")); - } - } - - @Override - protected void wasCancelled() { - getCallback().receivedStatus(EXIST_CANCELED); - } - - public Collection getKeys() { - return Collections.singleton(key); - } - public SetPipedExist getExist() { - return setPipedExist; - } - - @Override - public boolean isBulkOperation() { - return false; - } - - @Override - public boolean isPipeOperation() { - return true; - } - - @Override - public boolean isIdempotentOperation() { - return true; + return (SetPipedExist) getCollectionPipe(); } } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java index e7de971a7..65c9b6fb3 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java @@ -17,218 +17,47 @@ */ package net.spy.memcached.protocol.ascii; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Collections; import net.spy.memcached.collection.CollectionPipedInsert; -import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.APIType; -import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.CollectionPipedInsertOperation; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationErrorType; -import net.spy.memcached.ops.OperationException; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; -import net.spy.memcached.ops.PipedOperationCallback; /** * Operation to store collection data in a memcached server. */ -public final class CollectionPipedInsertOperationImpl extends OperationImpl +public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl implements CollectionPipedInsertOperation { - private static final OperationStatus STORE_CANCELED = new CollectionOperationStatus( - false, "collection canceled", CollectionResponse.CANCELED); - - private static final OperationStatus END = new CollectionOperationStatus( - true, "END", CollectionResponse.END); - private static final OperationStatus FAILED_END = new CollectionOperationStatus( - false, "END", CollectionResponse.END); - - private static final OperationStatus CREATED_STORED = new CollectionOperationStatus( - true, "CREATED_STORED", CollectionResponse.CREATED_STORED); - private static final OperationStatus STORED = new CollectionOperationStatus( - true, "STORED", CollectionResponse.STORED); - private static final OperationStatus NOT_FOUND = new CollectionOperationStatus( - false, "NOT_FOUND", CollectionResponse.NOT_FOUND); - private static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus( - false, "ELEMENT_EXISTS", CollectionResponse.ELEMENT_EXISTS); - private static final OperationStatus OVERFLOWED = new CollectionOperationStatus( - false, "OVERFLOWED", CollectionResponse.OVERFLOWED); - private static final OperationStatus OUT_OF_RANGE = new CollectionOperationStatus( - false, "OUT_OF_RANGE", CollectionResponse.OUT_OF_RANGE); - private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( - false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); - private static final OperationStatus BKEY_MISMATCH = new CollectionOperationStatus( - false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH); - - private final String key; - private final CollectionPipedInsert insert; - private final PipedOperationCallback cb; - - private int count; - private int index = 0; - private boolean successAll = true; - private boolean readUntilLastLine = false; - public CollectionPipedInsertOperationImpl(String key, - CollectionPipedInsert insert, - OperationCallback cb) { - super(cb); - this.key = key; - this.insert = insert; - this.cb = (PipedOperationCallback) cb; - if (this.insert instanceof CollectionPipedInsert.ListPipedInsert) { + CollectionPipedInsert insert, OperationCallback cb) { + super(Collections.singletonList(key), insert, cb); + if (insert instanceof CollectionPipedInsert.ListPipedInsert) { setAPIType(APIType.LOP_INSERT); - } else if (this.insert instanceof CollectionPipedInsert.SetPipedInsert) { + } else if (insert instanceof CollectionPipedInsert.SetPipedInsert) { setAPIType(APIType.SOP_INSERT); - } else if (this.insert instanceof CollectionPipedInsert.MapPipedInsert) { + } else if (insert instanceof CollectionPipedInsert.MapPipedInsert) { setAPIType(APIType.MOP_INSERT); - } else if (this.insert instanceof CollectionPipedInsert.BTreePipedInsert) { + } else if (insert instanceof CollectionPipedInsert.BTreePipedInsert) { setAPIType(APIType.BOP_INSERT); - } else if (this.insert instanceof CollectionPipedInsert.ByteArraysBTreePipedInsert) { + } else if (insert instanceof CollectionPipedInsert.ByteArraysBTreePipedInsert) { setAPIType(APIType.BOP_INSERT); } setOperationType(OperationType.WRITE); } @Override - public void handleLine(String line) { - assert getState() == OperationState.READING - : "Read ``" + line + "'' when in " + getState() + " state"; - - /* ENABLE_REPLICATION if */ - if (hasSwitchedOver(line)) { - this.insert.setNextOpIndex(index); - prepareSwitchover(line); - return; - } - /* ENABLE_REPLICATION end */ - - /* ENABLE_MIGRATION if */ - if (hasNotMyKey(line)) { - // Only one NOT_MY_KEY is provided in response of single key piped operation when redirection. - addRedirectSingleKeyOperation(line, key); - if (insert.isNotPiped()) { - transitionState(OperationState.REDIRECT); - } else { - insert.setNextOpIndex(index); - } - return; - } - /* ENABLE_MIGRATION end */ - - if (insert.isNotPiped()) { - OperationStatus status = matchStatus(line, STORED, CREATED_STORED, - NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, - TYPE_MISMATCH, BKEY_MISMATCH); - if (!status.isSuccess()) { - successAll = false; - } - - cb.gotStatus(index, status); - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - return; - } - - /* - RESPONSE \r\n - \r\n - [ ... ] - \r\n - END|PIPE_ERROR \r\n - */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { - /* ENABLE_MIGRATION if */ - if (needRedirect()) { - transitionState(OperationState.REDIRECT); - return; - } - /* ENABLE_MIGRATION end */ - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - } else if (line.startsWith("RESPONSE ")) { - getLogger().debug("Got line %s", line); - - // TODO server should be fixed - line = line.replace(" ", " "); - line = line.replace(" ", " "); - - String[] stuff = line.split(" "); - assert "RESPONSE".equals(stuff[0]); - count = Integer.parseInt(stuff[1]); - readUntilLastLine = true; - } else { - OperationStatus status = matchStatus(line, STORED, CREATED_STORED, - NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, - TYPE_MISMATCH, BKEY_MISMATCH); - - if (!status.isSuccess()) { - successAll = false; - } - - cb.gotStatus(index, status); - index++; - } + protected OperationStatus checkStatus(String line) { + return matchStatus(line, STORED, CREATED_STORED, + NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, + TYPE_MISMATCH, BKEY_MISMATCH); } @Override - protected void handleError(OperationErrorType eType, String line) throws IOException { - if (!readUntilLastLine) { - // this case means that error message came without 'RESPONSE '. - // so it doesn't need to read 'PIPE_ERROR'. - super.handleError(eType, line); - } else { - // this case means that error message came after 'RESPONSE '. - // so it needs to read 'PIPE_ERROR'. - getLogger().error("Error: %s by %s", line, this); - exception = new OperationException(eType, line + " @ " + getHandlingNode().getNodeName()); - } - } - - @Override - public void initialize() { - ByteBuffer buffer = insert.getAsciiCommand(); - setBuffer(buffer); - readUntilLastLine = false; - - if (getLogger().isDebugEnabled()) { - getLogger().debug("Request in ascii protocol: %s", - (new String(buffer.array())).replace("\r\n", "\\r\\n")); - } - } - - @Override - protected void wasCancelled() { - getCallback().receivedStatus(STORE_CANCELED); - } - - public Collection getKeys() { - return Collections.singleton(key); - } - public CollectionPipedInsert getInsert() { - return insert; + return (CollectionPipedInsert) getCollectionPipe(); } - - @Override - public boolean isBulkOperation() { - return false; - } - - @Override - public boolean isPipeOperation() { - return true; - } - - @Override - public boolean isIdempotentOperation() { - return !(insert instanceof CollectionPipedInsert.ListPipedInsert); - } - } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java index 8702bea15..38b52951b 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java @@ -17,192 +17,43 @@ */ package net.spy.memcached.protocol.ascii; -import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Collections; import net.spy.memcached.collection.CollectionPipedUpdate; import net.spy.memcached.collection.CollectionPipedUpdate.BTreePipedUpdate; import net.spy.memcached.collection.CollectionPipedUpdate.MapPipedUpdate; -import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.APIType; -import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.CollectionPipedUpdateOperation; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; -import net.spy.memcached.ops.PipedOperationCallback; /** * Operation to update collection data in a memcached server. */ -public final class CollectionPipedUpdateOperationImpl extends OperationImpl implements +public final class CollectionPipedUpdateOperationImpl extends PipeOperationImpl implements CollectionPipedUpdateOperation { - private static final OperationStatus STORE_CANCELED = new CollectionOperationStatus( - false, "collection canceled", CollectionResponse.CANCELED); - - private static final OperationStatus END = new CollectionOperationStatus( - true, "END", CollectionResponse.END); - private static final OperationStatus FAILED_END = new CollectionOperationStatus( - false, "END", CollectionResponse.END); - - private static final OperationStatus UPDATED = new CollectionOperationStatus( - true, "UPDATED", CollectionResponse.UPDATED); - private static final OperationStatus NOT_FOUND = new CollectionOperationStatus( - false, "NOT_FOUND", CollectionResponse.NOT_FOUND); - private static final OperationStatus NOT_FOUND_ELEMENT = new CollectionOperationStatus( - false, "NOT_FOUND_ELEMENT", CollectionResponse.NOT_FOUND_ELEMENT); - private static final OperationStatus NOTHING_TO_UPDATE = new CollectionOperationStatus( - false, "NOTHING_TO_UPDATE", CollectionResponse.NOTHING_TO_UPDATE); - private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( - false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); - private static final OperationStatus BKEY_MISMATCH = new CollectionOperationStatus( - false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH); - private static final OperationStatus EFLAG_MISMATCH = new CollectionOperationStatus( - false, "EFLAG_MISMATCH", CollectionResponse.EFLAG_MISMATCH); - - private final String key; - private final CollectionPipedUpdate update; - private final PipedOperationCallback cb; - - private int count; - private int index = 0; - private boolean successAll = true; - public CollectionPipedUpdateOperationImpl(String key, - CollectionPipedUpdate update, - OperationCallback cb) { - super(cb); - this.key = key; - this.update = update; - this.cb = (PipedOperationCallback) cb; - if (this.update instanceof BTreePipedUpdate) { + CollectionPipedUpdate update, OperationCallback cb) { + super(Collections.singletonList(key), update, cb); + if (update instanceof BTreePipedUpdate) { setAPIType(APIType.BOP_UPDATE); - } else if (this.update instanceof MapPipedUpdate) { + } else if (update instanceof MapPipedUpdate) { setAPIType(APIType.MOP_UPDATE); } setOperationType(OperationType.WRITE); } @Override - public void handleLine(String line) { - assert getState() == OperationState.READING : "Read ``" + line - + "'' when in " + getState() + " state"; - - /* ENABLE_REPLICATION if */ - if (hasSwitchedOver(line)) { - this.update.setNextOpIndex(index); - prepareSwitchover(line); - return; - } - /* ENABLE_REPLICATION end */ - /* ENABLE_MIGRATION if */ - if (hasNotMyKey(line)) { - // Only one NOT_MY_KEY is provided in response of single key piped operation when redirection. - addRedirectSingleKeyOperation(line, key); - if (update.isNotPiped()) { - transitionState(OperationState.REDIRECT); - } else { - update.setNextOpIndex(index); - } - return; - } - /* ENABLE_MIGRATION end */ - - if (update.isNotPiped()) { - OperationStatus status = matchStatus(line, UPDATED, NOT_FOUND, - NOT_FOUND_ELEMENT, NOTHING_TO_UPDATE, TYPE_MISMATCH, - BKEY_MISMATCH, EFLAG_MISMATCH); - if (!status.isSuccess()) { - successAll = false; - } - - cb.gotStatus(index, status); - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - return; - } - - /* - RESPONSE \r\n - \r\n - [ ... ] - \r\n - END|PIPE_ERROR \r\n - */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { - /* ENABLE_MIGRATION if */ - if (needRedirect()) { - transitionState(OperationState.REDIRECT); - return; - } - /* ENABLE_MIGRATION end */ - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); - } else if (line.startsWith("RESPONSE ")) { - getLogger().debug("Got line %s", line); - - // TODO server should be fixed - line = line.replace(" ", " "); - line = line.replace(" ", " "); - - String[] stuff = line.split(" "); - assert "RESPONSE".equals(stuff[0]); - count = Integer.parseInt(stuff[1]); - } else { - OperationStatus status = matchStatus(line, UPDATED, NOT_FOUND, - NOT_FOUND_ELEMENT, NOTHING_TO_UPDATE, TYPE_MISMATCH, - BKEY_MISMATCH, EFLAG_MISMATCH); - - if (!status.isSuccess()) { - successAll = false; - } - - cb.gotStatus(index, status); - index++; - } - } - - @Override - public void initialize() { - ByteBuffer buffer = update.getAsciiCommand(); - setBuffer(buffer); - - if (getLogger().isDebugEnabled()) { - getLogger().debug( - "Request in ascii protocol: %s", - (new String(buffer.array())).replace("\r\n", "\\r\\n")); - } + protected OperationStatus checkStatus(String line) { + return matchStatus(line, UPDATED, NOT_FOUND, + NOT_FOUND_ELEMENT, NOTHING_TO_UPDATE, TYPE_MISMATCH, + BKEY_MISMATCH, EFLAG_MISMATCH); } @Override - protected void wasCancelled() { - getCallback().receivedStatus(STORE_CANCELED); - } - - public Collection getKeys() { - return Collections.singleton(key); - } - public CollectionPipedUpdate getUpdate() { - return update; - } - - @Override - public boolean isBulkOperation() { - return false; - } - - @Override - public boolean isPipeOperation() { - return true; + return (CollectionPipedUpdate) getCollectionPipe(); } - - @Override - public boolean isIdempotentOperation() { - return true; - } - } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java new file mode 100644 index 000000000..f982f7334 --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java @@ -0,0 +1,232 @@ +package net.spy.memcached.protocol.ascii; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import net.spy.memcached.collection.CollectionBulkInsert; +import net.spy.memcached.collection.CollectionPipe; +import net.spy.memcached.collection.CollectionPipedInsert; +import net.spy.memcached.collection.CollectionResponse; +import net.spy.memcached.ops.CollectionOperationStatus; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationErrorType; +import net.spy.memcached.ops.OperationException; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.PipedOperationCallback; + +abstract class PipeOperationImpl extends OperationImpl { + + protected static final OperationStatus CANCELED = new CollectionOperationStatus( + false, "collection canceled", CollectionResponse.CANCELED); + + protected static final OperationStatus END = new CollectionOperationStatus( + true, "END", CollectionResponse.END); + protected static final OperationStatus FAILED_END = new CollectionOperationStatus( + false, "END", CollectionResponse.END); + + protected static final OperationStatus CREATED_STORED = new CollectionOperationStatus( + true, "CREATED_STORED", CollectionResponse.CREATED_STORED); + protected static final OperationStatus STORED = new CollectionOperationStatus( + true, "STORED", CollectionResponse.STORED); + protected static final OperationStatus UPDATED = new CollectionOperationStatus( + true, "UPDATED", CollectionResponse.UPDATED); + protected static final OperationStatus NOT_FOUND = new CollectionOperationStatus( + false, "NOT_FOUND", CollectionResponse.NOT_FOUND); + protected static final OperationStatus NOT_FOUND_ELEMENT = new CollectionOperationStatus( + false, "NOT_FOUND_ELEMENT", CollectionResponse.NOT_FOUND_ELEMENT); + protected static final OperationStatus NOTHING_TO_UPDATE = new CollectionOperationStatus( + false, "NOTHING_TO_UPDATE", CollectionResponse.NOTHING_TO_UPDATE); + protected static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus( + false, "ELEMENT_EXISTS", CollectionResponse.ELEMENT_EXISTS); + protected static final OperationStatus OVERFLOWED = new CollectionOperationStatus( + false, "OVERFLOWED", CollectionResponse.OVERFLOWED); + protected static final OperationStatus OUT_OF_RANGE = new CollectionOperationStatus( + false, "OUT_OF_RANGE", CollectionResponse.OUT_OF_RANGE); + protected static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus( + false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH); + protected static final OperationStatus BKEY_MISMATCH = new CollectionOperationStatus( + false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH); + protected static final OperationStatus EFLAG_MISMATCH = new CollectionOperationStatus( + false, "EFLAG_MISMATCH", CollectionResponse.EFLAG_MISMATCH); + + protected static final OperationStatus EXIST = new CollectionOperationStatus( + true, "EXIST", CollectionResponse.EXIST); + protected static final OperationStatus NOT_EXIST = new CollectionOperationStatus( + true, "NOT_EXIST", CollectionResponse.NOT_EXIST); + protected static final OperationStatus UNREADABLE = new CollectionOperationStatus( + false, "UNREADABLE", CollectionResponse.UNREADABLE); + + protected boolean successAll = true; + + private final CollectionPipe collectionPipe; + private final PipedOperationCallback cb; + private final List keys; + private final boolean isIdempotent; + + private int index = 0; + private boolean readUntilLastLine = false; + + protected PipeOperationImpl(List keys, CollectionPipe collectionPipe, + OperationCallback cb) { + super(cb); + this.cb = (PipedOperationCallback) cb; + if (keys == null || keys.isEmpty()) { + throw new IllegalArgumentException("No keys provided"); + } + this.keys = keys; + this.collectionPipe = collectionPipe; + this.isIdempotent = !(collectionPipe instanceof CollectionPipedInsert.ListPipedInsert || + collectionPipe instanceof CollectionBulkInsert.ListBulkInsert); + } + + @Override + public void handleLine(String line) { + assert getState() == OperationState.READING + : "Read ``" + line + "'' when in " + getState() + " state"; + + /* ENABLE_REPLICATION if */ + if (isWriteOperation() && hasSwitchedOver(line)) { + this.collectionPipe.setNextOpIndex(index); + prepareSwitchover(line); + return; + } + /* ENABLE_REPLICATION end */ + + /* ENABLE_MIGRATION if */ + if (hasNotMyKey(line)) { + if (isBulkOperation()) { + addRedirectMultiKeyOperation(line, keys.get(index)); + if (collectionPipe.isNotPiped()) { + transitionState(OperationState.REDIRECT); + } else { + index++; + } + } else { + // Only one NOT_MY_KEY is provided in response of + // single key piped operation when redirection. + addRedirectSingleKeyOperation(line, keys.get(0)); + if (collectionPipe.isNotPiped()) { + transitionState(OperationState.REDIRECT); + } else { + collectionPipe.setNextOpIndex(index); + } + } + return; + } + /* ENABLE_MIGRATION end */ + + if (collectionPipe.isNotPiped()) { + OperationStatus status = checkStatus(line); + if (!status.isSuccess()) { + successAll = false; + } + cb.gotStatus(index, status); + + cb.receivedStatus((successAll) ? END : FAILED_END); + transitionState(OperationState.COMPLETE); + return; + } + + /* + RESPONSE \r\n + \r\n + [ ... ] + \r\n + END|PIPE_ERROR \r\n + */ + if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { + /* ENABLE_MIGRATION if */ + if (needRedirect()) { + transitionState(OperationState.REDIRECT); + return; + } + /* ENABLE_MIGRATION end */ + cb.receivedStatus((successAll) ? END : FAILED_END); + transitionState(OperationState.COMPLETE); + } else if (line.startsWith("RESPONSE ")) { + getLogger().debug("Got line %s", line); + + // TODO server should be fixed + line = line.replace(" ", " "); + line = line.replace(" ", " "); + + String[] stuff = line.split(" "); + assert "RESPONSE".equals(stuff[0]); + readUntilLastLine = true; + } else { + OperationStatus status = checkStatus(line); + if (!status.isSuccess()) { + successAll = false; + } + cb.gotStatus(index, status); + + index++; + } + } + + @Override + protected void handleError(OperationErrorType eType, String line) throws IOException { + if (!readUntilLastLine) { + // this case means that error message came without 'RESPONSE '. + // so it doesn't need to read 'PIPE_ERROR'. + super.handleError(eType, line); + } else { + // this case means that error message came after 'RESPONSE '. + // so it needs to read 'PIPE_ERROR'. + getLogger().error("Error: %s by %s", line, this); + exception = new OperationException(eType, line + " @ " + getHandlingNode().getNodeName()); + } + } + + /** + * call matchStatus() method with proper statuses in the child class + * + * @param line line that is read from the server + * @return status that is matched with the line + */ + protected abstract OperationStatus checkStatus(String line); + + @Override + public void initialize() { + ByteBuffer buffer = collectionPipe.getAsciiCommand(); + setBuffer(buffer); + readUntilLastLine = false; + + if (getLogger().isDebugEnabled()) { + getLogger().debug("Request in ascii protocol: %s", + (new String(buffer.array())).replace("\r\n", "\\r\\n")); + } + } + + @Override + protected void wasCancelled() { + getCallback().receivedStatus(CANCELED); + } + + public Collection getKeys() { + return Collections.unmodifiableList(keys); + } + + public CollectionPipe getCollectionPipe() { + return collectionPipe; + } + + @Override + public boolean isBulkOperation() { + return false; + } + + @Override + public final boolean isPipeOperation() { + return true; + } + + @Override + public boolean isIdempotentOperation() { + return isIdempotent; + } +}