Skip to content
69 changes: 47 additions & 22 deletions internal/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/internal/jiter"
"github.com/jaegertracing/jaeger/internal/storage/v1/api/samplingstore"
samplemodel "github.com/jaegertracing/jaeger/internal/storage/v1/api/samplingstore/model"
"github.com/jaegertracing/jaeger/internal/storage/v2/api/depstore"
Expand Down Expand Up @@ -198,6 +200,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
t.Log(err)
continue
}

for _, trace := range traces {
for _, span := range trace.Spans {
t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID, span.OperationName)
Expand Down Expand Up @@ -229,10 +232,10 @@ func (s *StorageIntegration) helperTestGetTrace(
expected := s.writeLargeTraceWithDuplicateSpanIds(t, traceSize, duplicateCount)
expectedTraceID := v1adapter.FromV1TraceID(expected.Spans[0].TraceID)

actual := &model.Trace{} // no spans
var actual ptrace.Traces
found := s.waitForCondition(t, func(_ *testing.T) bool {
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
traces, err := jiter.FlattenWithErrors(iterTraces)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the point of this change? You still transform the result in to v1 model via GetFirstTrace().

if err != nil {
t.Logf("Error loading trace: %v", err)
return false
Expand All @@ -241,17 +244,24 @@ func (s *StorageIntegration) helperTestGetTrace(
return false
}
actual = traces[0]
return len(actual.Spans) >= len(expected.Spans)

if actual.SpanCount() == 0 {
return false
}
return actual.SpanCount() >= len(expected.Spans)
})

t.Logf("%-23s Loaded trace, expected=%d, actual=%d", time.Now().Format("2006-01-02 15:04:05.999"), len(expected.Spans), len(actual.Spans))
if !assert.True(t, found, "error loading trace, expected=%d, actual=%d", len(expected.Spans), len(actual.Spans)) {
CompareTraces(t, expected, actual)
trace := GetFirstTrace(actual)

t.Logf("%-23s Loaded trace, expected=%d, actual=%d",
time.Now().Format("2006-01-02 15:04:05.999"), len(expected.Spans), len(trace.Spans))
if !assert.True(t, found, "error loading trace, expected=%d, actual=%d", len(expected.Spans), len(trace.Spans)) {
CompareTraces(t, expected, trace)
return
}

if validator != nil {
validator(t, actual)
validator(t, trace)
}
}

Expand Down Expand Up @@ -324,30 +334,36 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {
expected := s.loadParseAndWriteExampleTrace(t)
expectedTraceID := v1adapter.FromV1TraceID(expected.Spans[0].TraceID)

actual := &model.Trace{} // no spans
found := s.waitForCondition(t, func(t *testing.T) bool {
var actual ptrace.Traces
found := s.waitForCondition(t, func(_ *testing.T) bool {
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
traces, err := jiter.FlattenWithErrors(iterTraces)
if err != nil {
t.Log(err)
t.Logf("Error loading trace: %v", err)
return false
}
if len(traces) == 0 {
return false
}
actual = traces[0]
return len(actual.Spans) == len(expected.Spans)
return actual.SpanCount() == len(expected.Spans)
})

trace := GetFirstTrace(actual)

if !assert.True(t, found) {
CompareTraces(t, expected, actual)
CompareTraces(t, expected, trace)
}

t.Run("NotFound error", func(t *testing.T) {
fakeTraceID := v1adapter.FromV1TraceID(model.TraceID{High: 0, Low: 1})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
require.NoError(t, err) // v2 TraceReader no longer returns an error for not found
assert.Empty(t, traces)

traces, err := jiter.FlattenWithErrors(iterTraces)
require.NoError(t, err)

v1Traces := OTLPTracesToV1Slice(traces)
assert.Empty(t, v1Traces)
})
}

Expand Down Expand Up @@ -386,27 +402,36 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) {
}

func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*model.Trace) []*model.Trace {
var traces []*model.Trace
var traces []ptrace.Traces
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
iterTraces := s.TraceReader.FindTraces(context.Background(), *query)
traces, err = v1adapter.V1TracesFromSeq2(iterTraces)
traces = nil
var err error
traces, err = jiter.FlattenWithErrors(iterTraces)
if err != nil {
t.Log(err)
return false
}

if len(expected) != len(traces) {
t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces))
return false
}
if spanCount(expected) != spanCount(traces) {
t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), spanCount(traces))

actualSpanCount := 0
for _, trace := range traces {
actualSpanCount += trace.SpanCount()
}

if spanCount(expected) != actualSpanCount {
t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), actualSpanCount)
return false
}
return true
})
require.True(t, found)
return traces

return OTLPTracesToV1Slice(traces)
}

func (s *StorageIntegration) writeTrace(t *testing.T, trace *model.Trace) {
Expand Down
61 changes: 61 additions & 0 deletions internal/storage/integration/trace_compare_otlp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/internal/storage/v2/v1adapter"
)

// GetFirstTrace extracts the first trace from OTLP traces
func GetFirstTrace(traces ptrace.Traces) *model.Trace {
if traces.SpanCount() == 0 {
return &model.Trace{}
}
batches := v1adapter.V1BatchesFromTraces(traces)
if len(batches) == 0 {
return &model.Trace{}
}

trace := &model.Trace{}
for _, batch := range batches {
for _, span := range batch.Spans {
if batch.Process != nil {
processCopy := model.Process{
ServiceName: batch.Process.ServiceName,
Tags: make([]model.KeyValue, len(batch.Process.Tags)),
}
copy(processCopy.Tags, batch.Process.Tags)
span.Process = &processCopy
}

// Normalize nil slices to empty slices
if span.Tags == nil {
span.Tags = []model.KeyValue{}
}
if span.Logs == nil {
span.Logs = []model.Log{}
}
if span.References == nil {
span.References = []model.SpanRef{}
}
}
trace.Spans = append(trace.Spans, batch.Spans...)
}
return trace
}

// OTLPTracesToV1Slice converts OTLP traces to v1 slice for comparison
func OTLPTracesToV1Slice(traces []ptrace.Traces) []*model.Trace {
var result []*model.Trace
for _, otlpTrace := range traces {
trace := GetFirstTrace(otlpTrace)
if len(trace.Spans) > 0 {
result = append(result, trace)
}
}
return result
}
Loading