Skip to content

Commit a6abb1b

Browse files
authored
stub: add ServerCallStreamObserver.setOnCloseHandler(...) (#8452)
This allows for user code to be notified when the messages are actually put on the wire and the stream is closed. Fixes #5895
1 parent 29d238a commit a6abb1b

File tree

3 files changed

+119
-0
lines changed

3 files changed

+119
-0
lines changed

stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.grpc.stub;
1818

19+
import io.grpc.ExperimentalApi;
20+
1921
/**
2022
* A refinement of {@link CallStreamObserver} to allows for interaction with call
2123
* cancellation events on the server side. An instance of this class is obtained by casting the
@@ -145,4 +147,26 @@ public void disableAutoRequest() {
145147
*/
146148
@Override
147149
public abstract void setMessageCompression(boolean enable);
150+
151+
/**
152+
* Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's
153+
* point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
154+
* all the messages and trailing metadata have been sent and the stream has been closed. Note
155+
* however that the client still may have not received all the messages due to network delay,
156+
* client crashes, and cancellation races.
157+
*
158+
* <p>Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called
159+
* when the RPC terminates.</p>
160+
*
161+
* <p>It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to
162+
* the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if
163+
* other callbacks are running.</p>
164+
*
165+
* <p>This method may only be called during the initial call to the application, before the
166+
* service returns its {@link StreamObserver request observer}.</p>
167+
*
168+
* @param onCloseHandler to execute when the call has been closed cleanly.
169+
*/
170+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467")
171+
public abstract void setOnCloseHandler(Runnable onCloseHandler);
148172
}

stub/src/main/java/io/grpc/stub/ServerCalls.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,13 @@ public void onReady() {
206206
responseObserver.onReadyHandler.run();
207207
}
208208
}
209+
210+
@Override
211+
public void onComplete() {
212+
if (responseObserver.onCloseHandler != null) {
213+
responseObserver.onCloseHandler.run();
214+
}
215+
}
209216
}
210217
}
211218

@@ -291,6 +298,13 @@ public void onReady() {
291298
responseObserver.onReadyHandler.run();
292299
}
293300
}
301+
302+
@Override
303+
public void onComplete() {
304+
if (responseObserver.onCloseHandler != null) {
305+
responseObserver.onCloseHandler.run();
306+
}
307+
}
294308
}
295309
}
296310

@@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl<ReqT, RespT>
320334
private Runnable onCancelHandler;
321335
private boolean aborted = false;
322336
private boolean completed = false;
337+
private Runnable onCloseHandler;
323338

324339
// Non private to avoid synthetic class
325340
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) {
@@ -423,6 +438,14 @@ public void disableAutoRequest() {
423438
public void request(int count) {
424439
call.request(count);
425440
}
441+
442+
@Override
443+
public void setOnCloseHandler(Runnable onCloseHandler) {
444+
checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called "
445+
+ "during the initial call to the application, before the service returns its "
446+
+ "StreamObserver");
447+
this.onCloseHandler = onCloseHandler;
448+
}
426449
}
427450

428451
/**

stub/src/test/java/io/grpc/stub/ServerCallsTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,53 @@ public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver)
199199
callObserver.get().onCompleted();
200200
}
201201

202+
@Test
203+
public void onCloseHandlerCalledIfSetInStreamingClientCall() throws Exception {
204+
final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
205+
ServerCallHandler<Integer, Integer> callHandler = ServerCalls.asyncBidiStreamingCall(
206+
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
207+
@Override
208+
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
209+
ServerCallStreamObserver<Integer> serverCallObserver =
210+
(ServerCallStreamObserver<Integer>) responseObserver;
211+
serverCallObserver.setOnCloseHandler(new Runnable() {
212+
@Override
213+
public void run() {
214+
onCloseHandlerCalled.set(true);
215+
}
216+
});
217+
return new ServerCalls.NoopStreamObserver<>();
218+
}
219+
});
220+
ServerCall.Listener<Integer> callListener = callHandler.startCall(serverCall, new Metadata());
221+
callListener.onComplete();
222+
assertTrue(onCloseHandlerCalled.get());
223+
}
224+
225+
@Test
226+
public void onCloseHandlerCalledIfSetInUnaryClientCall() throws Exception {
227+
final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
228+
ServerCallHandler<Integer, Integer> callHandler = ServerCalls.asyncServerStreamingCall(
229+
new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
230+
@Override
231+
public void invoke(Integer request, StreamObserver<Integer> responseObserver) {
232+
ServerCallStreamObserver<Integer> serverCallObserver =
233+
(ServerCallStreamObserver<Integer>) responseObserver;
234+
serverCallObserver.setOnCloseHandler(new Runnable() {
235+
@Override
236+
public void run() {
237+
onCloseHandlerCalled.set(true);
238+
}
239+
});
240+
}
241+
});
242+
ServerCall.Listener<Integer> callListener = callHandler.startCall(serverCall, new Metadata());
243+
callListener.onMessage(0);
244+
callListener.onHalfClose();
245+
callListener.onComplete();
246+
assertTrue(onCloseHandlerCalled.get());
247+
}
248+
202249
@Test
203250
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
204251
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
@@ -255,6 +302,31 @@ public void run() {
255302
}
256303
}
257304

305+
@Test
306+
public void cannotSetOnCloseHandlerAfterServiceInvocation() throws Exception {
307+
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver = new AtomicReference<>();
308+
ServerCallHandler<Integer, Integer> callHandler = ServerCalls.asyncBidiStreamingCall(
309+
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
310+
@Override
311+
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
312+
callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
313+
return new ServerCalls.NoopStreamObserver<>();
314+
}
315+
});
316+
ServerCall.Listener<Integer> callListener = callHandler.startCall(serverCall, new Metadata());
317+
callListener.onMessage(1);
318+
try {
319+
callObserver.get().setOnCloseHandler(new Runnable() {
320+
@Override
321+
public void run() {
322+
}
323+
});
324+
fail("Cannot set onReady after service invocation");
325+
} catch (IllegalStateException expected) {
326+
// Expected
327+
}
328+
}
329+
258330
@Test
259331
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
260332
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =

0 commit comments

Comments
 (0)