-
Notifications
You must be signed in to change notification settings - Fork 3.9k
core: Added changes to DelayedStream.setStream() should cancel the provided stream if not using it #11969
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core: Added changes to DelayedStream.setStream() should cancel the provided stream if not using it #11969
Changes from 3 commits
270567d
072d033
8ac9678
cac52d4
00eb166
8cd4d5a
2295bbe
9e6ece3
77216bf
d28fcb2
eeded6f
036cd41
61bb878
3a35a76
fa77210
129e747
f828a3c
8c7cf53
8879e94
920c384
0ab9e11
b7df168
58f39fa
013948d
a2220eb
67b2f3a
696dd52
6beb245
17e6b41
d1602ad
163c958
11adf6f
529a75e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,11 +125,22 @@ public void appendTimeoutInsight(InsightBuilder insight) { | |
@CheckReturnValue | ||
final Runnable setStream(ClientStream stream) { | ||
ClientStreamListener savedListener; | ||
ClientStream oldStream = null; | ||
boolean cancelOldStream = false; | ||
|
||
synchronized (this) { | ||
// If realStream != null, then either setStream() or cancel() has been called. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Has this changed ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, added back the removed comment, it was accidentally missed while updating the code, but the logic still holds. if realStream() != null, it means setStream() was called, and if listener !=null, the old stream may have been cancelled. |
||
if (realStream != null) { | ||
oldStream = realStream; | ||
cancelOldStream = listener != null; | ||
} | ||
if (oldStream != null && !cancelOldStream) { | ||
return null; | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove an empty line here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
if (cancelOldStream) { | ||
oldStream.cancel(Status.CANCELLED.withDescription("Replaced by a new Stream")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. We've already chosen a stream. We can't change to a different stream now. I don't understand why this is being done this way, especially since #1537 said that the stream passed into this function would be the one cancelled. |
||
} | ||
|
||
setRealStream(checkNotNull(stream, "stream")); | ||
savedListener = listener; | ||
if (savedListener == null) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -170,19 +170,20 @@ public void uncaughtException(Thread t, Throwable e) { | |
|
||
@Test public void newStreamThenAssignTransportThenShutdown() { | ||
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, tracers); | ||
stream.start(streamListener); | ||
assertEquals(1, delayedTransport.getPendingStreamsCount()); | ||
assertTrue(stream instanceof DelayedStream); | ||
delayedTransport.reprocess(mockPicker); | ||
assertEquals(0, delayedTransport.getPendingStreamsCount()); | ||
delayedTransport.shutdown(SHUTDOWN_STATUS); | ||
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); | ||
verify(transportListener).transportTerminated(); | ||
fakeExecutor.runDueTasks(); | ||
assertEquals(0, fakeExecutor.runDueTasks()); | ||
verify(mockRealTransport).newStream( | ||
same(method), same(headers), same(callOptions), | ||
ArgumentMatchers.<ClientStreamTracer[]>any()); | ||
stream.start(streamListener); | ||
verify(mockRealStream).start(same(streamListener)); | ||
verify(mockRealStream).start(any(ClientStreamListener.class)); | ||
} | ||
|
||
@Test public void transportTerminatedThenAssignTransport() { | ||
|
@@ -225,8 +226,10 @@ public void uncaughtException(Thread t, Throwable e) { | |
ClientStream stream = delayedTransport.newStream( | ||
method, new Metadata(), CallOptions.DEFAULT, tracers); | ||
stream.start(streamListener); | ||
|
||
assertEquals(1, delayedTransport.getPendingStreamsCount()); | ||
stream.cancel(Status.CANCELLED); | ||
|
||
assertEquals(0, delayedTransport.getPendingStreamsCount()); | ||
verify(streamListener).closed( | ||
same(Status.CANCELLED), same(RpcProgress.PROCESSED), any(Metadata.class)); | ||
|
@@ -271,14 +274,45 @@ public void uncaughtException(Thread t, Throwable e) { | |
verifyNoMoreInteractions(mockRealStream); | ||
} | ||
|
||
@Test | ||
public void testNewStreamThenShutDownNow() { | ||
ClientStream stream = delayedTransport.newStream( | ||
method, new Metadata(), CallOptions.DEFAULT, tracers); | ||
stream.start(streamListener); | ||
assertEquals(1,delayedTransport.getPendingStreamsCount()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add an empty line above asserts statements in all applicable places. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a space after the comma... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
delayedTransport.shutdownNow(Status.UNAVAILABLE); | ||
verify(transportListener).transportShutdown(any(Status.class)); | ||
verify(transportListener).transportTerminated(); | ||
verify(streamListener).closed( | ||
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); | ||
|
||
assertEquals(0,delayedTransport.getPendingStreamsCount()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a space after the comma... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); | ||
} | ||
|
||
@Test | ||
public void testDelayedClientTransportPendingStreamsOnShutDown() { | ||
ClientStream clientStream = delayedTransport.newStream(method, headers, callOptions, tracers); | ||
ClientStream clientStream1 = delayedTransport.newStream(method, headers, callOptions, tracers); | ||
|
||
assertEquals(0, delayedTransport.getPendingStreamsCount()); | ||
clientStream.start(streamListener); | ||
clientStream1.start(streamListener); | ||
|
||
assertEquals(2, delayedTransport.getPendingStreamsCount()); | ||
delayedTransport.shutdownNow(Status.UNAVAILABLE); | ||
|
||
assertEquals(0, delayedTransport.getPendingStreamsCount()); | ||
} | ||
|
||
@Test public void newStreamThenShutdownTransportThenCancelStream() { | ||
ClientStream stream = delayedTransport.newStream( | ||
method, new Metadata(), CallOptions.DEFAULT, tracers); | ||
method, new Metadata(), CallOptions.DEFAULT, tracers); | ||
stream.start(streamListener); | ||
delayedTransport.shutdown(SHUTDOWN_STATUS); | ||
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); | ||
verify(transportListener, times(0)).transportTerminated(); | ||
assertEquals(1, delayedTransport.getPendingStreamsCount()); | ||
stream.start(streamListener); | ||
stream.cancel(Status.CANCELLED); | ||
verify(transportListener).transportTerminated(); | ||
assertEquals(0, delayedTransport.getPendingStreamsCount()); | ||
|
@@ -322,7 +356,9 @@ public void uncaughtException(Thread t, Throwable e) { | |
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); | ||
} | ||
|
||
@Test public void reprocessSemantics() { | ||
@Test | ||
@SuppressWarnings("DirectInvocationOnMock") | ||
public void reprocessSemantics() { | ||
CallOptions failFastCallOptions = CallOptions.DEFAULT.withOption(SHARD_ID, 1); | ||
CallOptions waitForReadyCallOptions = CallOptions.DEFAULT.withOption(SHARD_ID, 2) | ||
.withWaitForReady(); | ||
|
@@ -348,41 +384,48 @@ public void uncaughtException(Thread t, Throwable e) { | |
ff1.start(mock(ClientStreamListener.class)); | ||
ff1.halfClose(); | ||
PickSubchannelArgsMatcher ff1args = new PickSubchannelArgsMatcher(method, headers, | ||
failFastCallOptions); | ||
failFastCallOptions); | ||
transportListener.transportInUse(true); | ||
verify(transportListener).transportInUse(true); | ||
DelayedStream ff2 = (DelayedStream) delayedTransport.newStream( | ||
method2, headers2, failFastCallOptions, tracers); | ||
method2, headers2, failFastCallOptions, tracers); | ||
ff2.start(mock(ClientStreamListener.class)); | ||
PickSubchannelArgsMatcher ff2args = new PickSubchannelArgsMatcher(method2, headers2, | ||
failFastCallOptions); | ||
DelayedStream ff3 = (DelayedStream) delayedTransport.newStream( | ||
method, headers, failFastCallOptions, tracers); | ||
method, headers, failFastCallOptions, tracers); | ||
ff3.start(mock(ClientStreamListener.class)); | ||
PickSubchannelArgsMatcher ff3args = new PickSubchannelArgsMatcher(method, headers, | ||
failFastCallOptions); | ||
DelayedStream ff4 = (DelayedStream) delayedTransport.newStream( | ||
method2, headers2, failFastCallOptions, tracers); | ||
method2, headers2, failFastCallOptions, tracers); | ||
ff4.start(mock(ClientStreamListener.class)); | ||
PickSubchannelArgsMatcher ff4args = new PickSubchannelArgsMatcher(method2, headers2, | ||
failFastCallOptions); | ||
|
||
// Wait-for-ready streams | ||
FakeClock wfr3Executor = new FakeClock(); | ||
DelayedStream wfr1 = (DelayedStream) delayedTransport.newStream( | ||
method, headers, waitForReadyCallOptions, tracers); | ||
method, headers, waitForReadyCallOptions, tracers); | ||
wfr1.start(mock(ClientStreamListener.class)); | ||
PickSubchannelArgsMatcher wfr1args = new PickSubchannelArgsMatcher(method, headers, | ||
waitForReadyCallOptions); | ||
DelayedStream wfr2 = (DelayedStream) delayedTransport.newStream( | ||
method2, headers2, waitForReadyCallOptions, tracers); | ||
method2, headers2, waitForReadyCallOptions, tracers); | ||
wfr2.start(mock(ClientStreamListener.class)); | ||
PickSubchannelArgsMatcher wfr2args = new PickSubchannelArgsMatcher(method2, headers2, | ||
waitForReadyCallOptions); | ||
CallOptions wfr3callOptions = waitForReadyCallOptions.withExecutor( | ||
wfr3Executor.getScheduledExecutorService()); | ||
wfr3Executor.getScheduledExecutorService()); | ||
DelayedStream wfr3 = (DelayedStream) delayedTransport.newStream( | ||
method, headers, wfr3callOptions, tracers); | ||
wfr3.start(mock(ClientStreamListener.class)); | ||
wfr3.halfClose(); | ||
PickSubchannelArgsMatcher wfr3args = new PickSubchannelArgsMatcher(method, headers, | ||
wfr3callOptions); | ||
DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream( | ||
method2, headers2, waitForReadyCallOptions, tracers); | ||
method2, headers2, waitForReadyCallOptions, tracers); | ||
wfr4.start(mock(ClientStreamListener.class)); | ||
PickSubchannelArgsMatcher wfr4args = new PickSubchannelArgsMatcher(method2, headers2, | ||
waitForReadyCallOptions); | ||
|
||
|
@@ -478,7 +521,8 @@ public void uncaughtException(Thread t, Throwable e) { | |
|
||
// New streams will use the last picker | ||
DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream( | ||
method, headers, waitForReadyCallOptions, tracers); | ||
method, headers, waitForReadyCallOptions, tracers); | ||
wfr5.start(mock(ClientStreamListener.class)); | ||
assertNull(wfr5.getRealStream()); | ||
inOrder.verify(picker).pickSubchannel( | ||
eqPickSubchannelArgs(method, headers, waitForReadyCallOptions)); | ||
|
@@ -626,12 +670,14 @@ public PickResult answer(InvocationOnMock invocation) throws Throwable { | |
verify(picker, never()).pickSubchannel(any(PickSubchannelArgs.class)); | ||
|
||
Thread sideThread = new Thread("sideThread") { | ||
@Override | ||
public void run() { | ||
// Will call pickSubchannel and wait on barrier | ||
delayedTransport.newStream(method, headers, callOptions, tracers); | ||
} | ||
}; | ||
@Override | ||
public void run() { | ||
// Will call pick Subchannel and wait on barrier | ||
ClientStream clientStream = | ||
delayedTransport.newStream(method, headers, callOptions, tracers); | ||
clientStream.start(streamListener); | ||
} | ||
}; | ||
sideThread.start(); | ||
|
||
PickSubchannelArgsMatcher args = new PickSubchannelArgsMatcher(method, headers, callOptions); | ||
|
@@ -659,12 +705,14 @@ public void run() { | |
////////// Phase 2: reprocess() with a different picker | ||
// Create the second stream | ||
Thread sideThread2 = new Thread("sideThread2") { | ||
@Override | ||
public void run() { | ||
// Will call pickSubchannel and wait on barrier | ||
delayedTransport.newStream(method, headers2, callOptions, tracers); | ||
} | ||
}; | ||
@Override | ||
public void run() { | ||
// Will call pickSubchannel and wait on barrier | ||
ClientStream clientStream = delayedTransport | ||
.newStream(method, headers2, callOptions, tracers); | ||
clientStream.start(streamListener); | ||
} | ||
}; | ||
sideThread2.start(); | ||
// The second stream will see the first picker | ||
verify(picker, timeout(5000)).pickSubchannel(argThat(args2)); | ||
|
@@ -714,6 +762,7 @@ public void reprocess_addOptionalLabelCallsTracer() throws Exception { | |
} | ||
|
||
@Test | ||
@SuppressWarnings("DirectInvocationOnMock") | ||
public void newStream_racesWithReprocessIdleMode() throws Exception { | ||
SubchannelPicker picker = new SubchannelPicker() { | ||
@Override public PickResult pickSubchannel(PickSubchannelArgs args) { | ||
|
@@ -730,6 +779,7 @@ public void newStream_racesWithReprocessIdleMode() throws Exception { | |
ClientStream stream = delayedTransport.newStream( | ||
method, headers, callOptions, tracers); | ||
stream.start(streamListener); | ||
transportListener.transportInUse(true); | ||
assertTrue(delayedTransport.hasPendingStreams()); | ||
verify(transportListener).transportInUse(true); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to look into why this change was made. It looks like it delays registering the stream in pendingStreams to avoid createRealStream() from being called. We may need to double-check the transport state here, because there are state checks before
createPendingStream()
that are now out-of-date.