From 3ba457ecf008471c84a141fbf33ef2fae5d16035 Mon Sep 17 00:00:00 2001 From: Vinothkumar Date: Mon, 19 May 2025 09:10:25 +0000 Subject: [PATCH 1/7] Fixed retry attempts in HandleRPC --- stats/opentelemetry/client_tracing.go | 1 + stats/opentelemetry/opentelemetry.go | 1 + stats/opentelemetry/server_tracing.go | 1 + stats/opentelemetry/trace.go | 7 +++++++ stream.go | 3 +++ 5 files changed, 13 insertions(+) diff --git a/stats/opentelemetry/client_tracing.go b/stats/opentelemetry/client_tracing.go index 868d6a2fc9c1..fbe9a7c5e254 100644 --- a/stats/opentelemetry/client_tracing.go +++ b/stats/opentelemetry/client_tracing.go @@ -121,6 +121,7 @@ func (h *clientTracingHandler) HandleConn(context.Context, stats.ConnStats) {} // TagRPC implements per RPC attempt context management for traces. func (h *clientTracingHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { ctx, ai := getOrCreateRPCAttemptInfo(ctx) + ai.ctx = ctx ctx, ai = h.traceTagRPC(ctx, ai, info.NameResolutionDelay) return setRPCInfo(ctx, &rpcInfo{ai: ai}) } diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index cd01f86c4981..c73f2ee4aa2d 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -242,6 +242,7 @@ type attemptInfo struct { countSentMsg uint32 countRecvMsg uint32 previousRPCAttempts uint32 + ctx context.Context } type clientMetrics struct { diff --git a/stats/opentelemetry/server_tracing.go b/stats/opentelemetry/server_tracing.go index 0e2181bf114c..d87785082b8d 100644 --- a/stats/opentelemetry/server_tracing.go +++ b/stats/opentelemetry/server_tracing.go @@ -41,6 +41,7 @@ func (h *serverTracingHandler) initializeTraces() { // TagRPC implements per RPC attempt context management for traces. func (h *serverTracingHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { ctx, ai := getOrCreateRPCAttemptInfo(ctx) + ai.ctx = ctx ctx, ai = h.traceTagRPC(ctx, ai) return setRPCInfo(ctx, &rpcInfo{ai: ai}) } diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go index efafdd0756eb..203f57613469 100644 --- a/stats/opentelemetry/trace.go +++ b/stats/opentelemetry/trace.go @@ -26,6 +26,8 @@ import ( "google.golang.org/grpc/status" ) +type clientStreamKey struct{} + // populateSpan populates span information based on stats passed in, representing // invariants of the RPC lifecycle. It ends the span, triggering its export. // This function handles attempt spans on the client-side and call spans on the @@ -50,6 +52,11 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) { attribute.Int64("previous-rpc-attempts", int64(ai.previousRPCAttempts)), attribute.Bool("transparent-retry", rs.IsTransparentRetryAttempt), ) + if !rs.IsTransparentRetryAttempt { + if retries, ok := ai.ctx.Value(clientStreamKey{}).(int); ok { + span.SetAttributes(attribute.Int("grpc.previous-rpc-attempts", retries)) + } + } // increment previous rpc attempts applicable for next attempt atomic.AddUint32(&ai.previousRPCAttempts, 1) case *stats.PickerUpdated: diff --git a/stream.go b/stream.go index d58bb6471a8a..c1f3d31a450f 100644 --- a/stream.go +++ b/stream.go @@ -406,6 +406,8 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client return cs, nil } +type clientStreamKey struct{} + // newAttemptLocked creates a new csAttempt without a transport or stream. func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) { if err := cs.ctx.Err(); err != nil { @@ -430,6 +432,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) IsServerStream: cs.desc.ServerStreams, IsTransparentRetryAttempt: isTransparent, } + ctx = context.WithValue(ctx, clientStreamKey{}, cs.numRetries) sh.HandleRPC(ctx, begin) } From 5d197791ab27c3a72df90637e9b0943e8fa08082 Mon Sep 17 00:00:00 2001 From: Vinothkumar Date: Tue, 20 May 2025 06:55:17 +0000 Subject: [PATCH 2/7] Fixed the review changes --- stats/opentelemetry/client_tracing.go | 6 +++- stats/opentelemetry/e2e_test.go | 52 ++++++++++++++++++++++++++- stats/opentelemetry/opentelemetry.go | 2 +- stats/opentelemetry/server_tracing.go | 1 - stats/opentelemetry/trace.go | 9 +++-- stream.go | 3 -- 6 files changed, 61 insertions(+), 12 deletions(-) diff --git a/stats/opentelemetry/client_tracing.go b/stats/opentelemetry/client_tracing.go index fbe9a7c5e254..59376a6d7f3f 100644 --- a/stats/opentelemetry/client_tracing.go +++ b/stats/opentelemetry/client_tracing.go @@ -20,6 +20,7 @@ import ( "context" "log" "strings" + "sync/atomic" otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -121,7 +122,10 @@ func (h *clientTracingHandler) HandleConn(context.Context, stats.ConnStats) {} // TagRPC implements per RPC attempt context management for traces. func (h *clientTracingHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { ctx, ai := getOrCreateRPCAttemptInfo(ctx) - ai.ctx = ctx + if ai.previousRPCAttempts > 0 { + atomic.AddUint32(&ai.explicitRetryCount, 1) + } + atomic.AddUint32(&ai.previousRPCAttempts, 1) ctx, ai = h.traceTagRPC(ctx, ai, info.NameResolutionDelay) return setRPCInfo(ctx, &rpcInfo{ai: ai}) } diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 4dbaadb2078e..befdec677b92 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -872,6 +872,10 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, + { + Key: "retry-attempts", + Value: attribute.IntValue(0), + }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -928,6 +932,10 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { }, { Key: "previous-rpc-attempts", + Value: attribute.IntValue(1), + }, + { + Key: "retry-attempts", Value: attribute.IntValue(0), }, { @@ -994,6 +1002,10 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, + { + Key: "retry-attempts", + Value: attribute.IntValue(0), + }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1021,6 +1033,10 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { }, { Key: "previous-rpc-attempts", + Value: attribute.IntValue(1), + }, + { + Key: "retry-attempts", Value: attribute.IntValue(0), }, { @@ -1096,6 +1112,10 @@ func (s) TestSpan(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, + { + Key: "retry-attempts", + Value: attribute.IntValue(0), + }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1144,6 +1164,10 @@ func (s) TestSpan(t *testing.T) { }, { Key: "previous-rpc-attempts", + Value: attribute.IntValue(1), + }, + { + Key: "retry-attempts", Value: attribute.IntValue(0), }, { @@ -1202,6 +1226,10 @@ func (s) TestSpan(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, + { + Key: "retry-attempts", + Value: attribute.IntValue(0), + }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1229,6 +1257,10 @@ func (s) TestSpan(t *testing.T) { }, { Key: "previous-rpc-attempts", + Value: attribute.IntValue(1), + }, + { + Key: "retry-attempts", Value: attribute.IntValue(0), }, { @@ -1306,6 +1338,10 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, + { + Key: "retry-attempts", + Value: attribute.IntValue(0), + }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1354,6 +1390,10 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { }, { Key: "previous-rpc-attempts", + Value: attribute.IntValue(1), + }, + { + Key: "retry-attempts", Value: attribute.IntValue(0), }, { @@ -1412,6 +1452,10 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, + { + Key: "retry-attempts", + Value: attribute.IntValue(0), + }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1439,6 +1483,10 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { }, { Key: "previous-rpc-attempts", + Value: attribute.IntValue(1), + }, + { + Key: "retry-attempts", Value: attribute.IntValue(0), }, { @@ -1767,6 +1815,7 @@ func (s) TestStreamingRPC_TraceSequenceNumbers(t *testing.T) { attribute.Bool("Client", false), attribute.Bool("FailFast", false), attribute.Int("previous-rpc-attempts", 0), + attribute.Int("retry-attempts", 0), attribute.Bool("transparent-retry", false), }, }, @@ -1777,7 +1826,8 @@ func (s) TestStreamingRPC_TraceSequenceNumbers(t *testing.T) { attributes: []attribute.KeyValue{ attribute.Bool("Client", true), attribute.Bool("FailFast", true), - attribute.Int("previous-rpc-attempts", 0), + attribute.Int("previous-rpc-attempts", 1), + attribute.Int("retry-attempts", 0), attribute.Bool("transparent-retry", false), }, }, diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index c73f2ee4aa2d..aa0e826b7d96 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -242,7 +242,7 @@ type attemptInfo struct { countSentMsg uint32 countRecvMsg uint32 previousRPCAttempts uint32 - ctx context.Context + explicitRetryCount uint32 } type clientMetrics struct { diff --git a/stats/opentelemetry/server_tracing.go b/stats/opentelemetry/server_tracing.go index d87785082b8d..0e2181bf114c 100644 --- a/stats/opentelemetry/server_tracing.go +++ b/stats/opentelemetry/server_tracing.go @@ -41,7 +41,6 @@ func (h *serverTracingHandler) initializeTraces() { // TagRPC implements per RPC attempt context management for traces. func (h *serverTracingHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { ctx, ai := getOrCreateRPCAttemptInfo(ctx) - ai.ctx = ctx ctx, ai = h.traceTagRPC(ctx, ai) return setRPCInfo(ctx, &rpcInfo{ai: ai}) } diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go index 203f57613469..9841378d28d3 100644 --- a/stats/opentelemetry/trace.go +++ b/stats/opentelemetry/trace.go @@ -43,6 +43,9 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) { switch rs := rs.(type) { case *stats.Begin: + if rs.IsTransparentRetryAttempt { + atomic.AddUint32(&ai.explicitRetryCount, ^uint32(0)) + } // Note: Go always added Client and FailFast attributes even though they are not // defined by the OpenCensus gRPC spec. Thus, they are unimportant for // correctness. @@ -50,13 +53,9 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) { attribute.Bool("Client", rs.Client), attribute.Bool("FailFast", rs.FailFast), attribute.Int64("previous-rpc-attempts", int64(ai.previousRPCAttempts)), + attribute.Int64("retry-attempts", int64(atomic.LoadUint32(&ai.explicitRetryCount))), attribute.Bool("transparent-retry", rs.IsTransparentRetryAttempt), ) - if !rs.IsTransparentRetryAttempt { - if retries, ok := ai.ctx.Value(clientStreamKey{}).(int); ok { - span.SetAttributes(attribute.Int("grpc.previous-rpc-attempts", retries)) - } - } // increment previous rpc attempts applicable for next attempt atomic.AddUint32(&ai.previousRPCAttempts, 1) case *stats.PickerUpdated: diff --git a/stream.go b/stream.go index c1f3d31a450f..d58bb6471a8a 100644 --- a/stream.go +++ b/stream.go @@ -406,8 +406,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client return cs, nil } -type clientStreamKey struct{} - // newAttemptLocked creates a new csAttempt without a transport or stream. func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) { if err := cs.ctx.Err(); err != nil { @@ -432,7 +430,6 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) IsServerStream: cs.desc.ServerStreams, IsTransparentRetryAttempt: isTransparent, } - ctx = context.WithValue(ctx, clientStreamKey{}, cs.numRetries) sh.HandleRPC(ctx, begin) } From 42459505c70e49c51b56797e8c62c82c0e0e19e8 Mon Sep 17 00:00:00 2001 From: Vinothkumar Date: Tue, 20 May 2025 07:02:25 +0000 Subject: [PATCH 3/7] Fixed vet issues --- stats/opentelemetry/trace.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go index 9841378d28d3..144a902d4bf9 100644 --- a/stats/opentelemetry/trace.go +++ b/stats/opentelemetry/trace.go @@ -26,8 +26,6 @@ import ( "google.golang.org/grpc/status" ) -type clientStreamKey struct{} - // populateSpan populates span information based on stats passed in, representing // invariants of the RPC lifecycle. It ends the span, triggering its export. // This function handles attempt spans on the client-side and call spans on the From 5347db12b07d8bda77442558925aada3d3f08de7 Mon Sep 17 00:00:00 2001 From: Vinothkumar Date: Wed, 21 May 2025 05:58:53 +0000 Subject: [PATCH 4/7] Fixed the review changes --- stats/opentelemetry/client_tracing.go | 13 +++++-- stats/opentelemetry/e2e_test.go | 50 --------------------------- stats/opentelemetry/opentelemetry.go | 2 +- stats/opentelemetry/server_tracing.go | 1 + stats/opentelemetry/trace.go | 11 +++--- 5 files changed, 19 insertions(+), 58 deletions(-) diff --git a/stats/opentelemetry/client_tracing.go b/stats/opentelemetry/client_tracing.go index 59376a6d7f3f..7df423645a44 100644 --- a/stats/opentelemetry/client_tracing.go +++ b/stats/opentelemetry/client_tracing.go @@ -119,13 +119,20 @@ func (h *clientTracingHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo // HandleConn exists to satisfy stats.Handler for tracing. func (h *clientTracingHandler) HandleConn(context.Context, stats.ConnStats) {} +type retryCountKey struct{} + // TagRPC implements per RPC attempt context management for traces. func (h *clientTracingHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { ctx, ai := getOrCreateRPCAttemptInfo(ctx) - if ai.previousRPCAttempts > 0 { - atomic.AddUint32(&ai.explicitRetryCount, 1) + var counter *int32 + if val := ctx.Value(retryCountKey{}); val != nil { + counter = val.(*int32) + } else { + counter = new(int32) + ctx = context.WithValue(ctx, retryCountKey{}, counter) } - atomic.AddUint32(&ai.previousRPCAttempts, 1) + ai.previousRPCAttempts = uint32(atomic.LoadInt32(counter)) + ai.ctx = ctx ctx, ai = h.traceTagRPC(ctx, ai, info.NameResolutionDelay) return setRPCInfo(ctx, &rpcInfo{ai: ai}) } diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index befdec677b92..426f7601ca5a 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -872,10 +872,6 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -934,10 +930,6 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(1), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1002,10 +994,6 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1035,10 +1023,6 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(1), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1112,10 +1096,6 @@ func (s) TestSpan(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1166,10 +1146,6 @@ func (s) TestSpan(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(1), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1226,10 +1202,6 @@ func (s) TestSpan(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1259,10 +1231,6 @@ func (s) TestSpan(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(1), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1338,10 +1306,6 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1392,10 +1356,6 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(1), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1452,10 +1412,6 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(0), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1485,10 +1441,6 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { Key: "previous-rpc-attempts", Value: attribute.IntValue(1), }, - { - Key: "retry-attempts", - Value: attribute.IntValue(0), - }, { Key: "transparent-retry", Value: attribute.BoolValue(false), @@ -1815,7 +1767,6 @@ func (s) TestStreamingRPC_TraceSequenceNumbers(t *testing.T) { attribute.Bool("Client", false), attribute.Bool("FailFast", false), attribute.Int("previous-rpc-attempts", 0), - attribute.Int("retry-attempts", 0), attribute.Bool("transparent-retry", false), }, }, @@ -1827,7 +1778,6 @@ func (s) TestStreamingRPC_TraceSequenceNumbers(t *testing.T) { attribute.Bool("Client", true), attribute.Bool("FailFast", true), attribute.Int("previous-rpc-attempts", 1), - attribute.Int("retry-attempts", 0), attribute.Bool("transparent-retry", false), }, }, diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index aa0e826b7d96..c73f2ee4aa2d 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -242,7 +242,7 @@ type attemptInfo struct { countSentMsg uint32 countRecvMsg uint32 previousRPCAttempts uint32 - explicitRetryCount uint32 + ctx context.Context } type clientMetrics struct { diff --git a/stats/opentelemetry/server_tracing.go b/stats/opentelemetry/server_tracing.go index 0e2181bf114c..d87785082b8d 100644 --- a/stats/opentelemetry/server_tracing.go +++ b/stats/opentelemetry/server_tracing.go @@ -41,6 +41,7 @@ func (h *serverTracingHandler) initializeTraces() { // TagRPC implements per RPC attempt context management for traces. func (h *serverTracingHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { ctx, ai := getOrCreateRPCAttemptInfo(ctx) + ai.ctx = ctx ctx, ai = h.traceTagRPC(ctx, ai) return setRPCInfo(ctx, &rpcInfo{ai: ai}) } diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go index 144a902d4bf9..dc58d0afba7d 100644 --- a/stats/opentelemetry/trace.go +++ b/stats/opentelemetry/trace.go @@ -41,8 +41,12 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) { switch rs := rs.(type) { case *stats.Begin: - if rs.IsTransparentRetryAttempt { - atomic.AddUint32(&ai.explicitRetryCount, ^uint32(0)) + retryCount := ai.previousRPCAttempts + if !rs.IsTransparentRetryAttempt { + if val := ai.ctx.Value(retryCountKey{}); val != nil { + // Atomic increment and get new value + retryCount = uint32(atomic.AddInt32(val.(*int32), 1)) + } } // Note: Go always added Client and FailFast attributes even though they are not // defined by the OpenCensus gRPC spec. Thus, they are unimportant for @@ -50,8 +54,7 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) { span.SetAttributes( attribute.Bool("Client", rs.Client), attribute.Bool("FailFast", rs.FailFast), - attribute.Int64("previous-rpc-attempts", int64(ai.previousRPCAttempts)), - attribute.Int64("retry-attempts", int64(atomic.LoadUint32(&ai.explicitRetryCount))), + attribute.Int64("previous-rpc-attempts", int64(retryCount)), attribute.Bool("transparent-retry", rs.IsTransparentRetryAttempt), ) // increment previous rpc attempts applicable for next attempt From 99e88d88d58029116a2769a964c617d2d0e8826d Mon Sep 17 00:00:00 2001 From: Vinothkumar Date: Thu, 22 May 2025 07:37:10 +0000 Subject: [PATCH 5/7] Fixed the review changes --- stats/opentelemetry/client_tracing.go | 13 ++++----- stats/opentelemetry/e2e_test.go | 42 ++++++++++++++++++++++----- stats/opentelemetry/opentelemetry.go | 12 ++++++++ stats/opentelemetry/trace.go | 18 +++++------- 4 files changed, 61 insertions(+), 24 deletions(-) diff --git a/stats/opentelemetry/client_tracing.go b/stats/opentelemetry/client_tracing.go index 7df423645a44..030e3594093c 100644 --- a/stats/opentelemetry/client_tracing.go +++ b/stats/opentelemetry/client_tracing.go @@ -48,6 +48,8 @@ func (h *clientTracingHandler) initializeTraces() { } func (h *clientTracingHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ci := &callInfo{numRetries: 0} + ctx = setRetryCount(ctx, ci) ctx, _ = getOrCreateCallInfo(ctx, cc, method, opts...) var span trace.Span @@ -58,6 +60,8 @@ func (h *clientTracingHandler) unaryInterceptor(ctx context.Context, method stri } func (h *clientTracingHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + ci := &callInfo{numRetries: 0} + ctx = setRetryCount(ctx, ci) ctx, _ = getOrCreateCallInfo(ctx, cc, method, opts...) var span trace.Span @@ -124,14 +128,9 @@ type retryCountKey struct{} // TagRPC implements per RPC attempt context management for traces. func (h *clientTracingHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { ctx, ai := getOrCreateRPCAttemptInfo(ctx) - var counter *int32 - if val := ctx.Value(retryCountKey{}); val != nil { - counter = val.(*int32) - } else { - counter = new(int32) - ctx = context.WithValue(ctx, retryCountKey{}, counter) + if ci, ok := getRetryCount(ctx); ok { + ai.previousRPCAttempts = uint32(atomic.LoadInt32(&ci.numRetries)) } - ai.previousRPCAttempts = uint32(atomic.LoadInt32(counter)) ai.ctx = ctx ctx, ai = h.traceTagRPC(ctx, ai, info.NameResolutionDelay) return setRPCInfo(ctx, &rpcInfo{ai: ai}) diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 426f7601ca5a..d8003353e4d2 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -22,6 +22,7 @@ import ( "io" "slices" "strconv" + "strings" "testing" "time" @@ -928,7 +929,7 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { }, { Key: "previous-rpc-attempts", - Value: attribute.IntValue(1), + Value: attribute.IntValue(0), }, { Key: "transparent-retry", @@ -1021,7 +1022,7 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { }, { Key: "previous-rpc-attempts", - Value: attribute.IntValue(1), + Value: attribute.IntValue(0), }, { Key: "transparent-retry", @@ -1144,7 +1145,7 @@ func (s) TestSpan(t *testing.T) { }, { Key: "previous-rpc-attempts", - Value: attribute.IntValue(1), + Value: attribute.IntValue(0), }, { Key: "transparent-retry", @@ -1229,7 +1230,7 @@ func (s) TestSpan(t *testing.T) { }, { Key: "previous-rpc-attempts", - Value: attribute.IntValue(1), + Value: attribute.IntValue(0), }, { Key: "transparent-retry", @@ -1354,7 +1355,7 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { }, { Key: "previous-rpc-attempts", - Value: attribute.IntValue(1), + Value: attribute.IntValue(0), }, { Key: "transparent-retry", @@ -1439,7 +1440,7 @@ func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { }, { Key: "previous-rpc-attempts", - Value: attribute.IntValue(1), + Value: attribute.IntValue(0), }, { Key: "transparent-retry", @@ -1687,6 +1688,7 @@ func (s) TestTraceSpan_WithRetriesAndNameResolutionDelay(t *testing.T) { t.Fatal(err) } verifyTrace(t, spans, wantSpanInfo) + verifyPreviousRPCAttempts(t, spans) }) } } @@ -1708,6 +1710,32 @@ func verifyTrace(t *testing.T, spans tracetest.SpanStubs, want traceSpanInfo) { } } +func verifyPreviousRPCAttempts(t *testing.T, spans tracetest.SpanStubs) { + t.Helper() + const maxAttempts = 3 + foundAttempts := make(map[int]bool) + observedSpans := make(map[int][]string) + + for _, span := range spans { + if !strings.HasPrefix(span.Name, "Attempt.") { + continue + } + for _, attr := range span.Attributes { + if attr.Key == "previous-rpc-attempts" { + val := int(attr.Value.AsInt64()) + foundAttempts[val] = true + observedSpans[val] = append(observedSpans[val], span.Name) + } + } + } + + for i := range maxAttempts { + if !foundAttempts[i] { + t.Errorf("Missing span for retry attempt #%d (expected previous-rpc-attempts = %d)", i+1, i) + } + } +} + // TestStreamingRPC_TraceSequenceNumbers verifies that sequence numbers // are incremented correctly for multiple messages sent and received // during a streaming RPC. @@ -1777,7 +1805,7 @@ func (s) TestStreamingRPC_TraceSequenceNumbers(t *testing.T) { attributes: []attribute.KeyValue{ attribute.Bool("Client", true), attribute.Bool("FailFast", true), - attribute.Int("previous-rpc-attempts", 1), + attribute.Int("previous-rpc-attempts", 0), attribute.Bool("transparent-retry", false), }, }, diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index c73f2ee4aa2d..e01b74b690a8 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -179,6 +179,8 @@ type callInfo struct { // nameResolutionEventAdded is set when the resolver delay trace event // is added. Prevents duplicate events, since it is reported per-attempt. nameResolutionEventAdded atomic.Bool + // numRetries holds the count of non-transparent retry attempts. + numRetries int32 } type callInfoKey struct{} @@ -213,6 +215,16 @@ func getRPCInfo(ctx context.Context) *rpcInfo { return ri } +func setRetryCount(ctx context.Context, ci *callInfo) context.Context { + return context.WithValue(ctx, retryCountKey{}, ci) +} + +// getRetryCount retrieves the retry count tracking struct from the context. +func getRetryCount(ctx context.Context) (*callInfo, bool) { + ci, ok := ctx.Value(retryCountKey{}).(*callInfo) + return ci, ok +} + func removeLeadingSlash(mn string) string { return strings.TrimLeft(mn, "/") } diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go index dc58d0afba7d..b2541a269fbd 100644 --- a/stats/opentelemetry/trace.go +++ b/stats/opentelemetry/trace.go @@ -41,24 +41,22 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) { switch rs := rs.(type) { case *stats.Begin: - retryCount := ai.previousRPCAttempts - if !rs.IsTransparentRetryAttempt { - if val := ai.ctx.Value(retryCountKey{}); val != nil { - // Atomic increment and get new value - retryCount = uint32(atomic.AddInt32(val.(*int32), 1)) - } - } // Note: Go always added Client and FailFast attributes even though they are not // defined by the OpenCensus gRPC spec. Thus, they are unimportant for // correctness. span.SetAttributes( attribute.Bool("Client", rs.Client), attribute.Bool("FailFast", rs.FailFast), - attribute.Int64("previous-rpc-attempts", int64(retryCount)), + attribute.Int64("previous-rpc-attempts", int64(ai.previousRPCAttempts)), attribute.Bool("transparent-retry", rs.IsTransparentRetryAttempt), ) - // increment previous rpc attempts applicable for next attempt - atomic.AddUint32(&ai.previousRPCAttempts, 1) + // Increment retry count for the next attempt if not a transparent + // retry. + if !rs.IsTransparentRetryAttempt { + if ci, ok := getRetryCount(ai.ctx); ok { + atomic.AddInt32(&ci.numRetries, 1) + } + } case *stats.PickerUpdated: span.AddEvent("Delayed LB pick complete") case *stats.InPayload: From 586cf639714ca64b014e11d09c9264c4318e2d84 Mon Sep 17 00:00:00 2001 From: Vinothkumar Date: Mon, 26 May 2025 12:58:10 +0000 Subject: [PATCH 6/7] Fixed the review changes --- stats/opentelemetry/client_metrics.go | 1 + stats/opentelemetry/client_tracing.go | 9 +--- stats/opentelemetry/e2e_test.go | 61 ++++++++++++++------------- stats/opentelemetry/opentelemetry.go | 7 ++- stats/opentelemetry/trace.go | 7 ++- 5 files changed, 40 insertions(+), 45 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 7422bebd4f6e..2f25991906b5 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -81,6 +81,7 @@ func getOrCreateCallInfo(ctx context.Context, cc *grpc.ClientConn, method string } ctx = setCallInfo(ctx, ci) } + ctx = setRetryCount(ctx, ci) return ctx, ci } diff --git a/stats/opentelemetry/client_tracing.go b/stats/opentelemetry/client_tracing.go index 030e3594093c..0377582c93cc 100644 --- a/stats/opentelemetry/client_tracing.go +++ b/stats/opentelemetry/client_tracing.go @@ -20,7 +20,6 @@ import ( "context" "log" "strings" - "sync/atomic" otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -48,8 +47,6 @@ func (h *clientTracingHandler) initializeTraces() { } func (h *clientTracingHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ci := &callInfo{numRetries: 0} - ctx = setRetryCount(ctx, ci) ctx, _ = getOrCreateCallInfo(ctx, cc, method, opts...) var span trace.Span @@ -60,8 +57,6 @@ func (h *clientTracingHandler) unaryInterceptor(ctx context.Context, method stri } func (h *clientTracingHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - ci := &callInfo{numRetries: 0} - ctx = setRetryCount(ctx, ci) ctx, _ = getOrCreateCallInfo(ctx, cc, method, opts...) var span trace.Span @@ -128,8 +123,8 @@ type retryCountKey struct{} // TagRPC implements per RPC attempt context management for traces. func (h *clientTracingHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { ctx, ai := getOrCreateRPCAttemptInfo(ctx) - if ci, ok := getRetryCount(ctx); ok { - ai.previousRPCAttempts = uint32(atomic.LoadInt32(&ci.numRetries)) + if ci, ok := retryCount(ctx); ok { + ai.previousRPCAttempts = uint32(ci.previousRPCAttempts.Load()) } ai.ctx = ctx ctx, ai = h.traceTagRPC(ctx, ai, info.NameResolutionDelay) diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index d8003353e4d2..98052a7bf56c 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -1678,22 +1678,36 @@ func (s) TestTraceSpan_WithRetriesAndNameResolutionDelay(t *testing.T) { t.Fatalf("%s call failed: %v", tt.name, err) } - wantSpanInfo := traceSpanInfo{ + methodName := strings.TrimPrefix(tt.spanName, "Sent.") + var wantSpanInfos []traceSpanInfo + wantSpanInfos = append(wantSpanInfos, traceSpanInfo{ name: tt.spanName, spanKind: oteltrace.SpanKindClient.String(), events: []trace.Event{{Name: delayedResolutionEventName}}, + }) + for i := range 3 { + wantSpanInfos = append(wantSpanInfos, traceSpanInfo{ + name: "Attempt." + methodName, + spanKind: oteltrace.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + attribute.Int64("previous-rpc-attempts", int64(i)), + }, + }) } - spans, err := waitForTraceSpans(ctx, exporter, []traceSpanInfo{wantSpanInfo}) + + spans, err := waitForTraceSpans(ctx, exporter, wantSpanInfos) if err != nil { t.Fatal(err) } - verifyTrace(t, spans, wantSpanInfo) - verifyPreviousRPCAttempts(t, spans) + for _, want := range wantSpanInfos { + verifyTrace(t, spans, want) + } }) } } func verifyTrace(t *testing.T, spans tracetest.SpanStubs, want traceSpanInfo) { + t.Helper() match := false for _, span := range spans { if span.Name == want.name && span.SpanKind.String() == want.spanKind { @@ -1704,35 +1718,22 @@ func verifyTrace(t *testing.T, spans tracetest.SpanStubs, want traceSpanInfo) { } break } - } - if !match { - t.Errorf("Expected span not found: %q (kind: %s)", want.name, want.spanKind) - } -} - -func verifyPreviousRPCAttempts(t *testing.T, spans tracetest.SpanStubs) { - t.Helper() - const maxAttempts = 3 - foundAttempts := make(map[int]bool) - observedSpans := make(map[int][]string) - - for _, span := range spans { - if !strings.HasPrefix(span.Name, "Attempt.") { - continue - } - for _, attr := range span.Attributes { - if attr.Key == "previous-rpc-attempts" { - val := int(attr.Value.AsInt64()) - foundAttempts[val] = true - observedSpans[val] = append(observedSpans[val], span.Name) + for _, wantAttr := range want.attributes { + for _, attr := range span.Attributes { + fmt.Println("Span Name", span.Name) + fmt.Println("want Name", want.name) + if attr.Key == wantAttr.Key && span.Name == want.name { + if attr.Value.AsInt64() != wantAttr.Value.AsInt64() { + t.Errorf("Span %q: %s = %d; want %d", span.Name, attr.Key, attr.Value.AsInt64(), wantAttr.Value.AsInt64()) + } + } } } - } - for i := range maxAttempts { - if !foundAttempts[i] { - t.Errorf("Missing span for retry attempt #%d (expected previous-rpc-attempts = %d)", i+1, i) - } + return + } + if !match { + t.Errorf("Expected span not found: %q (kind: %s)", want.name, want.spanKind) } } diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index e01b74b690a8..c29e6f27ce92 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -179,8 +179,8 @@ type callInfo struct { // nameResolutionEventAdded is set when the resolver delay trace event // is added. Prevents duplicate events, since it is reported per-attempt. nameResolutionEventAdded atomic.Bool - // numRetries holds the count of non-transparent retry attempts. - numRetries int32 + // previousRPCAttempts holds the count of non-transparent retry attempts. + previousRPCAttempts atomic.Int32 } type callInfoKey struct{} @@ -219,8 +219,7 @@ func setRetryCount(ctx context.Context, ci *callInfo) context.Context { return context.WithValue(ctx, retryCountKey{}, ci) } -// getRetryCount retrieves the retry count tracking struct from the context. -func getRetryCount(ctx context.Context) (*callInfo, bool) { +func retryCount(ctx context.Context) (*callInfo, bool) { ci, ok := ctx.Value(retryCountKey{}).(*callInfo) return ci, ok } diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go index b2541a269fbd..1007a1c2152d 100644 --- a/stats/opentelemetry/trace.go +++ b/stats/opentelemetry/trace.go @@ -17,8 +17,6 @@ package opentelemetry import ( - "sync/atomic" - "go.opentelemetry.io/otel/attribute" otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -53,8 +51,9 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) { // Increment retry count for the next attempt if not a transparent // retry. if !rs.IsTransparentRetryAttempt { - if ci, ok := getRetryCount(ai.ctx); ok { - atomic.AddInt32(&ci.numRetries, 1) + if ci, ok := retryCount(ai.ctx); ok { + ci.previousRPCAttempts.Add(1) + ai.ctx = setRetryCount(ai.ctx, ci) } } case *stats.PickerUpdated: From 1bdad7e627cdbf153a343f6d33730b3f72a35a3b Mon Sep 17 00:00:00 2001 From: Vinothkumar Date: Mon, 26 May 2025 14:37:17 +0000 Subject: [PATCH 7/7] Fixed the test cases --- stats/opentelemetry/e2e_test.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 98052a7bf56c..ee65a71dadd0 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -1712,25 +1712,24 @@ func verifyTrace(t *testing.T, spans tracetest.SpanStubs, want traceSpanInfo) { for _, span := range spans { if span.Name == want.name && span.SpanKind.String() == want.spanKind { match = true - if diff := cmp.Diff(want.events, span.Events, cmpopts.IgnoreFields(trace.Event{}, "Time")); diff != "" { - t.Errorf("Span event mismatch for %q (kind: %s) (-want +got):\n%s", - want.name, want.spanKind, diff) + if len(want.events) > 0 { + if diff := cmp.Diff(want.events, span.Events, cmpopts.IgnoreFields(trace.Event{}, "Time")); diff != "" { + t.Errorf("Span event mismatch for %q (kind: %s) (-want +got):\n%s", + want.name, want.spanKind, diff) + } } break } for _, wantAttr := range want.attributes { for _, attr := range span.Attributes { - fmt.Println("Span Name", span.Name) - fmt.Println("want Name", want.name) - if attr.Key == wantAttr.Key && span.Name == want.name { + if attr.Key == "previous-rpc-attempts" && span.Name == want.name { if attr.Value.AsInt64() != wantAttr.Value.AsInt64() { - t.Errorf("Span %q: %s = %d; want %d", span.Name, attr.Key, attr.Value.AsInt64(), wantAttr.Value.AsInt64()) + t.Errorf("Span %q: attribute %s = %d; want %d", span.Name, attr.Key, attr.Value.AsInt64(), wantAttr.Value.AsInt64()) } + break } } } - - return } if !match { t.Errorf("Expected span not found: %q (kind: %s)", want.name, want.spanKind)