Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type QueryOptions struct {
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
// MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span.
MaxClockSkewAdjust time.Duration `mapstructure:"max_clock_skew_adjust" valid:"optional"`
// MaxTraceSize is the max no. of spans allowed per trace.
// If a trace has more spans than this, it will be truncated and a warning will be added
MaxTraceSize int `mapstructure:"max_trace_size" valid:"optional"`
// EnableTracing determines whether traces will be emitted by jaeger-query.
EnableTracing bool `mapstructure:"enable_tracing"`
// HTTP holds the HTTP configuration that the query service uses to serve requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package querysvc
import (
"context"
"errors"
"fmt"
"iter"
"time"

Expand All @@ -29,6 +30,9 @@ type QueryServiceOptions struct {
ArchiveTraceWriter tracestore.Writer
// MaxClockSkewAdjust is the maximum duration by which to adjust a span.
MaxClockSkewAdjust time.Duration
// MaxTraceSize is the max no. of spans allowed per trace.
// If a trace has more spans than this, it will be truncated and a warning will be added
MaxTraceSize int
}

// StorageCapabilities is a feature flag for query service
Expand Down Expand Up @@ -200,10 +204,38 @@ func (qs QueryService) receiveTraces(
if rawTraces {
seq(processTraces)
} else {
jptrace.AggregateTraces(seq)(func(trace ptrace.Traces, err error) bool {
jptrace.AggregateTracesWithLimit(seq, qs.options.MaxTraceSize)(func(trace ptrace.Traces, err error) bool {
// Add warning if trace was truncated
if err == nil && qs.options.MaxTraceSize > 0 && jptrace.IsTraceTruncated(trace) {
qs.addTruncationWarning(trace)
}
return processTraces([]ptrace.Traces{trace}, err)
})
}

return foundTraceIDs, proceed
}

// add a warning to the first span of the trace
func (qs QueryService) addTruncationWarning(trace ptrace.Traces) {
resources := trace.ResourceSpans()
if resources.Len() == 0 {
return
}

scopes := resources.At(0).ScopeSpans()
if scopes.Len() == 0 {
return
}

spans := scopes.At(0).Spans()
if spans.Len() == 0 {
return
}

firstSpan := spans.At(0)
firstSpan.Attributes().Remove("@jaeger@truncated")
jptrace.AddWarnings(firstSpan,
fmt.Sprintf("trace has more than %d spans, showing first %d spans only",
qs.options.MaxTraceSize, qs.options.MaxTraceSize))
}
Original file line number Diff line number Diff line change
Expand Up @@ -623,3 +623,162 @@ func TestGetCapabilities(t *testing.T) {
})
}
}

func TestMaxTraceSize_UnderLimit(t *testing.T) {
// 3 spans
trace := ptrace.NewTraces()
resources := trace.ResourceSpans().AppendEmpty()
scopes := resources.ScopeSpans().AppendEmpty()
for i := 0; i < 3; i++ {
span := scopes.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}

responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{trace}, nil)
})

traceReader := &tracestoremocks.Reader{}
dependencyStorage := &depstoremocks.Reader{}
options := QueryServiceOptions{
MaxTraceSize: 5, // limit is 5, but trace has only 3 spans
}
tqs := &testQueryService{}
tqs.queryService = NewQueryService(traceReader, dependencyStorage, options)

params := GetTraceParams{
TraceIDs: []tracestore.GetTraceParams{{TraceID: testTraceID}},
}
traceReader.On("GetTraces", mock.Anything, params.TraceIDs).
Return(responseIter).Once()

getTracesIter := tqs.queryService.GetTraces(context.Background(), params)
gotTraces, err := jiter.FlattenWithErrors(getTracesIter)
require.NoError(t, err)
require.Len(t, gotTraces, 1)

gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans()
require.Equal(t, 3, gotSpans.Len())

// no warning should be present
firstSpan := gotSpans.At(0)
_, hasWarning := firstSpan.Attributes().Get("@jaeger@warnings")
require.False(t, hasWarning)
}

