Skip to content

Commit 141ee1e

Browse files
rozzavbabanin
andauthored
Fix Netty reference leak. (#1762)
* Fix Netty reference leak. Fixes netty byte buffer releases in edge case scenarios: - Ensure async select server uses a callback if the cluster had been closed - Ensure that handleReadResponse checks to see if the cluster had been closed before retaining incoming buffers - Ensure closing the netty stream releases all references Test fixes - Ensure tests run using paranoid leak detection - Fail the testsuite if a leak is detected. - Fixed releasing buffers in the ByteBufferBsonOutputTest. JAVA-5901 Co-authored-by: Viacheslav Babanin <[email protected]>
1 parent a3c3857 commit 141ee1e

File tree

8 files changed

+44
-13
lines changed

8 files changed

+44
-13
lines changed

.evergreen/run-tests.sh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,9 @@ echo "Running tests with Java ${JAVA_VERSION}"
132132
${MULTI_MONGOS_URI_SYSTEM_PROPERTY} ${API_VERSION} ${GRADLE_EXTRA_VARS} \
133133
${JAVA_SYSPROP_ASYNC_TRANSPORT} ${JAVA_SYSPROP_NETTY_SSL_PROVIDER} \
134134
-Dorg.mongodb.test.fle.on.demand.credential.test.failure.enabled=true \
135-
--stacktrace --info --continue ${TESTS}
135+
--stacktrace --info --continue ${TESTS} | tee -a logs.txt
136+
137+
if grep -q 'LEAK:' logs.txt ; then
138+
echo "Netty Leak detected, please inspect build log"
139+
exit 1
140+
fi

buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ tasks.withType<Test> {
3434

3535
useJUnitPlatform()
3636

37+
jvmArgs.add("-Dio.netty.leakDetection.level=paranoid")
38+
3739
// Pass any `org.mongodb.*` system settings
3840
systemProperties =
3941
System.getProperties()

driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,9 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
183183
@Override
184184
public void selectServerAsync(final ServerSelector serverSelector, final OperationContext operationContext,
185185
final SingleResultCallback<ServerTuple> callback) {
186-
isTrue("open", !isClosed());
186+
if (isClosed()) {
187+
callback.onResult(null, new MongoClientException("Cluster was closed during server selection."));
188+
}
187189

188190
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout();
189191
ServerSelectionRequest request = new ServerSelectionRequest(

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@
7575
import static com.mongodb.assertions.Assertions.assertNull;
7676
import static com.mongodb.assertions.Assertions.isTrue;
7777
import static com.mongodb.assertions.Assertions.notNull;
78-
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
7978
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
79+
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
8080
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
8181
import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate;
8282
import static com.mongodb.internal.connection.CommandHelper.HELLO;
@@ -355,7 +355,7 @@ private Compressor createCompressor(final MongoCompressor mongoCompressor) {
355355
public void close() {
356356
// All but the first call is a no-op
357357
if (!isClosed.getAndSet(true) && (stream != null)) {
358-
stream.close();
358+
stream.close();
359359
}
360360
}
361361

driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public ByteBuf asReadOnly() {
256256

257257
@Override
258258
public ByteBuf duplicate() {
259-
return new NettyByteBuf(proxied.duplicate().retain(), isWriting);
259+
return new NettyByteBuf(proxied.retainedDuplicate(), isWriting);
260260
}
261261

262262
@Override

driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,7 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
307307
composite.addComponent(next);
308308
iter.remove();
309309
} else {
310-
next.retain();
311-
composite.addComponent(next.readSlice(bytesNeededFromCurrentBuffer));
310+
composite.addComponent(next.readRetainedSlice(bytesNeededFromCurrentBuffer));
312311
}
313312
composite.writerIndex(composite.writerIndex() + bytesNeededFromCurrentBuffer);
314313
bytesNeeded -= bytesNeededFromCurrentBuffer;
@@ -349,7 +348,11 @@ private boolean hasBytesAvailable(final int numBytes) {
349348
private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer, @Nullable final Throwable t) {
350349
PendingReader localPendingReader = withLock(lock, () -> {
351350
if (buffer != null) {
352-
pendingInboundBuffers.add(buffer.retain());
351+
if (isClosed) {
352+
pendingException = new MongoSocketException("Received data after the stream was closed.", address);
353+
} else {
354+
pendingInboundBuffers.add(buffer.retain());
355+
}
353356
} else {
354357
pendingException = t;
355358
}
@@ -378,7 +381,9 @@ public void close() {
378381
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
379382
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
380383
iterator.remove();
381-
nextByteBuf.release();
384+
// Drops all retains to prevent silent leaks; assumes callers have already released
385+
// ByteBuffers returned by that NettyStream before calling close.
386+
nextByteBuf.release(nextByteBuf.refCnt());
382387
}
383388
});
384389
}

driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,7 @@ void shouldWriteUtf8CString(final boolean useBranch, final BufferProvider buffer
495495
@ParameterizedTest(name = "should get byte buffers as little endian. Parameters: useBranch={0}, bufferProvider={1}")
496496
@MethodSource("bufferProvidersWithBranches")
497497
void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferProvider bufferProvider) {
498+
List<ByteBuf> byteBuffers = new ArrayList<>();
498499
try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
499500
byte[] v = {1, 0, 0, 0};
500501
if (useBranch) {
@@ -504,7 +505,11 @@ void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferPro
504505
} else {
505506
out.writeBytes(v);
506507
}
507-
assertEquals(1, out.getByteBuffers().get(0).getInt());
508+
509+
byteBuffers = out.getByteBuffers();
510+
assertEquals(1, byteBuffers.get(0).getInt());
511+
} finally {
512+
byteBuffers.forEach(ByteBuf::release);
508513
}
509514
}
510515

@@ -1017,6 +1022,7 @@ void shouldWriteInt32WithinSpanningBuffers(
10171022
final int expectedLastBufferPosition,
10181023
final BufferProvider bufferProvider) {
10191024

1025+
List<ByteBuf> buffers = new ArrayList<>();
10201026
try (ByteBufferBsonOutput output =
10211027
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Integer.BYTES))) {
10221028

@@ -1028,12 +1034,14 @@ void shouldWriteInt32WithinSpanningBuffers(
10281034

10291035
//then
10301036
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
1031-
List<ByteBuf> buffers = output.getByteBuffers();
1037+
buffers = output.getByteBuffers();
10321038
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
10331039
assertBufferContents(expectedBuffers, buffers);
10341040

10351041
assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
10361042
assertEquals(expectedOutputPosition, output.getPosition());
1043+
} finally {
1044+
buffers.forEach(ByteBuf::release);
10371045
}
10381046
}
10391047

@@ -1049,6 +1057,7 @@ void shouldWriteInt64WithinSpanningBuffers(
10491057
final int expectedLastBufferPosition,
10501058
final BufferProvider bufferProvider) {
10511059

1060+
List<ByteBuf> buffers = new ArrayList<>();
10521061
try (ByteBufferBsonOutput output =
10531062
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {
10541063

@@ -1060,12 +1069,14 @@ void shouldWriteInt64WithinSpanningBuffers(
10601069

10611070
//then
10621071
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
1063-
List<ByteBuf> buffers = output.getByteBuffers();
1072+
buffers = output.getByteBuffers();
10641073
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
10651074
assertBufferContents(expectedBuffers, buffers);
10661075

10671076
assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
10681077
assertEquals(expectedOutputPosition, output.getPosition());
1078+
} finally {
1079+
buffers.forEach(ByteBuf::release);
10691080
}
10701081
}
10711082

@@ -1081,6 +1092,7 @@ void shouldWriteDoubleWithinSpanningBuffers(
10811092
final int expectedLastBufferPosition,
10821093
final BufferProvider bufferProvider) {
10831094

1095+
List<ByteBuf> buffers = new ArrayList<>();
10841096
try (ByteBufferBsonOutput output =
10851097
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {
10861098

@@ -1092,12 +1104,14 @@ void shouldWriteDoubleWithinSpanningBuffers(
10921104

10931105
//then
10941106
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
1095-
List<ByteBuf> buffers = output.getByteBuffers();
1107+
buffers = output.getByteBuffers();
10961108
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
10971109
assertBufferContents(expectedBuffers, buffers);
10981110

10991111
assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
11001112
assertEquals(expectedOutputPosition, output.getPosition());
1113+
} finally {
1114+
buffers.forEach(ByteBuf::release);
11011115
}
11021116
}
11031117

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
4848
import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL;
4949
import static com.mongodb.ReadPreference.primary;
50+
import static com.mongodb.assertions.Assertions.isTrue;
5051
import static com.mongodb.assertions.Assertions.notNull;
5152
import static com.mongodb.internal.TimeoutContext.createTimeoutContext;
5253
import static com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.sinkToCallback;
@@ -73,6 +74,7 @@ public class OperationExecutorImpl implements OperationExecutor {
7374
@Override
7475
public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPreference readPreference, final ReadConcern readConcern,
7576
@Nullable final ClientSession session) {
77+
isTrue("open", !mongoClient.getCluster().isClosed());
7678
notNull("operation", operation);
7779
notNull("readPreference", readPreference);
7880
notNull("readConcern", readConcern);
@@ -109,6 +111,7 @@ public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPref
109111
@Override
110112
public <T> Mono<T> execute(final AsyncWriteOperation<T> operation, final ReadConcern readConcern,
111113
@Nullable final ClientSession session) {
114+
isTrue("open", !mongoClient.getCluster().isClosed());
112115
notNull("operation", operation);
113116
notNull("readConcern", readConcern);
114117

0 commit comments

Comments
 (0)