diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 6791be04c..59aee61e0 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -82,9 +82,9 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands { /** - * When we encounter an unexpected IOException we look for these {@link Throwable#getMessage() messages} (because we have no - * better way to distinguish) and log them at DEBUG rather than WARN, since they are generally caused by unclean client - * disconnects rather than an actual problem. + * When we encounter an unexpected {@link IOException} we look for these {@link Throwable#getMessage() messages} (because we + * have no better way to distinguish) and log them at DEBUG rather than WARN, since they are generally caused by unclean + * client disconnects rather than an actual problem. */ static final Set SUPPRESS_IO_EXCEPTION_MESSAGES = LettuceSets.unmodifiableSet("Connection reset by peer", "Broken pipe", "Connection timed out"); @@ -123,7 +123,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom private Channel channel; - private ByteBuf buffer; + private ByteBuf readBuffer; private boolean hasDecodeProgress; @@ -182,8 +182,8 @@ protected void setState(LifecycleState lifecycleState) { } } - void setBuffer(ByteBuf buffer) { - this.buffer = buffer; + void setReadBuffer(ByteBuf readBuffer) { + this.readBuffer = readBuffer; } @Override @@ -222,7 +222,7 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { setState(LifecycleState.REGISTERED); - buffer = ctx.alloc().buffer(8192 * 8); + readBuffer = ctx.alloc().buffer(8192 * 8); rsm = new RedisStateMachine(); ctx.fireChannelRegistered(); } @@ -242,10 +242,15 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); return; } - channel = null; - buffer.release(); - rsm.close(); + + if (readBuffer != null) { + readBuffer.release(); + } + + if (rsm != null) { + rsm.close(); + } rsm = null; reset(); @@ -596,7 +601,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } try { - if (buffer.refCnt() < 1) { + if (readBuffer == null || readBuffer.refCnt() < 1) { logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix()); return; } @@ -610,10 +615,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim()); } - buffer.touch("CommandHandler.read(…)"); - buffer.writeBytes(input); + readBuffer.touch("CommandHandler.read(…)"); + readBuffer.writeBytes(input); - decode(ctx, buffer); + decode(ctx, readBuffer); } finally { input.release(); } @@ -829,7 +834,7 @@ private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand< private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, CommandOutput pushOutput) { - if (!rsm.decode(buffer, pushOutput, ctx::fireExceptionCaught)) { + if (rsm != null && !rsm.decode(buffer, pushOutput, ctx::fireExceptionCaught)) { return false; } @@ -852,11 +857,11 @@ private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, CommandOutput } protected boolean decode(ByteBuf buffer, CommandOutput output) { - return rsm.decode(buffer, output); + return rsm != null && rsm.decode(buffer, output); } protected boolean decode(ByteBuf buffer, RedisCommand command, CommandOutput output) { - return rsm.decode(buffer, output, command::completeExceptionally); + return rsm != null && rsm.decode(buffer, output, command::completeExceptionally); } /** @@ -918,7 +923,7 @@ private void onProtectedMode(String message) { * @param command */ protected void afterDecode(ChannelHandlerContext ctx, RedisCommand command) { - decodeBufferPolicy.afterCommandDecoded(buffer); + decodeBufferPolicy.afterCommandDecoded(readBuffer); } private void recordLatency(WithLatency withLatency, RedisCommand command) { @@ -960,8 +965,8 @@ private void resetInternals() { rsm.reset(); } - if (buffer.refCnt() > 0) { - buffer.clear(); + if (readBuffer != null && readBuffer.refCnt() > 0) { + readBuffer.clear(); } } diff --git a/src/main/java/io/lettuce/core/protocol/DecodeBufferPolicies.java b/src/main/java/io/lettuce/core/protocol/DecodeBufferPolicies.java index ae675b698..374761f33 100644 --- a/src/main/java/io/lettuce/core/protocol/DecodeBufferPolicies.java +++ b/src/main/java/io/lettuce/core/protocol/DecodeBufferPolicies.java @@ -69,7 +69,7 @@ public static DecodeBufferPolicy ratio(float bufferUsageRatio) { /** * {@link DecodeBufferPolicy} that {@link ByteBuf#discardReadBytes() discards read bytes} after each decoding phase. This - * strategy hast the most memory efficiency but also leads to more CPU pressure. + * strategy has the most memory efficiency but also leads to more CPU pressure. * * @return the strategy object. */ @@ -80,7 +80,7 @@ public static DecodeBufferPolicy always() { /** * {@link DecodeBufferPolicy} that {@link ByteBuf#discardSomeReadBytes() discards some read bytes} after each decoding * phase. This strategy might discard some, all, or none of read bytes depending on its internal implementation to reduce - * overall memory bandwidth consumption at the cost of potentially additional memory consumption. + * overall CPU bandwidth consumption at the cost of potentially additional memory consumption. * * @return the strategy object. */ diff --git a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java index 833625df5..e7f7cfec6 100644 --- a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java +++ b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java @@ -530,7 +530,7 @@ void shouldNotDiscardReadBytes() throws Exception { // set the command handler buffer capacity to 30, make it easy to test ByteBuf internalBuffer = context.alloc().buffer(30); - sut.setBuffer(internalBuffer); + sut.setReadBuffer(internalBuffer); // mock a multi reply, which will reach the buffer usage ratio ByteBuf msg = context.alloc().buffer(100); @@ -559,7 +559,7 @@ void shouldDiscardReadBytes() throws Exception { // set the command handler buffer capacity to 30, make it easy to test ByteBuf internalBuffer = context.alloc().buffer(30); - sut.setBuffer(internalBuffer); + sut.setReadBuffer(internalBuffer); // mock a multi reply, which will reach the buffer usage ratio ByteBuf msg = context.alloc().buffer(100); @@ -638,4 +638,16 @@ void shouldHandleIncompleteResponses() throws Exception { assertThat(hmgetCommand.get()).hasSize(3); } + /** + * @see Issue 3087 + */ + @Test + void shouldHandleNullBuffers() throws Exception { + sut.setReadBuffer(null); + sut.channelRead(context, Unpooled.wrappedBuffer(("*4\r\n" + "$3\r\nONE\r\n" + "$4\r\n>TW").getBytes())); + assertThat(stack).hasSize(0); + + sut.channelUnregistered(context); + } + }