From 39bfa8d5e34c8412cc7f20579345f5db22e9aec8 Mon Sep 17 00:00:00 2001 From: skyjiang <470623352@qq.com> Date: Wed, 18 Jan 2023 10:55:12 +0800 Subject: [PATCH] Support RESP3 --- .../redis/AbstractCollectionRedisMessage.java | 74 ++++ .../codec/redis/AbstractMapRedisMessage.java | 80 ++++ .../redis/AbstractNumberRedisMessage.java | 66 +++ .../redis/AbstractStringRedisMessage.java | 2 +- .../redis/AggregatedHeaderRedisMessage.java | 63 +++ .../codec/redis/AggregatedRedisMessage.java | 22 + .../codec/redis/ArrayHeaderRedisMessage.java | 34 +- .../codec/redis/ArrayRedisMessage.java | 42 +- .../codec/redis/BigNumberRedisMessage.java | 63 +++ .../codec/redis/BooleanRedisMessage.java | 59 +++ .../BulkErrorStringHeaderRedisMessage.java | 31 ++ .../BulkVerbatimStringHeaderRedisMessage.java | 35 ++ .../codec/redis/DoubleRedisMessage.java | 47 +++ .../FullBulkErrorStringRedisMessage.java | 33 ++ .../FullBulkVerbatimStringRedisMessage.java | 67 +++ .../codec/redis/IntegerRedisMessage.java | 17 +- .../codec/redis/MapHeaderRedisMessage.java | 35 ++ .../handler/codec/redis/MapRedisMessage.java | 79 ++++ .../handler/codec/redis/NullRedisMessage.java | 30 ++ .../codec/redis/PushHeaderRedisMessage.java | 34 ++ .../handler/codec/redis/PushRedisMessage.java | 48 +++ .../codec/redis/RedisArrayAggregator.java | 56 ++- .../redis/RedisBulkStringAggregator.java | 14 +- .../handler/codec/redis/RedisCodecUtil.java | 4 + .../handler/codec/redis/RedisConstants.java | 14 + .../handler/codec/redis/RedisDecoder.java | 224 ++++++---- .../handler/codec/redis/RedisEncoder.java | 156 +++++-- .../codec/redis/RedisMapAggregator.java | 112 +++++ .../handler/codec/redis/RedisMessageType.java | 34 +- .../codec/redis/SetHeaderRedisMessage.java | 34 ++ .../handler/codec/redis/SetRedisMessage.java | 92 ++++ .../handler/codec/redis/RedisDecoderTest.java | 398 +++++++++++++++--- .../handler/codec/redis/RedisEncoderTest.java | 180 ++++++++ 33 files changed, 2007 insertions(+), 272 deletions(-) create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractCollectionRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractMapRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractNumberRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AggregatedHeaderRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AggregatedRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BigNumberRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BooleanRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BulkErrorStringHeaderRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BulkVerbatimStringHeaderRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/DoubleRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/FullBulkErrorStringRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/FullBulkVerbatimStringRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/MapHeaderRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/MapRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/NullRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/PushHeaderRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/PushRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisMapAggregator.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/SetHeaderRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/SetRedisMessage.java diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractCollectionRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractCollectionRedisMessage.java new file mode 100644 index 0000000..7b934a3 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractCollectionRedisMessage.java @@ -0,0 +1,74 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.StringUtil; +import io.netty.util.internal.UnstableApi; + +import java.util.Collection; + +import static java.util.Objects.requireNonNull; + +/** + * Abstract class for Aggregate data types message. + */ +@UnstableApi +public abstract class AbstractCollectionRedisMessage extends AbstractReferenceCounted + implements AggregatedRedisMessage { + + protected final Collection children; + + protected AbstractCollectionRedisMessage(Collection children) { + this.children = requireNonNull(children, "children"); + } + + protected abstract Collection children(); + + @Override + protected void deallocate() { + for (RedisMessage msg : children) { + ReferenceCountUtil.release(msg); + } + } + + @Override + public AbstractCollectionRedisMessage touch(Object hint) { + for (RedisMessage msg : children) { + ReferenceCountUtil.touch(msg); + } + return this; + } + + /** + * Returns whether the content of this message is {@code null}. + * + * @return indicates whether the content of this message is {@code null}. + */ + public boolean isNull() { + return false; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("children=") + .append(children.size()) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractMapRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractMapRedisMessage.java new file mode 100644 index 0000000..3e9f42e --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractMapRedisMessage.java @@ -0,0 +1,80 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.StringUtil; +import io.netty.util.internal.UnstableApi; + +import java.util.Collections; +import java.util.Map; + +@UnstableApi +public abstract class AbstractMapRedisMessage extends AbstractReferenceCounted + implements AggregatedRedisMessage { + + private final Map children; + + protected AbstractMapRedisMessage(Map children) { + this.children = Collections.unmodifiableMap(children); + } + + /** + * Get children of this Map. It can be null or empty. + * + * @return Map of {@link RedisMessage}s. + */ + public Map children() { + return children; + } + + @Override + protected void deallocate() { + for (Map.Entry messageEntry : children.entrySet()) { + ReferenceCountUtil.release(messageEntry.getKey()); + ReferenceCountUtil.release(messageEntry.getValue()); + } + } + + @Override + public AbstractMapRedisMessage touch(Object hint) { + for (Map.Entry messageEntry : children.entrySet()) { + ReferenceCountUtil.touch(messageEntry.getKey()); + ReferenceCountUtil.touch(messageEntry.getValue()); + } + return this; + } + + /** + * Returns whether the content of this message is {@code null}. + * + * @return indicates whether the content of this message is {@code null}. + */ + public boolean isNull() { + return false; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("children=") + .append(children.size()) + .append(']').toString(); + } + +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractNumberRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractNumberRedisMessage.java new file mode 100644 index 0000000..7021bca --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractNumberRedisMessage.java @@ -0,0 +1,66 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.StringUtil; +import io.netty.util.internal.UnstableApi; + +import java.math.BigInteger; + +/** + * Abstract class for Number types message. + */ +@UnstableApi +public abstract class AbstractNumberRedisMessage implements RedisMessage { + + protected final Number value; + + /** + * For create a {@link IntegerRedisMessage} the given int {@code value}. + * + * @param value the message content. + */ + protected AbstractNumberRedisMessage(long value) { + this.value = value; + } + + /** + * For create a {@link DoubleRedisMessage} the given double {@code value}. + * + * @param value the message content. + */ + protected AbstractNumberRedisMessage(double value) { + this.value = value; + } + + /** + * For create a {@link BigNumberRedisMessage} the given BigInteger {@code value}. + * + * @param value the message content. + */ + protected AbstractNumberRedisMessage(BigInteger value) { + this.value = value; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("value=") + .append(value) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractStringRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractStringRedisMessage.java index dd814d6..97ea1bf 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractStringRedisMessage.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AbstractStringRedisMessage.java @@ -27,7 +27,7 @@ public abstract class AbstractStringRedisMessage implements RedisMessage { private final String content; - AbstractStringRedisMessage(String content) { + protected AbstractStringRedisMessage(String content) { this.content = requireNonNull(content, "content"); } diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AggregatedHeaderRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AggregatedHeaderRedisMessage.java new file mode 100644 index 0000000..ec0c4b1 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AggregatedHeaderRedisMessage.java @@ -0,0 +1,63 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.StringUtil; +import io.netty.util.internal.UnstableApi; + +/** + * Header of Redis Aggregated types Message. + */ +@UnstableApi +public abstract class AggregatedHeaderRedisMessage implements RedisMessage { + + private final long length; + + /** + * Creates a {@link AggregatedHeaderRedisMessage} for the given {@code length}. + */ + protected AggregatedHeaderRedisMessage(long length) { + if (length < RedisConstants.NULL_VALUE) { + throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")"); + } + this.length = length; + } + + /** + * Get length of this array object. + */ + public final long length() { + return length; + } + + /** + * Returns whether the content of this message is {@code null}. + * + * @return indicates whether the content of this message is {@code null}. + */ + public boolean isNull() { + return length == RedisConstants.NULL_VALUE; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("length=") + .append(length) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AggregatedRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AggregatedRedisMessage.java new file mode 100644 index 0000000..8b17bf9 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/AggregatedRedisMessage.java @@ -0,0 +1,22 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +/** + * Just a marker interface for Aggregated data types + */ +public interface AggregatedRedisMessage extends RedisMessage { +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/ArrayHeaderRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/ArrayHeaderRedisMessage.java index 1bd5a7f..fdbb72f 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/ArrayHeaderRedisMessage.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/ArrayHeaderRedisMessage.java @@ -14,49 +14,19 @@ */ package io.netty.contrib.handler.codec.redis; -import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; /** * Header of Redis Array Message. */ @UnstableApi -public class ArrayHeaderRedisMessage implements RedisMessage { - - private final long length; +public class ArrayHeaderRedisMessage extends AggregatedHeaderRedisMessage { /** * Creates a {@link ArrayHeaderRedisMessage} for the given {@code length}. */ public ArrayHeaderRedisMessage(long length) { - if (length < RedisConstants.NULL_VALUE) { - throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ')'); - } - this.length = length; - } - - /** - * Get length of this array object. - */ - public final long length() { - return length; + super(length); } - /** - * Returns whether the content of this message is {@code null}. - * - * @return indicates whether the content of this message is {@code null}. - */ - public boolean isNull() { - return length == RedisConstants.NULL_VALUE; - } - - @Override - public String toString() { - return new StringBuilder(StringUtil.simpleClassName(this)) - .append('[') - .append("length=") - .append(length) - .append(']').toString(); - } } diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/ArrayRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/ArrayRedisMessage.java index df48f39..3e35e73 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/ArrayRedisMessage.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/ArrayRedisMessage.java @@ -14,11 +14,7 @@ */ package io.netty.contrib.handler.codec.redis; -import static java.util.Objects.requireNonNull; - -import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; import java.util.Collections; @@ -28,12 +24,10 @@ * Arrays of RESP. */ @UnstableApi -public class ArrayRedisMessage extends AbstractReferenceCounted implements RedisMessage { - - private final List children; +public class ArrayRedisMessage extends AbstractCollectionRedisMessage { private ArrayRedisMessage() { - children = Collections.emptyList(); + super(Collections.emptyList()); } /** @@ -43,32 +37,17 @@ private ArrayRedisMessage() { */ public ArrayRedisMessage(List children) { // do not retain here. children are already retained when created. - this.children = requireNonNull(children, "children"); + super(children); } /** * Get children of this Arrays. It can be null or empty. * - * @return list of {@link RedisMessage}s. - */ - public final List children() { - return children; - } - - /** - * Returns whether the content of this message is {@code null}. - * - * @return indicates whether the content of this message is {@code null}. + * @return List of {@link RedisMessage}s. */ - public boolean isNull() { - return false; - } - @Override - protected void deallocate() { - for (RedisMessage msg : children) { - ReferenceCountUtil.release(msg); - } + public final List children() { + return (List) children; } @Override @@ -79,15 +58,6 @@ public ArrayRedisMessage touch(Object hint) { return this; } - @Override - public String toString() { - return new StringBuilder(StringUtil.simpleClassName(this)) - .append('[') - .append("children=") - .append(children.size()) - .append(']').toString(); - } - /** * A predefined null array instance for {@link ArrayRedisMessage}. */ diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BigNumberRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BigNumberRedisMessage.java new file mode 100644 index 0000000..3fce5e1 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BigNumberRedisMessage.java @@ -0,0 +1,63 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +import java.math.BigInteger; + +/** + * Big number of RESP3. + */ +@UnstableApi +public final class BigNumberRedisMessage extends AbstractNumberRedisMessage { + + /** + * Creates a {@link BigNumberRedisMessage} for the given byte {@code content}. + * + * @param value the message content. + */ + public BigNumberRedisMessage(byte[] value) { + this(new String(value)); + } + + /** + * Creates a {@link BigNumberRedisMessage} for the given string {@code content}. + * + * @param value the message content. + */ + public BigNumberRedisMessage(String value) { + this(new BigInteger(value)); + } + + /** + * Creates a {@link BigNumberRedisMessage} for the given BigInteger {@code content}. + * + * @param value the message content. + */ + public BigNumberRedisMessage(BigInteger value) { + super(value); + } + + /** + * Get string represent the value of this {@link DoubleRedisMessage}. + * + * @return string value + */ + public String value() { + return value.toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BooleanRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BooleanRedisMessage.java new file mode 100644 index 0000000..7f27889 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BooleanRedisMessage.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.StringUtil; +import io.netty.util.internal.UnstableApi; + +/** + * Boolean of RESP3. + */ +@UnstableApi +public final class BooleanRedisMessage implements RedisMessage { + + private boolean value; + + public static final BooleanRedisMessage TRUE = new BooleanRedisMessage(true); + + public static final BooleanRedisMessage FALSE = new BooleanRedisMessage(false); + + /** + * Creates a {@link BooleanRedisMessage} for the given {@code value}. + * + * @param value true or false. + */ + private BooleanRedisMessage(boolean value) { + this.value = value; + } + + /** + * Get boolean value of this {@link BooleanRedisMessage}. + * + * @return boolean value + */ + public boolean value() { + return value; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("value=") + .append(value) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BulkErrorStringHeaderRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BulkErrorStringHeaderRedisMessage.java new file mode 100644 index 0000000..45cfa42 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BulkErrorStringHeaderRedisMessage.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +@UnstableApi +public final class BulkErrorStringHeaderRedisMessage extends BulkStringHeaderRedisMessage { + + /** + * Creates a {@link BulkErrorStringHeaderRedisMessage}. + * + * @param bulkStringLength follow content length. + */ + public BulkErrorStringHeaderRedisMessage(int bulkStringLength) { + super(bulkStringLength); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BulkVerbatimStringHeaderRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BulkVerbatimStringHeaderRedisMessage.java new file mode 100644 index 0000000..658f616 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/BulkVerbatimStringHeaderRedisMessage.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +@UnstableApi +public final class BulkVerbatimStringHeaderRedisMessage extends BulkStringHeaderRedisMessage { + + /** + * Creates a {@link BulkVerbatimStringHeaderRedisMessage}. + * + * @param bulkStringLength follow content length. + */ + public BulkVerbatimStringHeaderRedisMessage(int bulkStringLength) { + super(bulkStringLength); + if (bulkStringLength < 4) { + throw new RedisCodecException("Verbatim String Length: " + bulkStringLength + + " must greater than or equal to 4"); + } + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/DoubleRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/DoubleRedisMessage.java new file mode 100644 index 0000000..bdc06ad --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/DoubleRedisMessage.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +/** + * Double of RESP3. + */ +@UnstableApi +public final class DoubleRedisMessage extends AbstractNumberRedisMessage { + + public static final DoubleRedisMessage POSITIVE_INFINITY = new DoubleRedisMessage(Double.MAX_VALUE); + + public static final DoubleRedisMessage NEGATIVE_INFINITY = new DoubleRedisMessage(Double.MIN_VALUE); + + /** + * Creates a {@link DoubleRedisMessage} for the given {@code content}. + * + * @param value the message content. + */ + public DoubleRedisMessage(double value) { + super(value); + } + + /** + * Get long value of this {@link DoubleRedisMessage}. + * + * @return double value + */ + public double value() { + return value.doubleValue(); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/FullBulkErrorStringRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/FullBulkErrorStringRedisMessage.java new file mode 100644 index 0000000..4ef2835 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/FullBulkErrorStringRedisMessage.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.util.internal.UnstableApi; + +@UnstableApi +public final class FullBulkErrorStringRedisMessage extends FullBulkStringRedisMessage { + + /** + * Creates a {@link FullBulkErrorStringRedisMessage} for the given {@code content}. + * + * @param content the content, must not be {@code null}. If content is null or empty, + * use {@link NullRedisMessage#INSTANCE instead of constructor. + */ + public FullBulkErrorStringRedisMessage(ByteBuf content) { + super(content); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/FullBulkVerbatimStringRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/FullBulkVerbatimStringRedisMessage.java new file mode 100644 index 0000000..590fa19 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/FullBulkVerbatimStringRedisMessage.java @@ -0,0 +1,67 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.util.internal.StringUtil; +import io.netty.util.internal.UnstableApi; + +@UnstableApi +public final class FullBulkVerbatimStringRedisMessage extends FullBulkStringRedisMessage { + + /** + * Creates a {@link FullBulkVerbatimStringRedisMessage} for the given {@code content}. + * The first three bytes represent the format of the following string, + * such as txt for plain text, mkd for markdown. The fourth byte is always `:`. + * Then the real string follows. + * + * @param content the content include format, must not be {@code null}. + */ + public FullBulkVerbatimStringRedisMessage(ByteBuf content) { + super(content); + } + + /** + * Return the format of the content, which can be `txt` for plain text. + * + * @return the format which length is always 3. + */ + public String format() { + return new String(ByteBufUtil.getBytes(content(), 0, 3)); + } + + /** + * Return the string represent the real content, which exclude format part. + * + * @return the real content. + */ + public String realContent() { + int length = super.content().writerIndex(); + return new String(ByteBufUtil.getBytes(content(), 4, length - 4)); + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("format=") + .append(format()) + .append(", content=") + .append(realContent()) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/IntegerRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/IntegerRedisMessage.java index 1b91f4c..dc045fc 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/IntegerRedisMessage.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/IntegerRedisMessage.java @@ -14,16 +14,13 @@ */ package io.netty.contrib.handler.codec.redis; -import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; /** * Integers of RESP. */ @UnstableApi -public final class IntegerRedisMessage implements RedisMessage { - - private final long value; +public final class IntegerRedisMessage extends AbstractNumberRedisMessage { /** * Creates a {@link IntegerRedisMessage} for the given {@code content}. @@ -31,7 +28,7 @@ public final class IntegerRedisMessage implements RedisMessage { * @param value the message content. */ public IntegerRedisMessage(long value) { - this.value = value; + super(value); } /** @@ -40,15 +37,7 @@ public IntegerRedisMessage(long value) { * @return long value */ public long value() { - return value; + return value.intValue(); } - @Override - public String toString() { - return new StringBuilder(StringUtil.simpleClassName(this)) - .append('[') - .append("value=") - .append(value) - .append(']').toString(); - } } diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/MapHeaderRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/MapHeaderRedisMessage.java new file mode 100644 index 0000000..071ac1d --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/MapHeaderRedisMessage.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +/** + * Header of Redis Map Message. the length represent the number of field-value pairs, + * but the number of redis message. + */ +@UnstableApi +public final class MapHeaderRedisMessage extends AggregatedHeaderRedisMessage { + + /** + * Creates a {@link MapHeaderRedisMessage} for the given {@code length}. + * + * @param length + */ + public MapHeaderRedisMessage(long length) { + super(length); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/MapRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/MapRedisMessage.java new file mode 100644 index 0000000..ee24bdb --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/MapRedisMessage.java @@ -0,0 +1,79 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +import java.util.Collections; +import java.util.Map; + +@UnstableApi +public class MapRedisMessage extends AbstractMapRedisMessage { + + public MapRedisMessage() { + this(Collections.emptyMap()); + } + + /** + * Creates a {@link MapRedisMessage} for the given {@code content}. + * + * @param children the children. + */ + public MapRedisMessage(Map children) { + super(children); + } + + /** + * A predefined empty map instance for {@link MapRedisMessage}. + */ + public static final MapRedisMessage EMPTY_INSTANCE = new MapRedisMessage() { + + @Override + public MapRedisMessage retain() { + return this; + } + + @Override + public MapRedisMessage retain(int increment) { + return this; + } + + @Override + public MapRedisMessage touch() { + return this; + } + + @Override + public MapRedisMessage touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } + + @Override + public String toString() { + return "EmptyMapRedisMessage"; + } + }; +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/NullRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/NullRedisMessage.java new file mode 100644 index 0000000..c3fbb1d --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/NullRedisMessage.java @@ -0,0 +1,30 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +/** + * NULL of RESP3. + */ +@UnstableApi +public final class NullRedisMessage implements RedisMessage { + + public static final NullRedisMessage INSTANCE = new NullRedisMessage(); + + private NullRedisMessage() { + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/PushHeaderRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/PushHeaderRedisMessage.java new file mode 100644 index 0000000..732e6b2 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/PushHeaderRedisMessage.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +/** + * Header of Redis Push Message. + */ +@UnstableApi +public final class PushHeaderRedisMessage extends AggregatedHeaderRedisMessage { + + /** + * Creates a {@link PushHeaderRedisMessage} for the given {@code length}. + * + * @param length + */ + public PushHeaderRedisMessage(long length) { + super(length); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/PushRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/PushRedisMessage.java new file mode 100644 index 0000000..91016f1 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/PushRedisMessage.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +import java.util.Collections; +import java.util.List; + +@UnstableApi +public final class PushRedisMessage extends AbstractCollectionRedisMessage { + + private PushRedisMessage() { + super(Collections.emptySet()); + } + + /** + * Creates a {@link PushRedisMessage} for the given {@code content}. + * + * @param children the children. + */ + public PushRedisMessage(List children) { + super(children); + } + + /** + * Get children of this Set. It can be null or empty. + * + * @return List of {@link RedisMessage}s. + */ + @Override + public List children() { + return (List) children; + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisArrayAggregator.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisArrayAggregator.java index 4843488..f9e2682 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisArrayAggregator.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisArrayAggregator.java @@ -22,22 +22,28 @@ import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.Deque; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** - * Aggregates {@link RedisMessage} parts into {@link ArrayRedisMessage}. This decoder - * should be used together with {@link RedisDecoder}. + * Aggregates {@link RedisMessage} parts into {@link ArrayRedisMessage} or {@link SetRedisMessage}. + * This decoder should be used together with {@link RedisDecoder}. */ @UnstableApi public final class RedisArrayAggregator extends MessageToMessageDecoder { - private final Deque depths = new ArrayDeque<>(4); + private final Deque depths = new ArrayDeque(4); @Override protected void decode(ChannelHandlerContext ctx, RedisMessage msg) throws Exception { - if (msg instanceof ArrayHeaderRedisMessage) { - msg = decodeRedisArrayHeader((ArrayHeaderRedisMessage) msg); + // only decode Array and Set types + if (msg instanceof ArrayHeaderRedisMessage + || msg instanceof SetHeaderRedisMessage + || msg instanceof PushHeaderRedisMessage) { + msg = decodeRedisCollectionHeader((AggregatedHeaderRedisMessage) msg); if (msg == null) { return; } @@ -51,10 +57,16 @@ protected void decode(ChannelHandlerContext ctx, RedisMessage msg) throws Except // if current aggregation completed, go to parent aggregation. if (current.children.size() == current.length) { - msg = new ArrayRedisMessage(current.children); + if (RedisMessageType.ARRAY_HEADER.equals(current.aggregateType)) { + msg = new ArrayRedisMessage((List) current.children); + } else if (RedisMessageType.SET_HEADER.equals(current.aggregateType)) { + msg = new SetRedisMessage((Set) current.children); + } else if (RedisMessageType.PUSH.equals(current.aggregateType)) { + msg = new PushRedisMessage((List) current.children); + } depths.pop(); } else { - // Not aggregated yet. Try next time. + // not aggregated yet. try next time. return; } } @@ -62,19 +74,21 @@ protected void decode(ChannelHandlerContext ctx, RedisMessage msg) throws Except ctx.fireChannelRead(msg); } - private RedisMessage decodeRedisArrayHeader(ArrayHeaderRedisMessage header) { + private RedisMessage decodeRedisCollectionHeader(AggregatedHeaderRedisMessage header) { + // todo use NullRedisMessage replacing *-1 and $-1 ? if (header.isNull()) { return ArrayRedisMessage.NULL_INSTANCE; } else if (header.length() == 0L) { - return ArrayRedisMessage.EMPTY_INSTANCE; + return (header instanceof SetHeaderRedisMessage) ? + SetRedisMessage.EMPTY_INSTANCE : ArrayRedisMessage.EMPTY_INSTANCE; } else if (header.length() > 0L) { // Currently, this codec doesn't support `long` length for arrays because Java's List.size() is int. if (header.length() > Integer.MAX_VALUE) { throw new CodecException("this codec doesn't support longer length than " + Integer.MAX_VALUE); } - // start aggregating array - depths.push(new AggregateState((int) header.length())); + // start aggregating array or set according header type + depths.push(new AggregateState(header, (int) header.length())); return null; } else { throw new CodecException("bad length: " + header.length()); @@ -83,10 +97,24 @@ private RedisMessage decodeRedisArrayHeader(ArrayHeaderRedisMessage header) { private static final class AggregateState { private final int length; - private final List children; - AggregateState(int length) { + private final Collection children; + private final RedisMessageType aggregateType; + + AggregateState(AggregatedHeaderRedisMessage headerType, int length) { this.length = length; - children = new ArrayList<>(length); + if (headerType instanceof ArrayHeaderRedisMessage) { + this.children = new ArrayList(length); + this.aggregateType = RedisMessageType.ARRAY_HEADER; + } else if (headerType instanceof SetHeaderRedisMessage) { + this.children = new HashSet(length); + this.aggregateType = RedisMessageType.SET_HEADER; + } else if (headerType instanceof PushHeaderRedisMessage) { + this.children = new ArrayList(length); + this.aggregateType = RedisMessageType.PUSH; + } else { + // never going to run here + throw new CodecException("bad header type: " + headerType); + } } } } diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisBulkStringAggregator.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisBulkStringAggregator.java index 63bd248..934481b 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisBulkStringAggregator.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisBulkStringAggregator.java @@ -21,8 +21,10 @@ import io.netty.util.internal.UnstableApi; /** - * A {@link ChannelHandler} that aggregates an {@link BulkStringHeaderRedisMessage} - * and its following {@link BulkStringRedisContent}s into a single {@link FullBulkStringRedisMessage} + * A {@link ChannelHandler} that aggregates + * an {@link BulkStringHeaderRedisMessage} or {@link BulkErrorStringHeaderRedisMessage} + * and its following {@link BulkStringRedisContent}s into correspondingly + * a single {@link FullBulkStringRedisMessage} or {@link FullBulkErrorStringRedisMessage} * with no following {@link BulkStringRedisContent}s. It is useful when you don't want to take * care of {@link RedisMessage}s whose transfer encoding is 'chunked'. Insert this * handler after {@link RedisDecoder} in the {@link ChannelPipeline}: @@ -94,6 +96,12 @@ protected boolean ignoreContentAfterContinueResponse(Object msg) throws Exceptio @Override protected FullBulkStringRedisMessage beginAggregation(BulkStringHeaderRedisMessage start, ByteBuf content) throws Exception { - return new FullBulkStringRedisMessage(content); + if (start instanceof BulkErrorStringHeaderRedisMessage) { + return new FullBulkErrorStringRedisMessage(content); + } else if (start instanceof BulkVerbatimStringHeaderRedisMessage) { + return new FullBulkVerbatimStringRedisMessage(content); + } else { + return new FullBulkStringRedisMessage(content); + } } } diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisCodecUtil.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisCodecUtil.java index 0b0abcc..8553d8d 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisCodecUtil.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisCodecUtil.java @@ -29,6 +29,10 @@ static byte[] longToAsciiBytes(long value) { return Long.toString(value).getBytes(CharsetUtil.US_ASCII); } + static byte[] doubleToAsciiBytes(double value) { + return Double.toString(value).getBytes(CharsetUtil.US_ASCII); + } + /** * Returns a {@code short} value using endian order. */ diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisConstants.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisConstants.java index 2d0f1d2..039ba4b 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisConstants.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisConstants.java @@ -30,6 +30,16 @@ private RedisConstants() { static final int NULL_VALUE = -1; + static final int BOOLEAN_LENGTH = 1; + + static final byte BOOLEAN_TRUE_CONTENT = 't'; + + static final byte BOOLEAN_FALSE_CONTENT = 'f'; + + static final String DOUBLE_POSITIVE_INF_CONTENT = "inf"; + + static final String DOUBLE_NEGATIVE_INF_CONTENT = "-inf"; + static final int REDIS_MESSAGE_MAX_LENGTH = 512 * 1024 * 1024; // 512MB // 64KB is max inline length of current Redis server implementation. @@ -37,8 +47,12 @@ private RedisConstants() { static final int POSITIVE_LONG_MAX_LENGTH = 19; // length of Long.MAX_VALUE + static final int POSITIVE_DOUBLE_MAX_LENGTH = 16; // length of DOUBLE.MAX_VALUE + static final int LONG_MAX_LENGTH = POSITIVE_LONG_MAX_LENGTH + 1; // +1 is sign + static final int DOUBLE_MAX_LENGTH = POSITIVE_DOUBLE_MAX_LENGTH + 1; // +1 is sign + static final short NULL_SHORT = RedisCodecUtil.makeShort('-', '1'); static final short EOL_SHORT = RedisCodecUtil.makeShort('\r', '\n'); diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisDecoder.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisDecoder.java index ceaf95d..f3c5e2c 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisDecoder.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisDecoder.java @@ -21,6 +21,15 @@ import io.netty.util.CharsetUtil; import io.netty.util.internal.UnstableApi; +import java.math.BigInteger; + +import static io.netty.contrib.handler.codec.redis.RedisConstants.BOOLEAN_TRUE_CONTENT; +import static io.netty.contrib.handler.codec.redis.RedisConstants.DOUBLE_NEGATIVE_INF_CONTENT; +import static io.netty.contrib.handler.codec.redis.RedisConstants.DOUBLE_POSITIVE_INF_CONTENT; +import static io.netty.contrib.handler.codec.redis.RedisMessageType.BLOB_ERROR; +import static io.netty.contrib.handler.codec.redis.RedisMessageType.BULK_STRING; +import static io.netty.contrib.handler.codec.redis.RedisMessageType.VERBATIM_STRING; + /** * Decodes the Redis protocol into {@link RedisMessage} objects following * RESP (REdis Serialization Protocol). @@ -32,6 +41,7 @@ public final class RedisDecoder extends ByteToMessageDecoder { private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor(); + private final ToPositiveBigIntegerProcessor toPositiveBigIntegerProcessor = new ToPositiveBigIntegerProcessor(); private final boolean decodeInlineCommands; private final int maxInlineMessageLength; @@ -44,8 +54,8 @@ public final class RedisDecoder extends ByteToMessageDecoder { private enum State { DECODE_TYPE, - DECODE_INLINE, // SIMPLE_STRING, ERROR, INTEGER - DECODE_LENGTH, // BULK_STRING, ARRAY_HEADER + DECODE_INLINE, // SIMPLE_STRING, ERROR, INTEGER, DOUBLE, BIG_NUMBER, BOOLEAN, NULL + DECODE_LENGTH, // BULK_STRING, ARRAY_HEADER, BLOB_ERROR, VERBATIM_STRING DECODE_BULK_STRING_EOL, DECODE_BULK_STRING_CONTENT, } @@ -84,7 +94,7 @@ public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) { public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool, boolean decodeInlineCommands) { if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) { throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength + - " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")"); + " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")"); } this.maxInlineMessageLength = maxInlineMessageLength; this.messagePool = messagePool; @@ -96,33 +106,33 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { try { for (;;) { switch (state) { - case DECODE_TYPE: - if (!decodeType(in)) { - return; - } - break; - case DECODE_INLINE: - if (!decodeInline(ctx, in)) { - return; - } - break; - case DECODE_LENGTH: - if (!decodeLength(ctx, in)) { - return; - } - break; - case DECODE_BULK_STRING_EOL: - if (!decodeBulkStringEndOfLine(ctx, in)) { - return; - } - break; - case DECODE_BULK_STRING_CONTENT: - if (!decodeBulkStringContent(ctx, in)) { - return; - } - break; - default: - throw new RedisCodecException("Unknown state: " + state); + case DECODE_TYPE: + if (!decodeType(in)) { + return; + } + break; + case DECODE_INLINE: + if (!decodeInline(ctx, in)) { + return; + } + break; + case DECODE_LENGTH: + if (!decodeLength(ctx, in)) { + return; + } + break; + case DECODE_BULK_STRING_EOL: + if (!decodeBulkStringEndOfLine(ctx, in)) { + return; + } + break; + case DECODE_BULK_STRING_CONTENT: + if (!decodeBulkStringContent(ctx, in)) { + return; + } + break; + default: + throw new RedisCodecException("Unknown state: " + state); } } } catch (RedisCodecException e) { @@ -154,7 +164,7 @@ private boolean decodeInline(ChannelHandlerContext ctx, ByteBuf in) throws Excep if (lineBytes == null) { if (in.readableBytes() > maxInlineMessageLength) { throw new RedisCodecException("length: " + in.readableBytes() + - " (expected: <= " + maxInlineMessageLength + ")"); + " (expected: <= " + maxInlineMessageLength + ")"); } return false; } @@ -173,35 +183,55 @@ private boolean decodeLength(ChannelHandlerContext ctx, ByteBuf in) throws Excep throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")"); } switch (type) { - case ARRAY_HEADER: - ctx.fireChannelRead(new ArrayHeaderRedisMessage(length)); - resetDecoder(); - return true; - case BULK_STRING: - if (length > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) { - throw new RedisCodecException("length: " + length + " (expected: <= " + - RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")"); - } - remainingBulkLength = (int) length; // range(int) is already checked. - return decodeBulkString(ctx, in); - default: - throw new RedisCodecException("bad type: " + type); + case ARRAY_HEADER: + ctx.fireChannelRead(new ArrayHeaderRedisMessage(length)); + resetDecoder(); + return true; + case SET_HEADER: + ctx.fireChannelRead(new SetHeaderRedisMessage(length)); + resetDecoder(); + return true; + case PUSH: + ctx.fireChannelRead(new PushHeaderRedisMessage(length)); + resetDecoder(); + return true; + case MAP_HEADER: + ctx.fireChannelRead(new MapHeaderRedisMessage(length)); + resetDecoder(); + return true; + case BULK_STRING: + case BLOB_ERROR: + case VERBATIM_STRING: + if (length > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) { + throw new RedisCodecException("length: " + length + " (expected: <= " + + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")"); + } + remainingBulkLength = (int) length; // range(int) is already checked. + return decodeBulkString(ctx, in, type); + default: + throw new RedisCodecException("bad type: " + type); } } - private boolean decodeBulkString(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + private boolean decodeBulkString(ChannelHandlerContext ctx, ByteBuf in, RedisMessageType messageType) throws Exception { switch (remainingBulkLength) { - case RedisConstants.NULL_VALUE: // $-1\r\n - ctx.fireChannelRead(FullBulkStringRedisMessage.NULL_INSTANCE); - resetDecoder(); - return true; - case 0: - state = State.DECODE_BULK_STRING_EOL; - return decodeBulkStringEndOfLine(ctx, in); - default: // expectedBulkLength is always positive. - ctx.fireChannelRead(new BulkStringHeaderRedisMessage(remainingBulkLength)); - state = State.DECODE_BULK_STRING_CONTENT; - return decodeBulkStringContent(ctx, in); + case RedisConstants.NULL_VALUE: // $-1\r\n + ctx.fireChannelRead(FullBulkStringRedisMessage.NULL_INSTANCE); + resetDecoder(); + return true; + case 0: + state = State.DECODE_BULK_STRING_EOL; + return decodeBulkStringEndOfLine(ctx, in); + default: // expectedBulkLength is always positive. + if (BULK_STRING.equals(messageType)) { + ctx.fireChannelRead(new BulkStringHeaderRedisMessage(remainingBulkLength)); + } else if (BLOB_ERROR.equals(messageType)) { + ctx.fireChannelRead(new BulkErrorStringHeaderRedisMessage(remainingBulkLength)); + } else if (VERBATIM_STRING.equals(messageType)) { + ctx.fireChannelRead(new BulkVerbatimStringHeaderRedisMessage(remainingBulkLength)); + } + state = State.DECODE_BULK_STRING_CONTENT; + return decodeBulkStringContent(ctx, in); } } @@ -217,6 +247,7 @@ private boolean decodeBulkStringEndOfLine(ChannelHandlerContext ctx, ByteBuf in) } // ${expectedBulkLength}\r\n {data...}\r\n + // or !{expectedBulkLength}\r\n {data...}\r\n private boolean decodeBulkStringContent(ChannelHandlerContext ctx, ByteBuf in) throws Exception { final int readableBytes = in.readableBytes(); if (readableBytes == 0 || remainingBulkLength == 0 && readableBytes < RedisConstants.EOL_LENGTH) { @@ -251,22 +282,42 @@ private static void readEndOfLine(final ByteBuf in) { private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) { switch (messageType) { - case INLINE_COMMAND: - return new InlineCommandRedisMessage(content.toString(CharsetUtil.UTF_8)); - case SIMPLE_STRING: { - SimpleStringRedisMessage cached = messagePool.getSimpleString(content); - return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8)); - } - case ERROR: { - ErrorRedisMessage cached = messagePool.getError(content); - return cached != null ? cached : new ErrorRedisMessage(content.toString(CharsetUtil.UTF_8)); - } - case INTEGER: { - IntegerRedisMessage cached = messagePool.getInteger(content); - return cached != null ? cached : new IntegerRedisMessage(parseRedisNumber(content)); - } - default: - throw new RedisCodecException("bad type: " + messageType); + case INLINE_COMMAND: + return new InlineCommandRedisMessage(content.toString(CharsetUtil.UTF_8)); + case SIMPLE_STRING: { + SimpleStringRedisMessage cached = messagePool.getSimpleString(content); + return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8)); + } + case ERROR: { + ErrorRedisMessage cached = messagePool.getError(content); + return cached != null ? cached : new ErrorRedisMessage(content.toString(CharsetUtil.UTF_8)); + } + case INTEGER: { + IntegerRedisMessage cached = messagePool.getInteger(content); + return cached != null ? cached : new IntegerRedisMessage(parseRedisNumber(content)); + } + case DOUBLE: { + String value = content.toString(CharsetUtil.UTF_8); + if (DOUBLE_POSITIVE_INF_CONTENT.equals(value)) { + return DoubleRedisMessage.POSITIVE_INFINITY; + } else if (DOUBLE_NEGATIVE_INF_CONTENT.equals(value)) { + return DoubleRedisMessage.NEGATIVE_INFINITY; + } else { + return new DoubleRedisMessage(Double.parseDouble(value)); + } + } + case BIG_NUMBER: { + return new BigNumberRedisMessage(parsePositiveBigInteger(content)); + } + case BOOLEAN: { + return content.readByte() == BOOLEAN_TRUE_CONTENT ? + BooleanRedisMessage.TRUE : BooleanRedisMessage.FALSE; + } + case NULL: { + return NullRedisMessage.INSTANCE; + } + default: + throw new RedisCodecException("bad type: " + messageType); } } @@ -292,7 +343,7 @@ private long parseRedisNumber(ByteBuf byteBuf) { } if (readableBytes > RedisConstants.POSITIVE_LONG_MAX_LENGTH + extraOneByteForNegative) { throw new RedisCodecException("too many characters to be a valid RESP Integer: " + - byteBuf.toString(CharsetUtil.US_ASCII)); + byteBuf.toString(CharsetUtil.US_ASCII)); } if (negative) { return -parsePositiveNumber(byteBuf.skipBytes(extraOneByteForNegative)); @@ -306,6 +357,12 @@ private long parsePositiveNumber(ByteBuf byteBuf) { return toPositiveLongProcessor.content(); } + private BigInteger parsePositiveBigInteger(ByteBuf byteBuf) { + toPositiveBigIntegerProcessor.reset(); + byteBuf.forEachByte(toPositiveBigIntegerProcessor); + return toPositiveBigIntegerProcessor.content(); + } + private static final class ToPositiveLongProcessor implements ByteProcessor { private long result; @@ -326,4 +383,25 @@ public void reset() { result = 0; } } + + private static final class ToPositiveBigIntegerProcessor implements ByteProcessor { + private BigInteger result = BigInteger.ZERO; + + @Override + public boolean process(byte value) { + if (value < '0' || value > '9') { + throw new RedisCodecException("bad byte in number: " + value); + } + result = result.multiply(BigInteger.TEN).add(BigInteger.valueOf(value - '0')); + return true; + } + + public BigInteger content() { + return result; + } + + public void reset() { + result = BigInteger.ZERO; + } + } } diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisEncoder.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisEncoder.java index e0bf4e3..1ce8a9e 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisEncoder.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisEncoder.java @@ -14,21 +14,26 @@ */ package io.netty.contrib.handler.codec.redis; -import static java.util.Objects.requireNonNull; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.CodecException; import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.util.CharsetUtil; import io.netty.util.internal.UnstableApi; import java.util.List; +import java.util.Map; + +import static io.netty.contrib.handler.codec.redis.RedisConstants.DOUBLE_NEGATIVE_INF_CONTENT; +import static io.netty.contrib.handler.codec.redis.RedisConstants.DOUBLE_POSITIVE_INF_CONTENT; +import static java.util.Objects.requireNonNull; /** * Encodes {@link RedisMessage} into bytes following - * RESP (REdis Serialization Protocol). + * RESP (Redis Serialization Protocol) + * and RESP3. */ @UnstableApi public class RedisEncoder extends MessageToMessageEncoder { @@ -70,23 +75,84 @@ private void writeRedisMessage(ByteBufAllocator allocator, RedisMessage msg, Lis writeErrorMessage(allocator, (ErrorRedisMessage) msg, out); } else if (msg instanceof IntegerRedisMessage) { writeIntegerMessage(allocator, (IntegerRedisMessage) msg, out); + } else if (msg instanceof FullBulkErrorStringRedisMessage) { + writeFullBulkStringMessage(allocator, (FullBulkErrorStringRedisMessage) msg, + RedisMessageType.BLOB_ERROR, out); + } else if (msg instanceof FullBulkVerbatimStringRedisMessage) { + writeFullBulkStringMessage(allocator, (FullBulkVerbatimStringRedisMessage) msg, + RedisMessageType.VERBATIM_STRING, out); } else if (msg instanceof FullBulkStringRedisMessage) { - writeFullBulkStringMessage(allocator, (FullBulkStringRedisMessage) msg, out); + writeFullBulkStringMessage(allocator, (FullBulkStringRedisMessage) msg, + RedisMessageType.BULK_STRING, out); } else if (msg instanceof BulkStringRedisContent) { writeBulkStringContent(allocator, (BulkStringRedisContent) msg, out); + } else if (msg instanceof BulkErrorStringHeaderRedisMessage) { + writeBulkStringHeader(allocator, (BulkErrorStringHeaderRedisMessage) msg, + RedisMessageType.BLOB_ERROR, out); + } else if (msg instanceof BulkVerbatimStringHeaderRedisMessage) { + writeBulkStringHeader(allocator, (BulkVerbatimStringHeaderRedisMessage) msg, + RedisMessageType.VERBATIM_STRING, out); } else if (msg instanceof BulkStringHeaderRedisMessage) { - writeBulkStringHeader(allocator, (BulkStringHeaderRedisMessage) msg, out); + writeBulkStringHeader(allocator, (BulkStringHeaderRedisMessage) msg, + RedisMessageType.BULK_STRING, out); } else if (msg instanceof ArrayHeaderRedisMessage) { - writeArrayHeader(allocator, (ArrayHeaderRedisMessage) msg, out); + writeAggregatedHeader(allocator, RedisMessageType.ARRAY_HEADER, (ArrayHeaderRedisMessage) msg, out); } else if (msg instanceof ArrayRedisMessage) { - writeArrayMessage(allocator, (ArrayRedisMessage) msg, out); + writeCollectionMessage(allocator, RedisMessageType.ARRAY_HEADER, (ArrayRedisMessage) msg, out); + } else if (msg instanceof PushHeaderRedisMessage) { + writeAggregatedHeader(allocator, RedisMessageType.PUSH, (PushHeaderRedisMessage) msg, out); + } else if (msg instanceof PushRedisMessage) { + writeCollectionMessage(allocator, RedisMessageType.PUSH, (PushRedisMessage) msg, out); + } else if (msg instanceof SetHeaderRedisMessage) { + writeAggregatedHeader(allocator, RedisMessageType.SET_HEADER, (SetHeaderRedisMessage) msg, out); + } else if (msg instanceof SetRedisMessage) { + writeCollectionMessage(allocator, RedisMessageType.SET_HEADER, (SetRedisMessage) msg, out); + } else if (msg instanceof MapHeaderRedisMessage) { + writeAggregatedHeader(allocator, RedisMessageType.MAP_HEADER, (MapHeaderRedisMessage) msg, out); + } else if (msg instanceof MapRedisMessage) { + writeMapMessage(allocator, RedisMessageType.MAP_HEADER, (MapRedisMessage) msg, out); + } else if (msg instanceof DoubleRedisMessage) { + writeDoubleMessage(allocator, (DoubleRedisMessage) msg, out); + } else if (msg instanceof BigNumberRedisMessage) { + writeBigNumberMessage(allocator, (BigNumberRedisMessage) msg, out); + } else if (msg instanceof BooleanRedisMessage) { + writeBooleanMessage(allocator, (BooleanRedisMessage) msg, out); + } else if (msg instanceof NullRedisMessage) { + writeNullMessage(allocator, (NullRedisMessage) msg, out); } else { throw new CodecException("unknown message type: " + msg); } } + private void writeBigNumberMessage(ByteBufAllocator allocator, BigNumberRedisMessage msg, List out) { + writeString(allocator, RedisMessageType.BIG_NUMBER, msg.value(), out); + } + + private void writeDoubleMessage(ByteBufAllocator allocator, DoubleRedisMessage msg, List out) { + ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.DOUBLE_MAX_LENGTH + + RedisConstants.EOL_LENGTH); + RedisMessageType.DOUBLE.writeTo(buf); + buf.writeBytes(doubleToBytes(msg.value())); + buf.writeShort(RedisConstants.EOL_SHORT); + out.add(buf); + } + + private void writeBooleanMessage(ByteBufAllocator allocator, BooleanRedisMessage msg, List out) { + ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.BOOLEAN_LENGTH + + RedisConstants.EOL_LENGTH); + RedisMessageType.BOOLEAN.writeTo(buf); + byte content = msg.value() ? RedisConstants.BOOLEAN_TRUE_CONTENT : RedisConstants.BOOLEAN_FALSE_CONTENT; + buf.writeByte(content); + buf.writeShort(RedisConstants.EOL_SHORT); + out.add(buf); + } + + private void writeNullMessage(ByteBufAllocator allocator, NullRedisMessage msg, List out) { + writeString(allocator, RedisMessageType.NULL, "", out); + } + private static void writeInlineCommandMessage(ByteBufAllocator allocator, InlineCommandRedisMessage msg, - List out) { + List out) { writeString(allocator, RedisMessageType.INLINE_COMMAND, msg.content(), out); } @@ -102,7 +168,7 @@ private static void writeErrorMessage(ByteBufAllocator allocator, ErrorRedisMess private static void writeString(ByteBufAllocator allocator, RedisMessageType type, String content, List out) { ByteBuf buf = allocator.ioBuffer(type.length() + ByteBufUtil.utf8MaxBytes(content) + - RedisConstants.EOL_LENGTH); + RedisConstants.EOL_LENGTH); type.writeTo(buf); ByteBufUtil.writeUtf8(buf, content); buf.writeShort(RedisConstants.EOL_SHORT); @@ -111,18 +177,19 @@ private static void writeString(ByteBufAllocator allocator, RedisMessageType typ private void writeIntegerMessage(ByteBufAllocator allocator, IntegerRedisMessage msg, List out) { ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + - RedisConstants.EOL_LENGTH); + RedisConstants.EOL_LENGTH); RedisMessageType.INTEGER.writeTo(buf); buf.writeBytes(numberToBytes(msg.value())); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } - private void writeBulkStringHeader(ByteBufAllocator allocator, BulkStringHeaderRedisMessage msg, List out) { + private void writeBulkStringHeader(ByteBufAllocator allocator, BulkStringHeaderRedisMessage msg, + RedisMessageType messageType, List out) { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + - (msg.isNull() ? RedisConstants.NULL_LENGTH : - RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH)); - RedisMessageType.BULK_STRING.writeTo(buf); + (msg.isNull() ? RedisConstants.NULL_LENGTH : + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH)); + messageType.writeTo(buf); if (msg.isNull()) { buf.writeShort(RedisConstants.NULL_SHORT); } else { @@ -141,18 +208,18 @@ private static void writeBulkStringContent(ByteBufAllocator allocator, BulkStrin } private void writeFullBulkStringMessage(ByteBufAllocator allocator, FullBulkStringRedisMessage msg, - List out) { + RedisMessageType messageType, List out) { if (msg.isNull()) { ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + - RedisConstants.EOL_LENGTH); - RedisMessageType.BULK_STRING.writeTo(buf); + RedisConstants.EOL_LENGTH); + messageType.writeTo(buf); buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } else { ByteBuf headerBuf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + - RedisConstants.EOL_LENGTH); - RedisMessageType.BULK_STRING.writeTo(headerBuf); + RedisConstants.EOL_LENGTH); + messageType.writeTo(headerBuf); headerBuf.writeBytes(numberToBytes(msg.content().readableBytes())); headerBuf.writeShort(RedisConstants.EOL_SHORT); out.add(headerBuf); @@ -164,36 +231,55 @@ private void writeFullBulkStringMessage(ByteBufAllocator allocator, FullBulkStri /** * Write array header only without body. Use this if you want to write arrays as streaming. */ - private void writeArrayHeader(ByteBufAllocator allocator, ArrayHeaderRedisMessage msg, List out) { - writeArrayHeader(allocator, msg.isNull(), msg.length(), out); + private void writeAggregatedHeader(ByteBufAllocator allocator, RedisMessageType messageType, + AggregatedHeaderRedisMessage msg, List out) { + writeAggregatedHeader(allocator, messageType, msg.isNull(), msg.length(), out); } /** - * Write full constructed array message. + * Write full constructed collection message. */ - private void writeArrayMessage(ByteBufAllocator allocator, ArrayRedisMessage msg, List out) { + private void writeCollectionMessage(ByteBufAllocator allocator, RedisMessageType messageType, + AbstractCollectionRedisMessage msg, List out) { if (msg.isNull()) { - writeArrayHeader(allocator, msg.isNull(), RedisConstants.NULL_VALUE, out); + writeAggregatedHeader(allocator, messageType, msg.isNull(), RedisConstants.NULL_VALUE, out); } else { - writeArrayHeader(allocator, msg.isNull(), msg.children().size(), out); + writeAggregatedHeader(allocator, messageType, msg.isNull(), msg.children().size(), out); for (RedisMessage child : msg.children()) { writeRedisMessage(allocator, child, out); } } } - private void writeArrayHeader(ByteBufAllocator allocator, boolean isNull, long length, List out) { + /** + * Write full constructed map message. + */ + private void writeMapMessage(ByteBufAllocator allocator, RedisMessageType messageType, + AbstractMapRedisMessage msg, List out) { + if (msg.isNull()) { + writeAggregatedHeader(allocator, messageType, msg.isNull(), RedisConstants.NULL_VALUE, out); + } else { + writeAggregatedHeader(allocator, messageType, msg.isNull(), msg.children().size(), out); + for (Map.Entry child : msg.children().entrySet()) { + writeRedisMessage(allocator, child.getKey(), out); + writeRedisMessage(allocator, child.getValue(), out); + } + } + } + + private void writeAggregatedHeader(ByteBufAllocator allocator, RedisMessageType messageType, + boolean isNull, long length, List out) { if (isNull) { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + - RedisConstants.EOL_LENGTH); - RedisMessageType.ARRAY_HEADER.writeTo(buf); + RedisConstants.EOL_LENGTH); + messageType.writeTo(buf); buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } else { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + - RedisConstants.EOL_LENGTH); - RedisMessageType.ARRAY_HEADER.writeTo(buf); + RedisConstants.EOL_LENGTH); + messageType.writeTo(buf); buf.writeBytes(numberToBytes(length)); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); @@ -204,4 +290,14 @@ private byte[] numberToBytes(long value) { byte[] bytes = messagePool.getByteBufOfInteger(value); return bytes != null ? bytes : RedisCodecUtil.longToAsciiBytes(value); } + + private static byte[] doubleToBytes(double value) { + if (value == Double.MAX_VALUE) { + return DOUBLE_POSITIVE_INF_CONTENT.getBytes(CharsetUtil.US_ASCII); + } + if (value == Double.MIN_VALUE) { + return DOUBLE_NEGATIVE_INF_CONTENT.getBytes(CharsetUtil.US_ASCII); + } + return RedisCodecUtil.doubleToAsciiBytes(value); + } } diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisMapAggregator.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisMapAggregator.java new file mode 100644 index 0000000..a9d045f --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisMapAggregator.java @@ -0,0 +1,112 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.contrib.handler.codec.redis; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.CodecException; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.UnstableApi; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Aggregates {@link RedisMessage} parts into {@link MapRedisMessage}. + * This decoder should be used together with {@link RedisDecoder}. + */ +@UnstableApi +public final class RedisMapAggregator extends MessageToMessageDecoder { + + private final Deque depths = new ArrayDeque(4); + + @Override + protected void decode(ChannelHandlerContext ctx, RedisMessage msg) throws Exception { + if (msg instanceof MapHeaderRedisMessage) { + msg = decodeRedisMapHeader((AggregatedHeaderRedisMessage) msg); + if (msg == null) { + return; + } + } else { + ReferenceCountUtil.retain(msg); + } + + while (!depths.isEmpty()) { + AggregateState current = depths.peek(); + current.add(msg); + + // if current aggregation completed, go to parent aggregation. + if (current.children.size() == current.length) { + msg = new MapRedisMessage(current.children); + depths.pop(); + } else { + // not aggregated yet. try next time. + return; + } + } + + ctx.fireChannelRead(msg); + } + + private RedisMessage decodeRedisMapHeader(AggregatedHeaderRedisMessage header) { + // encode to Null types message if map is null or empty + if (header.isNull()) { + return NullRedisMessage.INSTANCE; + } + if (header.length() == 0L) { + return MapRedisMessage.EMPTY_INSTANCE; + } + if (header.length() > 0L) { + // Currently, this codec doesn't support `long` length for arrays because Java's Map.size() is int. + if (header.length() > Integer.MAX_VALUE) { + throw new CodecException("this codec doesn't support longer length than " + Integer.MAX_VALUE); + } + + // start aggregating array or set according header type + depths.push(new AggregateState(header, (int) header.length())); + } + return null; + } + + private static final class AggregateState { + private final int length; + private final Map children; + private final RedisMessageType aggregateType; + private RedisMessage cache; + + AggregateState(AggregatedHeaderRedisMessage headerType, int length) { + this.length = length; + if (headerType instanceof MapHeaderRedisMessage) { + this.children = new HashMap(length); + this.aggregateType = RedisMessageType.MAP_HEADER; + } else { + throw new CodecException("bad header type: " + headerType); + } + } + + // aggregate msg to map key and value + void add(RedisMessage msg) { + if (cache == null) { + cache = msg; + } else { + children.put(cache, msg); + cache = null; + } + } + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisMessageType.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisMessageType.java index 8faf81e..6b4d539 100644 --- a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisMessageType.java +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/RedisMessageType.java @@ -18,7 +18,7 @@ import io.netty.util.internal.UnstableApi; /** - * Type of RESP (REdis Serialization Protocol). + * Type of RESP3. */ @UnstableApi public enum RedisMessageType { @@ -28,7 +28,17 @@ public enum RedisMessageType { ERROR((byte) '-', true), INTEGER((byte) ':', true), BULK_STRING((byte) '$', false), - ARRAY_HEADER((byte) '*', false); + ARRAY_HEADER((byte) '*', false), + NULL((byte) '_', true), + DOUBLE((byte) ',', true), + BOOLEAN((byte) '#', true), + BLOB_ERROR((byte) '!', false), + VERBATIM_STRING((byte) '=', false), + BIG_NUMBER((byte) '(', true), + MAP_HEADER((byte) '%', false), + SET_HEADER((byte) '~', false), + ATTRIBUTE_HEADER((byte) '|', false), + PUSH((byte) '>', false); private final Byte value; private final boolean inline; @@ -91,6 +101,26 @@ private static RedisMessageType valueOf(byte value) { return BULK_STRING; case '*': return ARRAY_HEADER; + case '_': + return NULL; + case ',': + return DOUBLE; + case '#': + return BOOLEAN; + case '!': + return BLOB_ERROR; + case '=': + return VERBATIM_STRING; + case '(': + return BIG_NUMBER; + case '%': + return MAP_HEADER; + case '~': + return SET_HEADER; + case '|': + return ATTRIBUTE_HEADER; + case '>': + return PUSH; default: return INLINE_COMMAND; } diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/SetHeaderRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/SetHeaderRedisMessage.java new file mode 100644 index 0000000..23b0ef7 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/SetHeaderRedisMessage.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +/** + * Header of Redis Set Message. + */ +@UnstableApi +public final class SetHeaderRedisMessage extends AggregatedHeaderRedisMessage { + + /** + * Creates a {@link SetHeaderRedisMessage} for the given {@code length}. + * + * @param length + */ + public SetHeaderRedisMessage(long length) { + super(length); + } +} diff --git a/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/SetRedisMessage.java b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/SetRedisMessage.java new file mode 100644 index 0000000..112e5f7 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/contrib/handler/codec/redis/SetRedisMessage.java @@ -0,0 +1,92 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.contrib.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +import java.util.Collections; +import java.util.Set; + +/** + * Set of RESP3. + */ +@UnstableApi +public class SetRedisMessage extends AbstractCollectionRedisMessage { + + private SetRedisMessage() { + super(Collections.emptySet()); + } + + /** + * Creates a {@link SetRedisMessage} for the given {@code content}. + * + * @param children the children. + */ + public SetRedisMessage(Set children) { + super(children); + } + + /** + * Get children of this Set. It can be null or empty. + * + * @return Set of {@link RedisMessage}s. + */ + @Override + public final Set children() { + return (Set) children; + } + + /** + * A predefined empty set instance for {@link SetRedisMessage}. + */ + public static final SetRedisMessage EMPTY_INSTANCE = new SetRedisMessage() { + + @Override + public SetRedisMessage retain() { + return this; + } + + @Override + public SetRedisMessage retain(int increment) { + return this; + } + + @Override + public SetRedisMessage touch() { + return this; + } + + @Override + public SetRedisMessage touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } + + @Override + public String toString() { + return "EmptySetRedisMessage"; + } + }; +} diff --git a/codec-redis/src/test/java/io/netty/contrib/handler/codec/redis/RedisDecoderTest.java b/codec-redis/src/test/java/io/netty/contrib/handler/codec/redis/RedisDecoderTest.java index 34f008a..8305f4f 100644 --- a/codec-redis/src/test/java/io/netty/contrib/handler/codec/redis/RedisDecoderTest.java +++ b/codec-redis/src/test/java/io/netty/contrib/handler/codec/redis/RedisDecoderTest.java @@ -24,14 +24,21 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; import java.util.List; +import java.util.Map; +import java.util.Set; +import static io.netty.contrib.handler.codec.redis.RedisCodecTestUtil.byteBufOf; +import static io.netty.contrib.handler.codec.redis.RedisCodecTestUtil.bytesOf; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.internal.bytebuddy.implementation.FixedValue.nullValue; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Verifies the correct functionality of the {@link RedisDecoder} and {@link RedisArrayAggregator}. @@ -50,7 +57,8 @@ private static EmbeddedChannel newChannel(boolean decodeInlineCommands) { return new EmbeddedChannel( new RedisDecoder(decodeInlineCommands), new RedisBulkStringAggregator(), - new RedisArrayAggregator()); + new RedisArrayAggregator(), + new RedisMapAggregator()); } @AfterEach @@ -60,8 +68,8 @@ public void tearDown() throws Exception { @Test public void splitEOLDoesNotInfiniteLoop() throws Exception { - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("$6\r\nfoobar\r"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\n"))); + assertFalse(channel.writeInbound(byteBufOf("$6\r\nfoobar\r"))); + assertTrue(channel.writeInbound(byteBufOf("\n"))); RedisMessage msg = channel.readInbound(); assertTrue(msg instanceof FullBulkStringRedisMessage); @@ -71,11 +79,11 @@ public void splitEOLDoesNotInfiniteLoop() throws Exception { @Test public void shouldNotDecodeInlineCommandByDefault() { assertThrows(DecoderException.class, () -> { - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("P"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("I"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("N"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("G"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("P"))); + assertFalse(channel.writeInbound(byteBufOf("I"))); + assertFalse(channel.writeInbound(byteBufOf("N"))); + assertFalse(channel.writeInbound(byteBufOf("G"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); channel.readInbound(); }); @@ -85,11 +93,11 @@ public void shouldNotDecodeInlineCommandByDefault() { public void shouldDecodeInlineCommand() { channel = newChannel(true); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("P"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("I"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("N"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("G"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("P"))); + assertFalse(channel.writeInbound(byteBufOf("I"))); + assertFalse(channel.writeInbound(byteBufOf("N"))); + assertFalse(channel.writeInbound(byteBufOf("G"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); InlineCommandRedisMessage msg = channel.readInbound(); @@ -100,10 +108,10 @@ public void shouldDecodeInlineCommand() { @Test public void shouldDecodeSimpleString() { - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("+"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("O"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("K"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+"))); + assertFalse(channel.writeInbound(byteBufOf("O"))); + assertFalse(channel.writeInbound(byteBufOf("K"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); SimpleStringRedisMessage msg = channel.readInbound(); @@ -114,11 +122,11 @@ public void shouldDecodeSimpleString() { @Test public void shouldDecodeTwoSimpleStrings() { - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("+"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("O"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("K"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n+SEC"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("OND\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+"))); + assertFalse(channel.writeInbound(byteBufOf("O"))); + assertFalse(channel.writeInbound(byteBufOf("K"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n+SEC"))); + assertTrue(channel.writeInbound(byteBufOf("OND\r\n"))); SimpleStringRedisMessage msg1 = channel.readInbound(); assertThat(msg1.content()).isEqualTo("OK"); @@ -132,10 +140,10 @@ public void shouldDecodeTwoSimpleStrings() { @Test public void shouldDecodeError() { String content = "ERROR sample message"; - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("-"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(content))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\n"))); + assertFalse(channel.writeInbound(byteBufOf("-"))); + assertFalse(channel.writeInbound(byteBufOf(content))); + assertFalse(channel.writeInbound(byteBufOf("\r"))); + assertTrue(channel.writeInbound(byteBufOf("\n"))); ErrorRedisMessage msg = channel.readInbound(); @@ -147,10 +155,10 @@ public void shouldDecodeError() { @Test public void shouldDecodeInteger() { long value = 1234L; - byte[] content = RedisCodecTestUtil.bytesOf(value); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(":"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(content))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); + byte[] content = bytesOf(value); + assertFalse(channel.writeInbound(byteBufOf(":"))); + assertFalse(channel.writeInbound(byteBufOf(content))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); IntegerRedisMessage msg = channel.readInbound(); @@ -163,46 +171,46 @@ public void shouldDecodeInteger() { public void shouldDecodeBulkString() { String buf1 = "bulk\nst"; String buf2 = "ring\ntest\n1234"; - byte[] content = RedisCodecTestUtil.bytesOf(buf1 + buf2); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("$"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(Integer.toString(content.length)))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(buf1))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(buf2))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); + byte[] content = bytesOf(buf1 + buf2); + assertFalse(channel.writeInbound(byteBufOf("$"))); + assertFalse(channel.writeInbound(byteBufOf(Integer.toString(content.length)))); + assertFalse(channel.writeInbound(byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(buf1))); + assertFalse(channel.writeInbound(byteBufOf(buf2))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); FullBulkStringRedisMessage msg = channel.readInbound(); - assertThat(RedisCodecTestUtil.bytesOf(msg.content())).isEqualTo(content); + assertThat(bytesOf(msg.content())).isEqualTo(content); ReferenceCountUtil.release(msg); } @Test public void shouldDecodeEmptyBulkString() { - byte[] content = RedisCodecTestUtil.bytesOf(""); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("$"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(Integer.toString(content.length)))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(content))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); + byte[] content = bytesOf(""); + assertFalse(channel.writeInbound(byteBufOf("$"))); + assertFalse(channel.writeInbound(byteBufOf(Integer.toString(content.length)))); + assertFalse(channel.writeInbound(byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(content))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); FullBulkStringRedisMessage msg = channel.readInbound(); - assertThat(RedisCodecTestUtil.bytesOf(msg.content())).isEqualTo(content); + assertThat(bytesOf(msg.content())).isEqualTo(content); ReferenceCountUtil.release(msg); } @Test public void shouldDecodeNullBulkString() { - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("$"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(Integer.toString(-1)))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("$"))); + assertFalse(channel.writeInbound(byteBufOf(Integer.toString(-1)))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("$"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf(Integer.toString(-1)))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("\r\n"))); + assertTrue(channel.writeInbound(byteBufOf("$"))); + assertTrue(channel.writeInbound(byteBufOf(Integer.toString(-1)))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); FullBulkStringRedisMessage msg1 = channel.readInbound(); assertThat(msg1.isNull()).isTrue(); @@ -218,11 +226,11 @@ public void shouldDecodeNullBulkString() { @Test public void shouldDecodeSimpleArray() throws Exception { - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("*3\r\n"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf(":1234\r\n"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("+sim"))); - assertFalse(channel.writeInbound(RedisCodecTestUtil.byteBufOf("ple\r\n-err"))); - assertTrue(channel.writeInbound(RedisCodecTestUtil.byteBufOf("or\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("*3\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(":1234\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+sim"))); + assertFalse(channel.writeInbound(byteBufOf("ple\r\n-err"))); + assertTrue(channel.writeInbound(byteBufOf("or\r\n"))); ArrayRedisMessage msg = channel.readInbound(); List children = msg.children(); @@ -242,9 +250,9 @@ public void shouldDecodeSimpleArray() throws Exception { @Test public void shouldDecodeNestedArray() throws Exception { ByteBuf buf = Unpooled.buffer(); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("*2\r\n")); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n")); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("*2\r\n+Foo\r\n-Bar\r\n")); + buf.writeBytes(byteBufOf("*2\r\n")); + buf.writeBytes(byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n")); + buf.writeBytes(byteBufOf("*2\r\n+Foo\r\n-Bar\r\n")); assertTrue(channel.writeInbound(buf)); ArrayRedisMessage msg = channel.readInbound(); @@ -270,9 +278,9 @@ public void shouldDecodeNestedArray() throws Exception { @Test public void shouldErrorOnDoubleReleaseArrayReferenceCounted() { ByteBuf buf = Unpooled.buffer(); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("*2\r\n")); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n")); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("*2\r\n+Foo\r\n-Bar\r\n")); + buf.writeBytes(byteBufOf("*2\r\n")); + buf.writeBytes(byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n")); + buf.writeBytes(byteBufOf("*2\r\n+Foo\r\n-Bar\r\n")); assertTrue(channel.writeInbound(buf)); ArrayRedisMessage msg = channel.readInbound(); @@ -284,9 +292,9 @@ public void shouldErrorOnDoubleReleaseArrayReferenceCounted() { @Test public void shouldErrorOnReleaseArrayChildReferenceCounted() { ByteBuf buf = Unpooled.buffer(); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("*2\r\n")); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n")); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("$3\r\nFoo\r\n")); + buf.writeBytes(byteBufOf("*2\r\n")); + buf.writeBytes(byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n")); + buf.writeBytes(byteBufOf("$3\r\nFoo\r\n")); assertTrue(channel.writeInbound(buf)); ArrayRedisMessage msg = channel.readInbound(); @@ -299,8 +307,8 @@ public void shouldErrorOnReleaseArrayChildReferenceCounted() { @Test public void shouldErrorOnReleaseContentOfArrayChildReferenceCounted() throws Exception { ByteBuf buf = Unpooled.buffer(); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("*2\r\n")); - buf.writeBytes(RedisCodecTestUtil.byteBufOf("$3\r\nFoo\r\n$3\r\nBar\r\n")); + buf.writeBytes(byteBufOf("*2\r\n")); + buf.writeBytes(byteBufOf("$3\r\nFoo\r\n$3\r\nBar\r\n")); assertTrue(channel.writeInbound(buf)); ArrayRedisMessage msg = channel.readInbound(); @@ -318,4 +326,260 @@ public void testPredefinedMessagesNotEqual() { assertNotEquals(FullBulkStringRedisMessage.EMPTY_INSTANCE, FullBulkStringRedisMessage.NULL_INSTANCE); assertNotEquals(FullBulkStringRedisMessage.NULL_INSTANCE, FullBulkStringRedisMessage.EMPTY_INSTANCE); } + + @Test + public void shouldDecodeNull() { + assertFalse(channel.writeInbound(byteBufOf("_"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + RedisMessage msg = channel.readInbound(); + + assertTrue(msg instanceof NullRedisMessage); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeBoolean() { + assertFalse(channel.writeInbound(byteBufOf("#"))); + assertFalse(channel.writeInbound(byteBufOf("t"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + assertTrue(channel.writeInbound(byteBufOf("#f\r\n"))); + + BooleanRedisMessage msgTrue = channel.readInbound(); + assertTrue(msgTrue.value()); + ReferenceCountUtil.release(msgTrue); + + BooleanRedisMessage msgFalse = channel.readInbound(); + assertFalse(msgFalse.value()); + ReferenceCountUtil.release(msgFalse); + } + + @Test + public void shouldDecodeDouble() { + assertFalse(channel.writeInbound(byteBufOf(","))); + assertFalse(channel.writeInbound(byteBufOf("1.23"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + DoubleRedisMessage msg = channel.readInbound(); + assertThat(msg.value()).isEqualTo(1.23d); + + assertTrue(channel.writeInbound(byteBufOf(",-1.23\r\n"))); + + msg = channel.readInbound(); + assertThat(msg.value()).isEqualTo(-1.23d); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeInfinityDouble() { + assertFalse(channel.writeInbound(byteBufOf(",inf"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + assertTrue(channel.writeInbound(byteBufOf(",-inf\r\n"))); + + DoubleRedisMessage msg = channel.readInbound(); + assertThat(msg.value()).isEqualTo(Double.MAX_VALUE); + + msg = channel.readInbound(); + assertThat(msg.value()).isEqualTo(Double.MIN_VALUE); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldErrorOnDecodeDoubleOnNotValidRepresentation() { + assertThrows(DecoderException.class, new Executable() { + @Override + public void execute() { + assertFalse(channel.writeInbound(byteBufOf(",-1.23a"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + BigNumberRedisMessage msg = channel.readInbound(); + ReferenceCountUtil.release(msg); + } + }); + } + + @Test + public void shouldDecodeBigNumber() { + assertFalse(channel.writeInbound(byteBufOf("(3492890328409238509324850943850943825024385"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + BigNumberRedisMessage msg = channel.readInbound(); + assertThat(msg.value()).isEqualTo("3492890328409238509324850943850943825024385"); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldErrorOnDecodeBigNumberOnNotValidRepresentation() { + assertThrows(DecoderException.class, new Executable() { + @Override + public void execute() { + assertFalse(channel.writeInbound(byteBufOf("(3492890328409238509324850943850943825024385"))); + assertFalse(channel.writeInbound(byteBufOf("Error"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + BigNumberRedisMessage msg = channel.readInbound(); + ReferenceCountUtil.release(msg); + } + }); + } + + @Test + public void shouldDecodeBulkErrorString() { + String buf1 = "bulk\nst"; + String buf2 = "ring\ntest\n1234"; + byte[] content = bytesOf(buf1 + buf2); + assertFalse(channel.writeInbound(byteBufOf("!"))); + assertFalse(channel.writeInbound(byteBufOf(Integer.toString(content.length)))); + assertFalse(channel.writeInbound(byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(buf1))); + assertFalse(channel.writeInbound(byteBufOf(buf2))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + FullBulkErrorStringRedisMessage msg = channel.readInbound(); + + assertThat(bytesOf(msg.content())).isEqualTo(content); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeBulkVerbatimString() { + String buf1 = "txt:bulk\nst"; + String buf2 = "ring\ntest\n1234"; + byte[] content = bytesOf(buf1 + buf2); + assertFalse(channel.writeInbound(byteBufOf("="))); + assertFalse(channel.writeInbound(byteBufOf(Integer.toString(content.length)))); + assertFalse(channel.writeInbound(byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(buf1))); + assertFalse(channel.writeInbound(byteBufOf(buf2))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + FullBulkVerbatimStringRedisMessage msg = channel.readInbound(); + + assertThat(bytesOf(msg.content())).isEqualTo(content); + assertThat(msg.format()).isEqualTo("txt"); + assertThat(msg.realContent()).isEqualTo("bulk\nstring\ntest\n1234"); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeSet() { + assertFalse(channel.writeInbound(byteBufOf("~3\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(":1234\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+sim"))); + assertFalse(channel.writeInbound(byteBufOf("ple\r\n-err"))); + assertTrue(channel.writeInbound(byteBufOf("or\r\n"))); + + SetRedisMessage msg = channel.readInbound(); + Set children = msg.children(); + + assertThat(children.size()).isEqualTo(3); + for (RedisMessage child : children) { + if (child instanceof IntegerRedisMessage) { + assertThat(((IntegerRedisMessage) child).value()).isEqualTo(1234L); + } else if (child instanceof SimpleStringRedisMessage) { + assertThat(((SimpleStringRedisMessage) child).content()).isEqualTo("simple"); + } else if (child instanceof ErrorRedisMessage) { + assertThat(((ErrorRedisMessage) child).content()).isEqualTo("error"); + } else { + fail("Unexpected types"); + } + } + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeEmptySet() { + assertTrue(channel.writeInbound(byteBufOf("~0\r\n"))); + + RedisMessage msg = channel.readInbound(); + + assertTrue(msg instanceof SetRedisMessage); + assertThat(((SetRedisMessage) msg).children().size()).isEqualTo(0); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeMap() { + assertFalse(channel.writeInbound(byteBufOf("%2\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+first\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(":1\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+second\r\n-err"))); + assertTrue(channel.writeInbound(byteBufOf("or\r\n"))); + + MapRedisMessage msg = channel.readInbound(); + Map children = msg.children(); + + assertThat(children.size()).isEqualTo(2); + + for (Map.Entry messageEntry : children.entrySet()) { + assertThat(messageEntry.getKey()).isInstanceOf(SimpleStringRedisMessage.class); + String key = ((SimpleStringRedisMessage) messageEntry.getKey()).content(); + if ("first".equals(key)) { + assertThat(messageEntry.getValue()).isInstanceOf(IntegerRedisMessage.class); + assertThat(((IntegerRedisMessage) messageEntry.getValue()).value()).isEqualTo(1L); + } else if ("second".equals(key)) { + assertThat(messageEntry.getValue()).isInstanceOf(ErrorRedisMessage.class); + assertThat(((ErrorRedisMessage) messageEntry.getValue()).content()).isEqualTo("error"); + } else { + fail("Unexpected key"); + } + } + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeEmptyMap() { + assertTrue(channel.writeInbound(byteBufOf("%0\r\n"))); + + RedisMessage msg = channel.readInbound(); + + assertTrue(msg instanceof MapRedisMessage); + assertThat(((MapRedisMessage) msg).children().size()).isEqualTo(0); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldNotBeDecodedMapOnNotEnoughMessage() { + assertFalse(channel.writeInbound(byteBufOf("%2\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+first\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(":1\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+second\r\n"))); + + MapRedisMessage msg = channel.readInbound(); + + assertThat(msg).isNull(); + } + + @Test + public void shouldDecodePush() throws Exception { + assertFalse(channel.writeInbound(byteBufOf(">3\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(":1234\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+sim"))); + assertFalse(channel.writeInbound(byteBufOf("ple\r\n-err"))); + assertTrue(channel.writeInbound(byteBufOf("or\r\n"))); + + PushRedisMessage msg = channel.readInbound(); + List children = msg.children(); + + assertThat(msg.children().size()).isEqualTo(3); + + assertThat(children.get(0)).isInstanceOf(IntegerRedisMessage.class); + assertThat(((IntegerRedisMessage) children.get(0)).value()).isEqualTo(1234L); + assertThat(children.get(1)).isInstanceOf(SimpleStringRedisMessage.class); + assertThat(((SimpleStringRedisMessage) children.get(1)).content()).isEqualTo("simple"); + assertThat(children.get(2)).isInstanceOf(ErrorRedisMessage.class); + assertThat(((ErrorRedisMessage) children.get(2)).content()).isEqualTo("error"); + + ReferenceCountUtil.release(msg); + } + } diff --git a/codec-redis/src/test/java/io/netty/contrib/handler/codec/redis/RedisEncoderTest.java b/codec-redis/src/test/java/io/netty/contrib/handler/codec/redis/RedisEncoderTest.java index 55f75e8..5858c7c 100644 --- a/codec-redis/src/test/java/io/netty/contrib/handler/codec/redis/RedisEncoderTest.java +++ b/codec-redis/src/test/java/io/netty/contrib/handler/codec/redis/RedisEncoderTest.java @@ -24,7 +24,10 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static io.netty.contrib.handler.codec.redis.RedisCodecTestUtil.byteBufOf; import static io.netty.contrib.handler.codec.redis.RedisCodecTestUtil.bytesOf; @@ -192,4 +195,181 @@ private static ByteBuf readAll(EmbeddedChannel channel) { } return buf; } + + @Test + public void shouldEncodeNull() { + boolean result = channel.writeOutbound(NullRedisMessage.INSTANCE); + assertTrue(result); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo(bytesOf("_\r\n")); + written.release(); + } + + @Test + public void shouldEncodeBoolean() { + boolean result = channel.writeOutbound(BooleanRedisMessage.TRUE); + assertTrue(result); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo(bytesOf("#t\r\n")); + written.release(); + + result = channel.writeOutbound(BooleanRedisMessage.FALSE); + assertTrue(result); + + written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo(bytesOf("#f\r\n")); + written.release(); + } + + @Test + public void shouldEncodeDouble() { + boolean result = channel.writeOutbound(new DoubleRedisMessage(1.23d)); + assertTrue(result); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo(bytesOf(",1.23\r\n")); + written.release(); + + result = channel.writeOutbound(DoubleRedisMessage.POSITIVE_INFINITY); + assertTrue(result); + + written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo((bytesOf(",inf\r\n"))); + written.release(); + } + + @Test + public void shouldEncodeBigNumber() { + BigNumberRedisMessage message = + new BigNumberRedisMessage(bytesOf("3492890328409238509324850943850943825024385")); + boolean result = channel.writeOutbound(message); + assertTrue(result); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo(bytesOf("(3492890328409238509324850943850943825024385\r\n")); + + written.release(); + } + + @Test + public void shouldEncodeFullBulkErrorString() { + ByteBuf bulkString = byteBufOf("bulk\nstring\ntest").retain(); + int length = bulkString.readableBytes(); + RedisMessage msg = new FullBulkErrorStringRedisMessage(bulkString); + + boolean result = channel.writeOutbound(msg); + assertTrue(result); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo(bytesOf("!" + length + "\r\nbulk\nstring\ntest\r\n")); + written.release(); + } + + @Test + public void shouldEncodeBulkErrorStringContent() { + RedisMessage header = new BulkErrorStringHeaderRedisMessage(16); + RedisMessage body1 = new DefaultBulkStringRedisContent(byteBufOf("bulk\nstr").retain()); + RedisMessage body2 = new DefaultLastBulkStringRedisContent(byteBufOf("ing\ntest").retain()); + + assertTrue(channel.writeOutbound(header)); + assertTrue(channel.writeOutbound(body1)); + assertTrue(channel.writeOutbound(body2)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo(bytesOf("!16\r\nbulk\nstring\ntest\r\n")); + written.release(); + } + + @Test + public void shouldEncodeFullBulkVerbatimString() { + ByteBuf bulkString = byteBufOf("txt:bulk\nstring\ntest").retain(); + int length = bulkString.readableBytes(); + RedisMessage msg = new FullBulkVerbatimStringRedisMessage(bulkString); + + boolean result = channel.writeOutbound(msg); + assertTrue(result); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo(bytesOf("=" + length + "\r\ntxt:bulk\nstring\ntest\r\n")); + written.release(); + } + + @Test + public void shouldEncodeBulkVerbatimStringContent() { + RedisMessage header = new BulkVerbatimStringHeaderRedisMessage(20); + RedisMessage body1 = new DefaultBulkStringRedisContent(byteBufOf("txt:bulk\nstr").retain()); + RedisMessage body2 = new DefaultLastBulkStringRedisContent(byteBufOf("ing\ntest").retain()); + + assertTrue(channel.writeOutbound(header)); + assertTrue(channel.writeOutbound(body1)); + assertTrue(channel.writeOutbound(body2)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo(bytesOf("=20\r\ntxt:bulk\nstring\ntest\r\n")); + written.release(); + } + + @Test + public void shouldEncodeSet() { + Set children = new HashSet(); + children.add(new SimpleStringRedisMessage("apple")); + children.add(new FullBulkStringRedisMessage(byteBufOf("orange").retain())); + children.add(BooleanRedisMessage.TRUE); + children.add(new IntegerRedisMessage(100)); + RedisMessage msg = new SetRedisMessage(children); + + boolean result = channel.writeOutbound(msg); + assertTrue(result); + + ByteBuf written = readAll(channel); + + String encodeResult = new String(bytesOf(written)); + assertThat(encodeResult).startsWith("~4\r\n"); + // out-of-order + assertThat(encodeResult).contains("$6\r\norange\r\n"); + assertThat(encodeResult).contains("#t\r\n"); + assertThat(encodeResult).contains(":100\r\n"); + assertThat(encodeResult).contains("+apple\r\n"); + + written.release(); + } + + @Test + public void shouldEncodeMap() { + HashMap map = new HashMap(); + map.put(new SimpleStringRedisMessage("first"), new IntegerRedisMessage(1)); + map.put(new SimpleStringRedisMessage("second"), new IntegerRedisMessage(2)); + MapRedisMessage mapMsg = new MapRedisMessage(map); + + boolean result = channel.writeOutbound(mapMsg); + assertTrue(result); + + ByteBuf written = readAll(channel); + + String encodeResult = new String(bytesOf(written)); + assertThat(encodeResult).startsWith("%2\r\n"); + + assertThat(encodeResult).contains("+first\r\n:1\r\n"); + assertThat(encodeResult).contains("+second\r\n:2\r\n"); + + written.release(); + } + + @Test + public void shouldEncodePush() { + List children = new ArrayList(); + children.add(new FullBulkStringRedisMessage(byteBufOf("foo").retain())); + children.add(new FullBulkStringRedisMessage(byteBufOf("bar").retain())); + RedisMessage msg = new PushRedisMessage(children); + + boolean result = channel.writeOutbound(msg); + assertTrue(result); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written)).isEqualTo((bytesOf(">2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"))); + written.release(); + } + }