Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type QueryOptions struct {
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
// MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span.
MaxClockSkewAdjust time.Duration `mapstructure:"max_clock_skew_adjust" valid:"optional"`
// MaxTraceSize is the max no. of spans allowed per trace.
// If a trace has more spans than this, it will be truncated and a warning will be added
MaxTraceSize int `mapstructure:"max_trace_size" valid:"optional"`
// EnableTracing determines whether traces will be emitted by jaeger-query.
EnableTracing bool `mapstructure:"enable_tracing"`
// HTTP holds the HTTP configuration that the query service uses to serve requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type QueryServiceOptions struct {
ArchiveTraceWriter tracestore.Writer
// MaxClockSkewAdjust is the maximum duration by which to adjust a span.
MaxClockSkewAdjust time.Duration
// MaxTraceSize is the max no. of spans allowed per trace.
// If a trace has more spans than this, it will be truncated and a warning will be added
MaxTraceSize int
}

// StorageCapabilities is a feature flag for query service
Expand Down Expand Up @@ -200,7 +203,7 @@ func (qs QueryService) receiveTraces(
if rawTraces {
seq(processTraces)
} else {
jptrace.AggregateTraces(seq)(func(trace ptrace.Traces, err error) bool {
jptrace.AggregateTracesWithLimit(seq, qs.options.MaxTraceSize)(func(trace ptrace.Traces, err error) bool {
return processTraces([]ptrace.Traces{trace}, err)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,3 +623,127 @@ func TestGetCapabilities(t *testing.T) {
})
}
}

// Consolidate Underlimit, Overlimit and Exactly at limit tests
func TestMaxTraceSize(t *testing.T) {
tests := []struct {
name string
maxTraceSize int
createTraces func() []ptrace.Traces
expectedSpans int
expectWarning bool
warningPattern string
}{
{
name: "under_limit",
maxTraceSize: 5,
createTraces: func() []ptrace.Traces {
trace := ptrace.NewTraces()
scopes := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
for i := 0; i < 3; i++ {
span := scopes.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}
return []ptrace.Traces{trace}
},
expectedSpans: 3,
expectWarning: false,
},
{
name: "over_limit",
maxTraceSize: 3,
createTraces: func() []ptrace.Traces {
trace1 := ptrace.NewTraces()
scopes1 := trace1.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
for i := 0; i < 3; i++ {
span := scopes1.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}

trace2 := ptrace.NewTraces()
scopes2 := trace2.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
for i := 3; i < 5; i++ {
span := scopes2.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}
return []ptrace.Traces{trace1, trace2}
},
expectedSpans: 3,
expectWarning: true,
warningPattern: "trace has more than 3 spans",
},
{
name: "exactly_at_limit",
maxTraceSize: 3,
createTraces: func() []ptrace.Traces {
trace := ptrace.NewTraces()
scopes := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
for i := 0; i < 3; i++ {
span := scopes.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}
return []ptrace.Traces{trace}
},
expectedSpans: 3,
expectWarning: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
traces := tt.createTraces()
responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
for _, trace := range traces {
if !yield([]ptrace.Traces{trace}, nil) {
return
}
}
})

traceReader := &tracestoremocks.Reader{}
dependencyStorage := &depstoremocks.Reader{}
options := QueryServiceOptions{
MaxTraceSize: tt.maxTraceSize,
}
tqs := &testQueryService{}
tqs.queryService = NewQueryService(traceReader, dependencyStorage, options)

params := GetTraceParams{
TraceIDs: []tracestore.GetTraceParams{{TraceID: testTraceID}},
}
traceReader.On("GetTraces", mock.Anything, params.TraceIDs).
Return(responseIter).Once()

getTracesIter := tqs.queryService.GetTraces(context.Background(), params)
gotTraces, err := jiter.FlattenWithErrors(getTracesIter)
require.NoError(t, err)
require.Len(t, gotTraces, 1)

// count total spans
actualSpans := gotTraces[0].SpanCount()
require.Equal(t, tt.expectedSpans, actualSpans)

// check warning
firstSpan := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
warningsAttr, hasWarning := firstSpan.Attributes().Get("@jaeger@warnings")

if tt.expectWarning {
require.True(t, hasWarning, "expected warning but none found")
require.Equal(t, pcommon.ValueTypeSlice, warningsAttr.Type())
warnings := warningsAttr.Slice()
require.Positive(t, warnings.Len())
require.Contains(t, warnings.At(warnings.Len()-1).Str(), tt.warningPattern)
} else {
require.False(t, hasWarning, "unexpected warning found")
}
})
}
}
1 change: 1 addition & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
}
v2opts := v2querysvc.QueryServiceOptions{
MaxClockSkewAdjust: s.config.MaxClockSkewAdjust,
MaxTraceSize: s.config.MaxTraceSize,
}
if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil {
return err
Expand Down
82 changes: 76 additions & 6 deletions internal/jptrace/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package jptrace

import (
"fmt"
"iter"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -14,9 +15,14 @@ import (
//
// The `tracesSeq` input must adhere to the chunking requirements of tracestore.Reader.GetTraces.
func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptrace.Traces, error] {
return AggregateTracesWithLimit(tracesSeq, 0)
}

func AggregateTracesWithLimit(tracesSeq iter.Seq2[[]ptrace.Traces, error], maxSize int) iter.Seq2[ptrace.Traces, error] {
return func(yield func(trace ptrace.Traces, err error) bool) {
currentTrace := ptrace.NewTraces()
currentTraceID := pcommon.NewTraceIDEmpty()
spanCount := 0

tracesSeq(func(traces []ptrace.Traces, err error) bool {
if err != nil {
Expand All @@ -27,7 +33,7 @@ func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptra
resources := trace.ResourceSpans()
traceID := resources.At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
if currentTraceID == traceID {
mergeTraces(trace, currentTrace)
mergeTraces(currentTrace, trace, maxSize, &spanCount)
} else {
if currentTrace.ResourceSpans().Len() > 0 {
if !yield(currentTrace, nil) {
Expand All @@ -36,6 +42,12 @@ func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptra
}
currentTrace = trace
currentTraceID = traceID
spanCount = trace.SpanCount()
if maxSize > 0 && spanCount > maxSize {
currentTrace = ptrace.NewTraces()
spanCount = 0
mergeTraces(currentTrace, trace, maxSize, &spanCount)
}
}
}
return true
Expand All @@ -46,10 +58,68 @@ func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptra
}
}

func mergeTraces(src, dest ptrace.Traces) {
resources := src.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
resource := resources.At(i)
resource.CopyTo(dest.ResourceSpans().AppendEmpty())
func mergeTraces(dest, src ptrace.Traces, maxSize int, spanCount *int) bool {
// early exit if already at max
if maxSize > 0 && *spanCount >= maxSize {
markTraceTruncated(dest, maxSize)
return true
}

incomingCount := src.SpanCount()
// check if we can merge all spans without exceeding limit
if maxSize <= 0 || *spanCount+incomingCount <= maxSize {
resources := src.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
resource := resources.At(i)
resource.CopyTo(dest.ResourceSpans().AppendEmpty())
}
*spanCount += incomingCount
return false
}

// partial copy
remaining := maxSize - *spanCount
if remaining > 0 {
copySpansUpToLimit(dest, src, remaining)
*spanCount = maxSize
}
markTraceTruncated(dest, maxSize)
return true
}

func copySpansUpToLimit(dest, src ptrace.Traces, limit int) {
copied := 0

for _, srcResource := range src.ResourceSpans().All() {
if copied >= limit {
return
}
destResource := dest.ResourceSpans().AppendEmpty()
srcResource.Resource().CopyTo(destResource.Resource())
destResource.SetSchemaUrl(srcResource.SchemaUrl())

for _, srcScope := range srcResource.ScopeSpans().All() {
if copied >= limit {
return
}
destScope := destResource.ScopeSpans().AppendEmpty()
srcScope.Scope().CopyTo(destScope.Scope())
destScope.SetSchemaUrl(srcScope.SchemaUrl())

for _, span := range srcScope.Spans().All() {
if copied >= limit {
return
}
span.CopyTo(destScope.Spans().AppendEmpty())
copied++
}
}
}
}

func markTraceTruncated(trace ptrace.Traces, maxSize int) {
// direct access to first span (if truncated, it must exist)
firstSpan := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
AddWarnings(firstSpan,
fmt.Sprintf("trace has more than %d spans, showing first %d spans only", maxSize, maxSize))
}
75 changes: 75 additions & 0 deletions internal/jptrace/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package jptrace

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -134,3 +135,77 @@ func TestAggregateTraces_RespectsEarlyReturn(t *testing.T) {

require.Equal(t, trace1, lastResult)
}

func TestAggregateTracesWithLimit(t *testing.T) {
createTrace := func(traceID byte, spanCount int) ptrace.Traces {
trace := ptrace.NewTraces()
spans := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
for i := 0; i < spanCount; i++ {
span := spans.AppendEmpty()
span.SetTraceID(pcommon.TraceID([16]byte{traceID}))
}
return trace
}

tests := []struct {
name string
maxSize int
inputSpans int
expectedSpans int
expectTruncate bool
}{
{"no_limit", 0, 5, 5, false},
{"under_limit", 10, 5, 5, false},
{"over_limit", 3, 5, 3, true},
{"exact_limit", 5, 5, 5, false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tracesSeq := func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{createTrace(1, tt.inputSpans)}, nil)
}

var result []ptrace.Traces
AggregateTracesWithLimit(tracesSeq, tt.maxSize)(func(trace ptrace.Traces, _ error) bool {
result = append(result, trace)
return true
})

require.Len(t, result, 1)
assert.Equal(t, tt.expectedSpans, result[0].SpanCount())

// Check for truncation warning
if tt.expectTruncate {
firstSpan := result[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
warnings := GetWarnings(firstSpan)
assert.NotEmpty(t, warnings, "expected truncation warning")
assert.Contains(t, warnings[len(warnings)-1], fmt.Sprintf("trace has more than %d spans", tt.maxSize))
}
})
}
}

func TestCopySpansUpToLimit(t *testing.T) {
src := ptrace.NewTraces()
spans := src.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
for i := 0; i < 5; i++ {
spans.AppendEmpty().SetName("span")
}

dest := ptrace.NewTraces()
copySpansUpToLimit(dest, src, 3)

assert.Equal(t, 3, dest.SpanCount())
}

func TestMarkAndCheckTruncated(t *testing.T) {
trace := ptrace.NewTraces()
firstSpan := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
assert.Empty(t, GetWarnings(firstSpan))
markTraceTruncated(trace, 10)
// Now should have truncation warning
warnings := GetWarnings(firstSpan)
assert.NotEmpty(t, warnings)
assert.Contains(t, warnings[0], "trace has more than 10 spans")
}
Loading