Skip to content

grpc: Fix cardinality violations in client streaming RPCs #8278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from

Conversation

Pranjali-2501
Copy link
Contributor

@Pranjali-2501 Pranjali-2501 commented Apr 29, 2025

Partially addresses: #7286

Client should ensure an "Internal" error is returned for client-streaming RPCs if the server doesn't send a message before returning status OK.
Currently it is returning "EOF", which should not be the case for cardinality violations.

RELEASE NOTES:

  • grpc: return status code INTERNAL when servers send 0 response messages in unary and client streaming RPCs.

Copy link

linux-foundation-easycla bot commented Apr 29, 2025

CLA Signed

The committers listed above are authorized under a signed CLA.

Copy link

codecov bot commented Apr 29, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 82.25%. Comparing base (399e2d0) to head (a21fcc4).
Report is 41 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8278      +/-   ##
==========================================
- Coverage   82.29%   82.25%   -0.05%     
==========================================
  Files         417      419       +2     
  Lines       41356    42053     +697     
==========================================
+ Hits        34035    34590     +555     
- Misses       5907     5996      +89     
- Partials     1414     1467      +53     
Files with missing lines Coverage Δ
stream.go 82.75% <100.00%> (+1.20%) ⬆️

... and 55 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

stream.go Outdated
return io.EOF // indicates successful end of stream.
}

