Skip to content

Commit

Permalink
Improve code by adding some null checks (#3115)
Browse files Browse the repository at this point in the history
  • Loading branch information
tishun authored Jan 6, 2025
1 parent 1448b2b commit a254a78
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 24 deletions.
45 changes: 25 additions & 20 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {

/**
* When we encounter an unexpected IOException we look for these {@link Throwable#getMessage() messages} (because we have no
* better way to distinguish) and log them at DEBUG rather than WARN, since they are generally caused by unclean client
* disconnects rather than an actual problem.
* When we encounter an unexpected {@link IOException} we look for these {@link Throwable#getMessage() messages} (because we
* have no better way to distinguish) and log them at DEBUG rather than WARN, since they are generally caused by unclean
* client disconnects rather than an actual problem.
*/
static final Set<String> SUPPRESS_IO_EXCEPTION_MESSAGES = LettuceSets.unmodifiableSet("Connection reset by peer",
"Broken pipe", "Connection timed out");
Expand Down Expand Up @@ -123,7 +123,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom

private Channel channel;

private ByteBuf buffer;
private ByteBuf readBuffer;

private boolean hasDecodeProgress;

Expand Down Expand Up @@ -182,8 +182,8 @@ protected void setState(LifecycleState lifecycleState) {
}
}

void setBuffer(ByteBuf buffer) {
this.buffer = buffer;
void setReadBuffer(ByteBuf readBuffer) {
this.readBuffer = readBuffer;
}

@Override
Expand Down Expand Up @@ -222,7 +222,7 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

setState(LifecycleState.REGISTERED);

buffer = ctx.alloc().buffer(8192 * 8);
readBuffer = ctx.alloc().buffer(8192 * 8);
rsm = new RedisStateMachine();
ctx.fireChannelRegistered();
}
Expand All @@ -242,10 +242,15 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
return;
}

channel = null;
buffer.release();
rsm.close();

if (readBuffer != null) {
readBuffer.release();
}

if (rsm != null) {
rsm.close();
}
rsm = null;

reset();
Expand Down Expand Up @@ -596,7 +601,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}

try {
if (buffer.refCnt() < 1) {
if (readBuffer == null || readBuffer.refCnt() < 1) {
logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix());
return;
}
Expand All @@ -610,10 +615,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
}

buffer.touch("CommandHandler.read(…)");
buffer.writeBytes(input);
readBuffer.touch("CommandHandler.read(…)");
readBuffer.writeBytes(input);

decode(ctx, buffer);
decode(ctx, readBuffer);
} finally {
input.release();
}
Expand Down Expand Up @@ -829,7 +834,7 @@ private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<

private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, CommandOutput<?, ?, ?> pushOutput) {

if (!rsm.decode(buffer, pushOutput, ctx::fireExceptionCaught)) {
if (rsm != null && !rsm.decode(buffer, pushOutput, ctx::fireExceptionCaught)) {
return false;
}

Expand All @@ -852,11 +857,11 @@ private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, CommandOutput
}

protected boolean decode(ByteBuf buffer, CommandOutput<?, ?, ?> output) {
return rsm.decode(buffer, output);
return rsm != null && rsm.decode(buffer, output);
}

protected boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
return rsm.decode(buffer, output, command::completeExceptionally);
return rsm != null && rsm.decode(buffer, output, command::completeExceptionally);
}

/**
Expand Down Expand Up @@ -918,7 +923,7 @@ private void onProtectedMode(String message) {
* @param command
*/
protected void afterDecode(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command) {
decodeBufferPolicy.afterCommandDecoded(buffer);
decodeBufferPolicy.afterCommandDecoded(readBuffer);
}

private void recordLatency(WithLatency withLatency, RedisCommand<?, ?, ?> command) {
Expand Down Expand Up @@ -960,8 +965,8 @@ private void resetInternals() {
rsm.reset();
}

if (buffer.refCnt() > 0) {
buffer.clear();
if (readBuffer != null && readBuffer.refCnt() > 0) {
readBuffer.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static DecodeBufferPolicy ratio(float bufferUsageRatio) {

/**
* {@link DecodeBufferPolicy} that {@link ByteBuf#discardReadBytes() discards read bytes} after each decoding phase. This
* strategy hast the most memory efficiency but also leads to more CPU pressure.
* strategy has the most memory efficiency but also leads to more CPU pressure.
*
* @return the strategy object.
*/
Expand All @@ -80,7 +80,7 @@ public static DecodeBufferPolicy always() {
/**
* {@link DecodeBufferPolicy} that {@link ByteBuf#discardSomeReadBytes() discards some read bytes} after each decoding
* phase. This strategy might discard some, all, or none of read bytes depending on its internal implementation to reduce
* overall memory bandwidth consumption at the cost of potentially additional memory consumption.
* overall CPU bandwidth consumption at the cost of potentially additional memory consumption.
*
* @return the strategy object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ void shouldNotDiscardReadBytes() throws Exception {

// set the command handler buffer capacity to 30, make it easy to test
ByteBuf internalBuffer = context.alloc().buffer(30);
sut.setBuffer(internalBuffer);
sut.setReadBuffer(internalBuffer);

// mock a multi reply, which will reach the buffer usage ratio
ByteBuf msg = context.alloc().buffer(100);
Expand Down Expand Up @@ -559,7 +559,7 @@ void shouldDiscardReadBytes() throws Exception {

// set the command handler buffer capacity to 30, make it easy to test
ByteBuf internalBuffer = context.alloc().buffer(30);
sut.setBuffer(internalBuffer);
sut.setReadBuffer(internalBuffer);

// mock a multi reply, which will reach the buffer usage ratio
ByteBuf msg = context.alloc().buffer(100);
Expand Down Expand Up @@ -638,4 +638,16 @@ void shouldHandleIncompleteResponses() throws Exception {
assertThat(hmgetCommand.get()).hasSize(3);
}

/**
* @see <a href="https://github.com/redis/lettuce/issues/3087">Issue 3087</a>
*/
@Test
void shouldHandleNullBuffers() throws Exception {
sut.setReadBuffer(null);
sut.channelRead(context, Unpooled.wrappedBuffer(("*4\r\n" + "$3\r\nONE\r\n" + "$4\r\n>TW").getBytes()));
assertThat(stack).hasSize(0);

sut.channelUnregistered(context);
}

}

0 comments on commit a254a78

Please sign in to comment.