
Neither OnCancelHandler nor onComplete gets invoked after half-close #5882

@TMilasius

Description


What version of gRPC are you using?

1.21.0 (same applies to older versions as well)

Issue

1. After the server sends the completion event to the client, i.e., half-closes the call, the client no longer dispatches any events, even though the request stream hasn't been closed/completed.

2. In addition, the server's OnCancelHandler is never invoked, even though the client shuts down and closes the connection.

Server:

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.ServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

public class TestServer {

    private static final Logger log = LoggerFactory.getLogger(TestServer.class);

    public static void main(String[] args) throws IOException, InterruptedException {
        ServerBuilder.forPort(12345).addService(new TestServiceGrpc.TestServiceImplBase() {

            @Override
            public StreamObserver<Message> bidiOperation(StreamObserver<Message> responseObserver) {
                ServerCallStreamObserver<Message> serverResponseObserver = (ServerCallStreamObserver<Message>) responseObserver;
                serverResponseObserver.setOnCancelHandler(() -> {
                    log.info("Stream cancelled."); // never invoked
                });
                return new StreamObserver<Message>() {

                    @Override
                    public void onNext(Message value) {
                        log.info("Server: received message.");
                        responseObserver.onNext(value);
                        responseObserver.onCompleted();
                    }

                    @Override
                    public void onError(Throwable t) {
                        log.info("Server: received an error.", t);
                    }

                    @Override
                    public void onCompleted() {
                        log.info("Server: received on completed."); // never invoked
                    }
                };
            }
        }).build().start().awaitTermination();
    }
}

Client:

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;

public class TestClient {

    private static final Logger log = LoggerFactory.getLogger(TestClient.class);

    public static void main(String[] args) throws InterruptedException {
        Channel channel = ManagedChannelBuilder.forAddress("localhost", 12345).usePlaintext().build();
        CountDownLatch receivedOnCompleted = new CountDownLatch(1);
        TestServiceGrpc.TestServiceStub asyncStub = TestServiceGrpc.newStub(channel);
        StreamObserver<Message> requestStream = asyncStub.bidiOperation(new ClientResponseObserver<Message, Message>() {

            private ClientCallStreamObserver<Message> requestStream;

            @Override
            public void beforeStart(ClientCallStreamObserver<Message> requestStream) {
                this.requestStream = requestStream;
            }

            @Override
            public void onNext(Message value) {
                log.info("Client received message.");
            }

            @Override
            public void onError(Throwable t) {
                log.info("Client received error.", t);
                receivedOnCompleted.countDown();
            }

            @Override
            public void onCompleted() {
                log.info("Client received on completed.");
                requestStream.onNext(Message.getDefaultInstance()); // This message doesn't get processed by the server
                requestStream.onCompleted(); // Completion event also gets lost somewhere in the middle
                receivedOnCompleted.countDown();
            }
        });
        requestStream.onNext(Message.getDefaultInstance());
        receivedOnCompleted.await();
        Thread.sleep(10000); // give the client time to finish sending events
        log.info("Client shutdown.");
    }
}

Client logs:

1298 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
1323 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
1539 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=2147483647, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
1542 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND SETTINGS: ack=true
1545 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
1552 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND SETTINGS: ack=true
1581 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:authority: localhost:12345, :path: /TestService/BidiOperation, :method: POST, :scheme: http, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.21.0, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
1641 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND DATA: streamId=3 padding=0 endStream=false length=5 bytes=0000000000
1737 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-encoding: identity, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
1763 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND DATA: streamId=3 padding=0 endStream=false length=5 bytes=0000000000
1771 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[grpc-status: 0] streamDependency=0 weight=16 exclusive=false padding=0 endStream=true
1772 INFO  TestClient - Client received message.
1777 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND RST_STREAM: streamId=3 errorCode=8
1781 INFO  TestClient - Client received on completed.
11783 INFO TestClient - Client shutdown.

Server logs:

9704 DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
9704 DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
9706 DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@21c719f5
9826 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=2147483647, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
9834 DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
9834 DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
9835 DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
9835 DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
9849 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
9865 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
9869 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND SETTINGS: ack=true
9872 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
9875 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND SETTINGS: ack=true
9930 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND HEADERS: streamId=3 headers=GrpcHttp2RequestHeaders[:path: /TestService/BidiOperation, :authority: localhost:12345, :method: POST, :scheme: http, te: trailers, content-type: application/grpc, user-agent: grpc-java-netty/1.21.0, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
10018 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND DATA: streamId=3 padding=0 endStream=false length=5 bytes=0000000000
10036 INFO  TestServer - Server: received message.
10046 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:status: 200, content-type: application/grpc, grpc-encoding: identity, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
10081 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND DATA: streamId=3 padding=0 endStream=false length=5 bytes=0000000000
10082 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[grpc-status: 0] streamDependency=0 weight=16 exclusive=false padding=0 endStream=true
10099 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND RST_STREAM: streamId=3 errorCode=8
20528 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND GO_AWAY: lastStreamId=3 errorCode=2 length=61 bytes=416e206578697374696e6720636f6e6e656374696f6e2077617320666f726369626c7920636c6f736564206279207468652072656d6f746520686f7374
20533 DEBUG io.netty.handler.codec.http2.Http2ConnectionHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 ! R:/127.0.0.1:61183] Sending GOAWAY failed: lastStreamId '3', errorCode '2', debugData 'An existing connection was forcibly closed by the remote host'. Forcing shutdown of the connection.
java.io.IOException: An existing connection was forcibly closed by the remote host
	at sun.nio.ch.SocketDispatcher.writev0(Native Method)
	at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55)
	at sun.nio.ch.IOUtil.write(IOUtil.java:148)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:420)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:939)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:906)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1370)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.onError(Http2ConnectionHandler.java:629)
	at io.grpc.netty.AbstractNettyHandler.exceptionCaught(AbstractNettyHandler.java:81)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:282)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:261)
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1375)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:282)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:261)
	at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:918)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

What did you expect to see?

1. The client should keep sending events to the server until it closes/completes the request stream, regardless of whether the server has closed the response stream (half-closed the call). As it stands, the client cannot reply to the server after the response stream completes.

2. The server's OnCancelHandler must be invoked in either case, regardless of whether the call has already been half-closed and whether the connection was forcibly shut down. It should be possible to free up resources such as database connections in the OnCancelHandler, but as of now there doesn't seem to be a single place where we can do this.
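For the resource-cleanup part, gRPC Java also surfaces call cancellation through io.grpc.Context: a CancellationListener registered on a cancellable context fires when that context is cancelled. Below is a minimal standalone sketch of that pattern, not a claim that it fires in the exact half-close-then-disconnect scenario described above; releaseDatabaseConnection is a hypothetical stand-in for real cleanup.

```java
import io.grpc.Context;

public class ContextCancellationSketch {

    // Hypothetical cleanup; stands in for e.g. returning a database connection to a pool.
    static void releaseDatabaseConnection() {
        System.out.println("resources released");
    }

    public static void main(String[] args) {
        // In a server handler, Context.current() is the per-call context; here we
        // derive a cancellable child context just to demonstrate the listener.
        Context.CancellableContext callContext = Context.current().withCancellation();
        callContext.addListener(
                ctx -> releaseDatabaseConnection(), // invoked once the context is cancelled
                Runnable::run);                     // run the listener on the cancelling thread
        // Simulate the call being cancelled (client disconnect, deadline, etc.).
        callContext.cancel(new RuntimeException("client disconnected"));
    }
}
```

Whether this listener is notified in the forcibly-closed-connection case reported here would need to be verified; it is offered only as another hook to try.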