return toRPCErr(err)
}
if cs.desc.ClientStreams {
cs.recvMsg = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set this boolean without the check? Otherwise we would need to document the variable in a way to make it obvious that this is set only for client streaming RPCs. Even then users may accidentally use it for other types of RPCs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to !cs.desc.ServerStreams, so that it will mark cs.recvFirstMsg=true for both unary and clientStreaming RPCs.

@arjan-bal
Copy link
Contributor

Do we need a similar check on the server side also, i.e. change the status to internal if the server attempts to close without writing a message?

@dfawley dfawley assigned dfawley and unassigned Pranjali-2501 May 12, 2025
stream.go Outdated
@@ -1134,11 +1136,17 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
if !cs.desc.ServerStreams && !cs.recvFirstMsg {
return status.Errorf(codes.Internal, "client streaming cardinality violation")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's be more specific about what happened here so that a user could understand what's happening when they see the error.

"received no response message from non-streaming RPC"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a change here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return io.EOF // indicates successful end of stream.
}

return toRPCErr(err)
}
if !cs.desc.ServerStreams {
cs.recvFirstMsg = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there also be something here that fails if recvFirstMsg is already set? (That would indicate we received multiple messages in a unary RPC.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For both unary and client-streaming RPCs, client.recvmsg() will wait for the trailers after receiving its first msg over here .

I'll change the status code here from UNKNOWN to INTERNAL in upcoming PR with tests. It will handle the case when trailers are not send from server side for unary and client-streaming RPCs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So why do you need this new field, then?

If the above recv ever returns io.EOF, then we already know there was never a message read from the stream. Either that, or the user called Recv after receiving an error from a previous call to Recv, which is illegal and undefined.

Copy link
Contributor Author

@Pranjali-2501 Pranjali-2501 May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, make sense.
So for non-server streaming rpcs, we can directly return INTERNAL error if the first recv() call gets EOF.

if !cs.desc.ServerStreams {
	return status.Errorf(codes.Internal, "client streaming cardinality violation")
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think that should work and avoids adding new state to track.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping - I think we can remove recvFirstMsg now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had reverted that in this commit bbd8366.
But than @arjan-bal pointed out that in case of successful RPC, if user call RecvMsg() twice, the second RecvMsg() call should return with io.EOF instead of cardinality violation.
That's why I reverted back to the older code to fulfill that condition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned before, it's not defined what happens if RecvMsg is called after returning an error.

There are other scenarios where calling recvMsg repeatedly will not cause the previously returned error to occur. E.g. if a received message is too large, recv will return an error. But I believe a subsequent call to RecvMsg would actually return the next message from the stream, or io.EOF at the end of the stream.

@dfawley dfawley assigned Pranjali-2501 and unassigned dfawley May 12, 2025
@Pranjali-2501 Pranjali-2501 requested a review from dfawley May 15, 2025 17:14
@dfawley dfawley assigned dfawley and unassigned Pranjali-2501 May 16, 2025
stream.go Outdated
@@ -1134,11 +1136,17 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
if !cs.desc.ServerStreams && !cs.recvFirstMsg {
return status.Errorf(codes.Internal, "client streaming cardinality violation")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a change here.

return
}
defer conn.Close()
framer := http2.NewFramer(conn, conn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realistically this test shouldn't need low level framer access. You should be able to implement it with a grpc.Server that has its EmptyCall handler configured as a streaming handling, and with an implementation that just returns nil immediately without sending a message.

I do think that would be preferable, because the framing stuff is tedious and error prone, and it makes the test much more complex. If you're having too much trouble with that approach, let me know and I can try to help, or if it somehow turns out to be a lot more work than I was expecting, feel free to push back.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I attempted to resolve this by returning nil from EmptyCall. However, this approach resulted in an empty, successful response rather than a failure, which wasn't the desired outcome.

The scenario that sends no response cannot be replicated within grpc-go itself, as the framework requires a response to be sent.

This test aims to verify behavior in a cross-language context, where a unary client might interact with a unary server implemented in a different language, potentially allowing the server to send no response.

Please let me know if that is not the case or I overlook something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @dfawley is saying that we can register a streaming handler for a unary method on the server. The client will call EmptyCall, but the server will call a streaming handler.

	srv := grpc.NewServer()
	serviceDesc := grpc.ServiceDesc{
		ServiceName: "grpc.testing.TestService",
		HandlerType: (*any)(nil),
		Methods:     []grpc.MethodDesc{},
		Streams: []grpc.StreamDesc{
			{
				StreamName: "EmptyCall",
				Handler: func(any, grpc.ServerStream) error {
					return nil
				},
				ClientStreams: true,
			},
		},
		Metadata: "grpc/testing/test.proto",
	}
	srv.RegisterService(&serviceDesc, &testServer{})
	go srv.Serve(lis)
	defer srv.Stop()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying. I had made the changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option (maybe easier, but no need to change now) is to use the unknown service handler, and don't register anything on the server: https://pkg.go.dev/google.golang.org/grpc#UnknownServiceHandler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnknownServiceHandler uses bidirectional streaming[desc.clientstreams:true, desc.serverstreams:true].
Will it work with the changes I made to addresses cardinality violations?

if !cs.desc.ServerStreams && !cs.recvFirstMsg {
  return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
}

@dfawley dfawley assigned Pranjali-2501 and unassigned dfawley May 20, 2025
@Pranjali-2501 Pranjali-2501 removed their assignment May 26, 2025
stream.go Outdated
@@ -1134,11 +1136,17 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
if !cs.desc.ServerStreams && !cs.recvFirstMsg {
return status.Errorf(codes.Internal, "client streaming cardinality violation")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream.go Outdated
@@ -1134,6 +1134,10 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
// received no msg and status ok for non-server streaming rpcs.
if !cs.desc.ServerStreams {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a user calls RecvMsg twice for a unary RPC and the server sends 1 response message, I believe they should receive io.EOF for the second .RecvMsg call. Is this still the case or will they receive a cardinality violation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case user will receive cardinality violation. This change is made in #8330 .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the expected behaviour is to keep returning the same error with which the RPC ended. That seems to be the intention of lines just above this.

if statusErr := a.transportStream.Status().Err(); statusErr != nil {
	return statusErr
}

So if the RPC was successful, the caller should keep getting io.EOF on successive calls to RecvMsg(). This change is breaking that expectation. The following code asserts this behaviour:

	stream, err := cc.NewStream(ctx, &grpc.StreamDesc{
		ServerStreams: false,
		ClientStreams: false,
	}, testgrpc.TestService_EmptyCall_FullMethodName)

	if err != nil {
		t.Fatal(err)
	}

	if err := stream.SendMsg(&testpb.Empty{}); err != nil {
		t.Fatal(err)
	}

	if err := stream.RecvMsg(&testpb.Empty{}); err != nil {
		t.Fatal(err)
	}

        // Returning internal error now.
	if err := stream.RecvMsg(&testpb.Empty{}); err != io.EOF {
		t.Fatal(err)
	}

Copy link
Contributor Author

@Pranjali-2501 Pranjali-2501 May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are expecting second RecvMsg() to get io.EOF for successful non-server streaming RPCs, I can go back to the previous implementation by having a variable recvFirstmsg and set it to true in first RecvMsg() call.

And only return INTERNAL error if EOF is received in first RecvMsg() call.

if !cs.desc.ServerStreams && !recvFirstMsg {
				return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
			}

Let me know your thoughts on this.

Copy link
Contributor

@arjan-bal arjan-bal May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what the above comment from Doug mentioned was for returning an error when calling recv returns a message successfully. gRPC should probably return a cardinality violation in that case.

grpc-go/stream.go

Line 1136 in 4cab0e6

if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {

We should discuss this in the team meeting.

Copy link
Contributor Author

@Pranjali-2501 Pranjali-2501 May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For non-streaming RPCs:

Server can't send more than one msg, as mentioned in this comment.

For successful RPC, Client can call RecvMsg() twice. Second RecvMsg() call will return with a io.EOF and close the stream.

It will never be the case for non-streaming RPCs, that recvFirstMsg is already set.

@arjan-bal arjan-bal assigned Pranjali-2501 and unassigned arjan-bal May 26, 2025
stream.go Outdated
@@ -1134,6 +1134,10 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
// received no msg and status ok for non-server streaming rpcs.
if !cs.desc.ServerStreams {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a similar method on addrConnStream that also needs to be updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll made the similar changes once the changes made in client.RecvMsg() gets finalized.

@Pranjali-2501 Pranjali-2501 requested a review from arjan-bal May 27, 2025 07:51
@dfawley dfawley assigned Pranjali-2501 and unassigned dfawley May 27, 2025
@arjan-bal arjan-bal changed the title Cardinality violations in Client streaming RPC. grpc: Fix cardinality violations in client streaming RPCs May 28, 2025
@dfawley dfawley assigned Pranjali-2501 and unassigned dfawley May 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants