Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking โ€œSign up for GitHubโ€, you agree to our terms of service and privacy statement. Weโ€™ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: add PipeOperationImpl to reduce duplicate code #872

Merged
merged 1 commit into from
Jan 31, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -17,187 +17,48 @@
*/
package net.spy.memcached.protocol.ascii;

import java.nio.ByteBuffer;
import java.util.Collection;

import net.spy.memcached.collection.CollectionBulkInsert;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.APIType;
import net.spy.memcached.ops.CollectionBulkInsertOperation;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
import net.spy.memcached.ops.PipedOperationCallback;

/**
* Operation to store collection data in a memcached server.
*/
public final class CollectionBulkInsertOperationImpl extends OperationImpl
public final class CollectionBulkInsertOperationImpl extends PipeOperationImpl
implements CollectionBulkInsertOperation {

private static final OperationStatus STORE_CANCELED = new CollectionOperationStatus(
false, "collection canceled", CollectionResponse.CANCELED);

private static final OperationStatus END = new CollectionOperationStatus(
true, "END", CollectionResponse.END);
private static final OperationStatus FAILED_END = new CollectionOperationStatus(
false, "END", CollectionResponse.END);

private static final OperationStatus CREATED_STORED = new CollectionOperationStatus(
true, "CREATED_STORED", CollectionResponse.CREATED_STORED);
private static final OperationStatus STORED = new CollectionOperationStatus(
true, "STORED", CollectionResponse.STORED);
private static final OperationStatus NOT_FOUND = new CollectionOperationStatus(
false, "NOT_FOUND", CollectionResponse.NOT_FOUND);
private static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus(
false, "ELEMENT_EXISTS", CollectionResponse.ELEMENT_EXISTS);
private static final OperationStatus OVERFLOWED = new CollectionOperationStatus(
false, "OVERFLOWED", CollectionResponse.OVERFLOWED);
private static final OperationStatus OUT_OF_RANGE = new CollectionOperationStatus(
false, "OUT_OF_RANGE", CollectionResponse.OUT_OF_RANGE);
private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus(
false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH);
private static final OperationStatus BKEY_MISMATCH = new CollectionOperationStatus(
false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH);

private final CollectionBulkInsert<?> insert;
private final PipedOperationCallback cb;

private int count;
private int index = 0;
private boolean successAll = true;

public CollectionBulkInsertOperationImpl(CollectionBulkInsert<?> insert, OperationCallback cb) {
super(cb);
this.insert = insert;
this.cb = (PipedOperationCallback) cb;
if (this.insert instanceof CollectionBulkInsert.ListBulkInsert) {
super(insert.getKeyList(), insert, cb);
if (insert instanceof CollectionBulkInsert.ListBulkInsert) {
setAPIType(APIType.LOP_INSERT);
} else if (this.insert instanceof CollectionBulkInsert.SetBulkInsert) {
} else if (insert instanceof CollectionBulkInsert.SetBulkInsert) {
setAPIType(APIType.SOP_INSERT);
} else if (this.insert instanceof CollectionBulkInsert.MapBulkInsert) {
} else if (insert instanceof CollectionBulkInsert.MapBulkInsert) {
setAPIType(APIType.MOP_INSERT);
} else if (this.insert instanceof CollectionBulkInsert.BTreeBulkInsert) {
} else if (insert instanceof CollectionBulkInsert.BTreeBulkInsert) {
setAPIType(APIType.BOP_INSERT);
}
setOperationType(OperationType.WRITE);
}

@Override
public void handleLine(String line) {
assert getState() == OperationState.READING
: "Read ``" + line + "'' when in " + getState() + " state";
/* ENABLE_REPLICATION if */
if (hasSwitchedOver(line)) {
this.insert.setNextOpIndex(index);
prepareSwitchover(line);
return;
}
/* ENABLE_REPLICATION end */
/* ENABLE_MIGRATION if */
if (hasNotMyKey(line)) {
addRedirectMultiKeyOperation(line, insert.getKey(index));
if (insert.isNotPiped()) {
transitionState(OperationState.REDIRECT);
} else {
index++;
}
return;
}
/* ENABLE_MIGRATION end */
if (insert.isNotPiped()) {
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);
if (!status.isSuccess()) {
successAll = false;
}

cb.gotStatus(index, status);
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
return;
}

/*
RESPONSE <count>\r\n
<status of the 1st pipelined command>\r\n
[ ... ]
<status of the last pipelined command>\r\n
END|PIPE_ERROR <error_string>\r\n
*/
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

// TODO server should be fixed
line = line.replace(" ", " ");
line = line.replace(" ", " ");

String[] stuff = line.split(" ");
assert "RESPONSE".equals(stuff[0]);
count = Integer.parseInt(stuff[1]);
} else {
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);

if (!status.isSuccess()) {
successAll = false;
}

cb.gotStatus(index, status);
index++;
}
}

