diff --git a/cmd/jaeger/internal/extension/jaegerquery/internal/flags.go b/cmd/jaeger/internal/extension/jaegerquery/internal/flags.go index 502738bc7fd..bb65f9c0d68 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/internal/flags.go +++ b/cmd/jaeger/internal/extension/jaegerquery/internal/flags.go @@ -36,6 +36,9 @@ type QueryOptions struct { Tenancy tenancy.Options `mapstructure:"multi_tenancy"` // MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span. MaxClockSkewAdjust time.Duration `mapstructure:"max_clock_skew_adjust" valid:"optional"` + // MaxTraceSize is the max no. of spans allowed per trace. + // If a trace has more spans than this, it will be truncated and a warning will be added + MaxTraceSize int `mapstructure:"max_trace_size" valid:"optional"` // EnableTracing determines whether traces will be emitted by jaeger-query. EnableTracing bool `mapstructure:"enable_tracing"` // HTTP holds the HTTP configuration that the query service uses to serve requests. diff --git a/cmd/jaeger/internal/extension/jaegerquery/internal/querysvc/v2/querysvc/service.go b/cmd/jaeger/internal/extension/jaegerquery/internal/querysvc/v2/querysvc/service.go index 58b70aa259f..ad94d7e16e7 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/internal/querysvc/v2/querysvc/service.go +++ b/cmd/jaeger/internal/extension/jaegerquery/internal/querysvc/v2/querysvc/service.go @@ -29,6 +29,9 @@ type QueryServiceOptions struct { ArchiveTraceWriter tracestore.Writer // MaxClockSkewAdjust is the maximum duration by which to adjust a span. MaxClockSkewAdjust time.Duration + // MaxTraceSize is the max no. of spans allowed per trace. + // If a trace has more spans than this, it will be truncated and a warning will be added + MaxTraceSize int } // StorageCapabilities is a feature flag for query service @@ -200,7 +203,7 @@ func (qs QueryService) receiveTraces( if rawTraces { seq(processTraces) } else { - jptrace.AggregateTraces(seq)(func(trace ptrace.Traces, err error) bool { + jptrace.AggregateTracesWithLimit(seq, qs.options.MaxTraceSize)(func(trace ptrace.Traces, err error) bool { return processTraces([]ptrace.Traces{trace}, err) }) } diff --git a/cmd/jaeger/internal/extension/jaegerquery/internal/querysvc/v2/querysvc/service_test.go b/cmd/jaeger/internal/extension/jaegerquery/internal/querysvc/v2/querysvc/service_test.go index dfd096e197a..76f33abf4e0 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/internal/querysvc/v2/querysvc/service_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/internal/querysvc/v2/querysvc/service_test.go @@ -623,3 +623,127 @@ func TestGetCapabilities(t *testing.T) { }) } } + +// Consolidate Underlimit, Overlimit and Exactly at limit tests +func TestMaxTraceSize(t *testing.T) { + tests := []struct { + name string + maxTraceSize int + createTraces func() []ptrace.Traces + expectedSpans int + expectWarning bool + warningPattern string + }{ + { + name: "under_limit", + maxTraceSize: 5, + createTraces: func() []ptrace.Traces { + trace := ptrace.NewTraces() + scopes := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty() + for i := 0; i < 3; i++ { + span := scopes.Spans().AppendEmpty() + span.SetTraceID(testTraceID) + span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)})) + span.SetName(fmt.Sprintf("span-%d", i)) + } + return []ptrace.Traces{trace} + }, + expectedSpans: 3, + expectWarning: false, + }, + { + name: "over_limit", + maxTraceSize: 3, + createTraces: func() []ptrace.Traces { + trace1 := ptrace.NewTraces() + scopes1 := trace1.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty() + for i := 0; i < 3; i++ { + span := scopes1.Spans().AppendEmpty() + span.SetTraceID(testTraceID) + span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)})) + span.SetName(fmt.Sprintf("span-%d", i)) + } + + trace2 := ptrace.NewTraces() + scopes2 := trace2.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty() + for i := 3; i < 5; i++ { + span := scopes2.Spans().AppendEmpty() + span.SetTraceID(testTraceID) + span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)})) + span.SetName(fmt.Sprintf("span-%d", i)) + } + return []ptrace.Traces{trace1, trace2} + }, + expectedSpans: 3, + expectWarning: true, + warningPattern: "trace has more than 3 spans", + }, + { + name: "exactly_at_limit", + maxTraceSize: 3, + createTraces: func() []ptrace.Traces { + trace := ptrace.NewTraces() + scopes := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty() + for i := 0; i < 3; i++ { + span := scopes.Spans().AppendEmpty() + span.SetTraceID(testTraceID) + span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)})) + span.SetName(fmt.Sprintf("span-%d", i)) + } + return []ptrace.Traces{trace} + }, + expectedSpans: 3, + expectWarning: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + traces := tt.createTraces() + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + for _, trace := range traces { + if !yield([]ptrace.Traces{trace}, nil) { + return + } + } + }) + + traceReader := &tracestoremocks.Reader{} + dependencyStorage := &depstoremocks.Reader{} + options := QueryServiceOptions{ + MaxTraceSize: tt.maxTraceSize, + } + tqs := &testQueryService{} + tqs.queryService = NewQueryService(traceReader, dependencyStorage, options) + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{{TraceID: testTraceID}}, + } + traceReader.On("GetTraces", mock.Anything, params.TraceIDs). + Return(responseIter).Once() + + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + gotTraces, err := jiter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + require.Len(t, gotTraces, 1) + + // count total spans + actualSpans := gotTraces[0].SpanCount() + require.Equal(t, tt.expectedSpans, actualSpans) + + // check warning + firstSpan := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + warningsAttr, hasWarning := firstSpan.Attributes().Get("@jaeger@warnings") + + if tt.expectWarning { + require.True(t, hasWarning, "expected warning but none found") + require.Equal(t, pcommon.ValueTypeSlice, warningsAttr.Type()) + warnings := warningsAttr.Slice() + require.Positive(t, warnings.Len()) + require.Contains(t, warnings.At(warnings.Len()-1).Str(), tt.warningPattern) + } else { + require.False(t, hasWarning, "unexpected warning found") + } + }) + } +} diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 22bbee4abe8..60d2b777bc9 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -106,6 +106,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error { } v2opts := v2querysvc.QueryServiceOptions{ MaxClockSkewAdjust: s.config.MaxClockSkewAdjust, + MaxTraceSize: s.config.MaxTraceSize, } if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil { return err diff --git a/internal/jptrace/aggregator.go b/internal/jptrace/aggregator.go index fa8cde63646..c3c4a0b4196 100644 --- a/internal/jptrace/aggregator.go +++ b/internal/jptrace/aggregator.go @@ -4,6 +4,7 @@ package jptrace import ( + "fmt" "iter" "go.opentelemetry.io/collector/pdata/pcommon" @@ -14,9 +15,14 @@ import ( // // The `tracesSeq` input must adhere to the chunking requirements of tracestore.Reader.GetTraces. func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptrace.Traces, error] { + return AggregateTracesWithLimit(tracesSeq, 0) +} + +func AggregateTracesWithLimit(tracesSeq iter.Seq2[[]ptrace.Traces, error], maxSize int) iter.Seq2[ptrace.Traces, error] { return func(yield func(trace ptrace.Traces, err error) bool) { currentTrace := ptrace.NewTraces() currentTraceID := pcommon.NewTraceIDEmpty() + spanCount := 0 tracesSeq(func(traces []ptrace.Traces, err error) bool { if err != nil { @@ -27,7 +33,7 @@ func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptra resources := trace.ResourceSpans() traceID := resources.At(0).ScopeSpans().At(0).Spans().At(0).TraceID() if currentTraceID == traceID { - mergeTraces(trace, currentTrace) + mergeTraces(currentTrace, trace, maxSize, &spanCount) } else { if currentTrace.ResourceSpans().Len() > 0 { if !yield(currentTrace, nil) { @@ -36,6 +42,12 @@ func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptra } currentTrace = trace currentTraceID = traceID + spanCount = trace.SpanCount() + if maxSize > 0 && spanCount > maxSize { + currentTrace = ptrace.NewTraces() + spanCount = 0 + mergeTraces(currentTrace, trace, maxSize, &spanCount) + } } } return true @@ -46,10 +58,68 @@ func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptra } } -func mergeTraces(src, dest ptrace.Traces) { - resources := src.ResourceSpans() - for i := 0; i < resources.Len(); i++ { - resource := resources.At(i) - resource.CopyTo(dest.ResourceSpans().AppendEmpty()) +func mergeTraces(dest, src ptrace.Traces, maxSize int, spanCount *int) bool { + // early exit if already at max + if maxSize > 0 && *spanCount >= maxSize { + markTraceTruncated(dest, maxSize) + return true + } + + incomingCount := src.SpanCount() + // check if we can merge all spans without exceeding limit + if maxSize <= 0 || *spanCount+incomingCount <= maxSize { + resources := src.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + resource := resources.At(i) + resource.CopyTo(dest.ResourceSpans().AppendEmpty()) + } + *spanCount += incomingCount + return false } + + // partial copy + remaining := maxSize - *spanCount + if remaining > 0 { + copySpansUpToLimit(dest, src, remaining) + *spanCount = maxSize + } + markTraceTruncated(dest, maxSize) + return true +} + +func copySpansUpToLimit(dest, src ptrace.Traces, limit int) { + copied := 0 + + for _, srcResource := range src.ResourceSpans().All() { + if copied >= limit { + return + } + destResource := dest.ResourceSpans().AppendEmpty() + srcResource.Resource().CopyTo(destResource.Resource()) + destResource.SetSchemaUrl(srcResource.SchemaUrl()) + + for _, srcScope := range srcResource.ScopeSpans().All() { + if copied >= limit { + return + } + destScope := destResource.ScopeSpans().AppendEmpty() + srcScope.Scope().CopyTo(destScope.Scope()) + destScope.SetSchemaUrl(srcScope.SchemaUrl()) + + for _, span := range srcScope.Spans().All() { + if copied >= limit { + return + } + span.CopyTo(destScope.Spans().AppendEmpty()) + copied++ + } + } + } +} + +func markTraceTruncated(trace ptrace.Traces, maxSize int) { + // direct access to first span (if truncated, it must exist) + firstSpan := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + AddWarnings(firstSpan, + fmt.Sprintf("trace has more than %d spans, showing first %d spans only", maxSize, maxSize)) } diff --git a/internal/jptrace/aggregator_test.go b/internal/jptrace/aggregator_test.go index abeeaaa0845..ea872add4c1 100644 --- a/internal/jptrace/aggregator_test.go +++ b/internal/jptrace/aggregator_test.go @@ -4,6 +4,7 @@ package jptrace import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -134,3 +135,77 @@ func TestAggregateTraces_RespectsEarlyReturn(t *testing.T) { require.Equal(t, trace1, lastResult) } + +func TestAggregateTracesWithLimit(t *testing.T) { + createTrace := func(traceID byte, spanCount int) ptrace.Traces { + trace := ptrace.NewTraces() + spans := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans() + for i := 0; i < spanCount; i++ { + span := spans.AppendEmpty() + span.SetTraceID(pcommon.TraceID([16]byte{traceID})) + } + return trace + } + + tests := []struct { + name string + maxSize int + inputSpans int + expectedSpans int + expectTruncate bool + }{ + {"no_limit", 0, 5, 5, false}, + {"under_limit", 10, 5, 5, false}, + {"over_limit", 3, 5, 3, true}, + {"exact_limit", 5, 5, 5, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tracesSeq := func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{createTrace(1, tt.inputSpans)}, nil) + } + + var result []ptrace.Traces + AggregateTracesWithLimit(tracesSeq, tt.maxSize)(func(trace ptrace.Traces, _ error) bool { + result = append(result, trace) + return true + }) + + require.Len(t, result, 1) + assert.Equal(t, tt.expectedSpans, result[0].SpanCount()) + + // Check for truncation warning + if tt.expectTruncate { + firstSpan := result[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + warnings := GetWarnings(firstSpan) + assert.NotEmpty(t, warnings, "expected truncation warning") + assert.Contains(t, warnings[len(warnings)-1], fmt.Sprintf("trace has more than %d spans", tt.maxSize)) + } + }) + } +} + +func TestCopySpansUpToLimit(t *testing.T) { + src := ptrace.NewTraces() + spans := src.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans() + for i := 0; i < 5; i++ { + spans.AppendEmpty().SetName("span") + } + + dest := ptrace.NewTraces() + copySpansUpToLimit(dest, src, 3) + + assert.Equal(t, 3, dest.SpanCount()) +} + +func TestMarkAndCheckTruncated(t *testing.T) { + trace := ptrace.NewTraces() + firstSpan := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + assert.Empty(t, GetWarnings(firstSpan)) + markTraceTruncated(trace, 10) + // Now should have truncation warning + warnings := GetWarnings(firstSpan) + assert.NotEmpty(t, warnings) + assert.Contains(t, warnings[0], "trace has more than 10 spans") +}