func TestMaxTraceSize_OverLimit(t *testing.T) {
// 5 spans split across 2 batches
trace1 := ptrace.NewTraces()
resources1 := trace1.ResourceSpans().AppendEmpty()
scopes1 := resources1.ScopeSpans().AppendEmpty()
for i := 0; i < 3; i++ {
span := scopes1.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}

trace2 := ptrace.NewTraces()
resources2 := trace2.ResourceSpans().AppendEmpty()
scopes2 := resources2.ScopeSpans().AppendEmpty()
for i := 3; i < 5; i++ {
span := scopes2.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}

responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
if !yield([]ptrace.Traces{trace1}, nil) {
return
}
yield([]ptrace.Traces{trace2}, nil)
})

traceReader := &tracestoremocks.Reader{}
dependencyStorage := &depstoremocks.Reader{}
options := QueryServiceOptions{
MaxTraceSize: 3, // Limit is 3, but trace has 5 spans total
}
tqs := &testQueryService{}
tqs.queryService = NewQueryService(traceReader, dependencyStorage, options)

params := GetTraceParams{
TraceIDs: []tracestore.GetTraceParams{{TraceID: testTraceID}},
}
traceReader.On("GetTraces", mock.Anything, params.TraceIDs).
Return(responseIter).Once()

getTracesIter := tqs.queryService.GetTraces(context.Background(), params)
gotTraces, err := jiter.FlattenWithErrors(getTracesIter)
require.NoError(t, err)
require.Len(t, gotTraces, 1)

// count total spans in the result
totalSpans := 0
resources := gotTraces[0].ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
for j := 0; j < scopes.Len(); j++ {
totalSpans += scopes.At(j).Spans().Len()
}
}

// Only 3 spans should be present(the limit)
require.Equal(t, 3, totalSpans)

// there should be a warning for the first span
firstSpan := resources.At(0).ScopeSpans().At(0).Spans().At(0)
warningsAttr, hasWarning := firstSpan.Attributes().Get("@jaeger@warnings")
require.True(t, hasWarning)
require.Equal(t, pcommon.ValueTypeSlice, warningsAttr.Type())
warnings := warningsAttr.Slice()
require.Positive(t, warnings.Len())
require.Contains(t, warnings.At(warnings.Len()-1).Str(), "trace has more than 3 spans")
}

func TestMaxTraceSize_ExactlyAtLimit(t *testing.T) {
// 3 spans
trace := ptrace.NewTraces()
resources := trace.ResourceSpans().AppendEmpty()
scopes := resources.ScopeSpans().AppendEmpty()
for i := 0; i < 3; i++ {
span := scopes.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}

responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{trace}, nil)
})

traceReader := &tracestoremocks.Reader{}
dependencyStorage := &depstoremocks.Reader{}
options := QueryServiceOptions{
MaxTraceSize: 3, // Limit is exactly 3, trace has 3 spans
}
tqs := &testQueryService{}
tqs.queryService = NewQueryService(traceReader, dependencyStorage, options)

params := GetTraceParams{
TraceIDs: []tracestore.GetTraceParams{{TraceID: testTraceID}},
}
traceReader.On("GetTraces", mock.Anything, params.TraceIDs).
Return(responseIter).Once()

getTracesIter := tqs.queryService.GetTraces(context.Background(), params)
gotTraces, err := jiter.FlattenWithErrors(getTracesIter)
require.NoError(t, err)
require.Len(t, gotTraces, 1)

gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans()
// all 3 spans should be present
require.Equal(t, 3, gotSpans.Len())

firstSpan := gotSpans.At(0)
_, hasWarning := firstSpan.Attributes().Get("@jaeger@warnings")
require.False(t, hasWarning)
}
1 change: 1 addition & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
}
v2opts := v2querysvc.QueryServiceOptions{
MaxClockSkewAdjust: s.config.MaxClockSkewAdjust,
MaxTraceSize: s.config.MaxTraceSize,
}
if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil {
return err
Expand Down
Loading
Loading