@Override
public void initialize() {
ByteBuffer buffer = insert.getAsciiCommand();
setBuffer(buffer);

if (getLogger().isDebugEnabled()) {
getLogger().debug("Request in ascii protocol: %s",
(new String(buffer.array())).replace("\r\n", "\\r\\n"));
}
protected OperationStatus checkStatus(String line) {
return matchStatus(line, STORED, CREATED_STORED,
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);
}

@Override
protected void wasCancelled() {
getCallback().receivedStatus(STORE_CANCELED);
}

public Collection<String> getKeys() {
return insert.getKeyList();
}

public CollectionBulkInsert<?> getInsert() {
return insert;
return (CollectionBulkInsert<?>) getCollectionPipe();
}

@Override
public boolean isBulkOperation() {
return true;
}

@Override
public boolean isPipeOperation() {
return true;
}

@Override
public boolean isIdempotentOperation() {
return !(insert instanceof CollectionBulkInsert.ListBulkInsert);
}

}
Original file line number Diff line number Diff line change
@@ -17,172 +17,34 @@
*/
package net.spy.memcached.protocol.ascii;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;

import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.collection.SetPipedExist;
import net.spy.memcached.ops.APIType;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.CollectionPipedExistOperation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
import net.spy.memcached.ops.PipedOperationCallback;

public final class CollectionPipedExistOperationImpl extends OperationImpl implements
public final class CollectionPipedExistOperationImpl extends PipeOperationImpl implements
CollectionPipedExistOperation {

private static final OperationStatus EXIST_CANCELED = new CollectionOperationStatus(
false, "collection canceled", CollectionResponse.CANCELED);

private static final OperationStatus EXIST = new CollectionOperationStatus(
true, "EXIST", CollectionResponse.EXIST);
private static final OperationStatus NOT_EXIST = new CollectionOperationStatus(
true, "NOT_EXIST", CollectionResponse.NOT_EXIST);

private static final OperationStatus END = new CollectionOperationStatus(
true, "END", CollectionResponse.END);
private static final OperationStatus FAILED_END = new CollectionOperationStatus(
false, "END", CollectionResponse.END);

private static final OperationStatus NOT_FOUND = new CollectionOperationStatus(
false, "NOT_FOUND", CollectionResponse.NOT_FOUND);
private static final OperationStatus TYPE_MISMATCH = new CollectionOperationStatus(
false, "TYPE_MISMATCH", CollectionResponse.TYPE_MISMATCH);
private static final OperationStatus UNREADABLE = new CollectionOperationStatus(
false, "UNREADABLE", CollectionResponse.UNREADABLE);

private final String key;
private final SetPipedExist<?> setPipedExist;
private final PipedOperationCallback cb;

private int count;
private int index = 0;
private boolean successAll = true;

public CollectionPipedExistOperationImpl(String key,
SetPipedExist<?> collectionExist,
OperationCallback cb) {
super(cb);
this.key = key;
this.setPipedExist = collectionExist;
this.cb = (PipedOperationCallback) cb;
if (this.setPipedExist instanceof SetPipedExist) {
setAPIType(APIType.SOP_EXIST);
}
SetPipedExist<?> collectionExist, OperationCallback cb) {
super(Collections.singletonList(key), collectionExist, cb);
setAPIType(APIType.SOP_EXIST);
setOperationType(OperationType.READ);
}

@Override
public void handleLine(String line) {
assert getState() == OperationState.READING : "Read ``" + line
+ "'' when in " + getState() + " state";

/* ENABLE_MIGRATION if */
if (hasNotMyKey(line)) {
// Only one NOT_MY_KEY is provided in response of single key piped operation when redirection.
addRedirectSingleKeyOperation(line, key);
if (setPipedExist.isNotPiped()) {
transitionState(OperationState.REDIRECT);
} else {
setPipedExist.setNextOpIndex(index);
}
return;
}
/* ENABLE_MIGRATION end */

if (setPipedExist.isNotPiped()) {
OperationStatus status = matchStatus(line, EXIST, NOT_EXIST,
NOT_FOUND, TYPE_MISMATCH, UNREADABLE);
if (!status.isSuccess()) {
successAll = false;
}

cb.gotStatus(index, status);
cb.receivedStatus(successAll ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
return;
}

/*
RESPONSE <count>\r\n
<status of the 1st pipelined command>\r\n
[ ... ]
<status of the last pipelined command>\r\n
END|PIPE_ERROR <error_string>\r\n
*/
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

// TODO server should be fixed
line = line.replace(" ", " ");
line = line.replace(" ", " ");

String[] stuff = line.split(" ");
assert "RESPONSE".equals(stuff[0]);
count = Integer.parseInt(stuff[1]);
} else {
OperationStatus status = matchStatus(line, EXIST, NOT_EXIST,
NOT_FOUND, TYPE_MISMATCH, UNREADABLE);

if (!status.isSuccess()) {
successAll = false;
}

cb.gotStatus(index, status);
index++;
}
protected OperationStatus checkStatus(String line) {
return matchStatus(line, EXIST, NOT_EXIST,
NOT_FOUND, TYPE_MISMATCH, UNREADABLE);
}

@Override
public void initialize() {
ByteBuffer buffer = setPipedExist.getAsciiCommand();
setBuffer(buffer);

if (getLogger().isDebugEnabled()) {
getLogger().debug("Request in ascii protocol: %s",
(new String(buffer.array())).replace("\r\n", "\\r\\n"));
}
}

@Override
protected void wasCancelled() {
getCallback().receivedStatus(EXIST_CANCELED);
}

public Collection<String> getKeys() {
return Collections.singleton(key);
}

public SetPipedExist<?> getExist() {
return setPipedExist;
}

@Override
public boolean isBulkOperation() {
return false;
}

@Override
public boolean isPipeOperation() {
return true;
}

@Override
public boolean isIdempotentOperation() {
return true;
return (SetPipedExist<?>) getCollectionPipe();
}

}
Loading