Skip to content

Commit

Permalink
Upgrade storage integration test: use TraceWriter (#6437)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #6366

## Description of the changes
- Incrementally swaps the fields of `StorageIntegration` to align with
v2 storage api while supporting v1 api
  - [x] replaced `SpanWriter` with `TraceWriter`
- Updates test functions accordingly to work with the updated fields

## How was this change tested?
- CI

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Emmanuel Emonueje Ebenezer <[email protected]>
  • Loading branch information
ekefan authored Jan 2, 2025
1 parent 244b759 commit 83b64d6
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 94 deletions.
5 changes: 3 additions & 2 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
}
cmd.Start(t)

s.SpanWriter, err = createSpanWriter(logger, otlpPort)
s.TraceWriter, err = createTraceWriter(logger, otlpPort)
require.NoError(t, err)

s.TraceReader, err = createTraceReader(logger, ports.QueryGRPC)
require.NoError(t, err)

t.Cleanup(func() {
require.NoError(t, s.TraceReader.(io.Closer).Close())
require.NoError(t, s.SpanWriter.(io.Closer).Close())
require.NoError(t, s.TraceWriter.(io.Closer).Close())
})
}

Expand Down
79 changes: 0 additions & 79 deletions cmd/jaeger/internal/integration/span_writer.go

This file was deleted.

121 changes: 121 additions & 0 deletions cmd/jaeger/internal/integration/trace_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"context"
"fmt"
"io"
"time"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

var (
_ tracestore.Writer = (*traceWriter)(nil)
_ io.Closer = (*traceWriter)(nil)

MaxChunkSize = 35 // max chunk size otel kafka export can handle safely.
)

// traceWriter utilizes the OTLP exporter to send span data to the Jaeger-v2 receiver
type traceWriter struct {
logger *zap.Logger
exporter exporter.Traces
}

func createTraceWriter(logger *zap.Logger, port int) (*traceWriter, error) {
logger.Info("Creating the trace writer", zap.Int("port", port))

factory := otlpexporter.NewFactory()
cfg := factory.CreateDefaultConfig().(*otlpexporter.Config)
cfg.Endpoint = fmt.Sprintf("localhost:%d", port)
cfg.Timeout = 30 * time.Second
cfg.RetryConfig.Enabled = false
cfg.QueueConfig.Enabled = false
cfg.TLSSetting = configtls.ClientConfig{
Insecure: true,
}

set := exportertest.NewNopSettings()
set.Logger = logger

exp, err := factory.CreateTraces(context.Background(), set, cfg)
if err != nil {
return nil, err
}
if err := exp.Start(context.Background(), componenttest.NewNopHost()); err != nil {
return nil, err
}

return &traceWriter{
logger: logger,
exporter: exp,
}, nil
}

func (w *traceWriter) Close() error {
w.logger.Info("Closing the trace writer")
return w.exporter.Shutdown(context.Background())
}

func (w *traceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error {
var err error
currentChunk := ptrace.NewTraces()
currentResourceIndex := -1
currentScopeIndex := -1
spanCount := 0

jptrace.SpanIter(td)(func(pos jptrace.SpanIterPos, span ptrace.Span) bool {
var (
scope ptrace.ScopeSpans
resource ptrace.ResourceSpans
)

if spanCount == MaxChunkSize {
err = w.exporter.ConsumeTraces(ctx, currentChunk)
currentChunk = ptrace.NewTraces()
spanCount = 0
currentResourceIndex = -1
currentScopeIndex = -1
}

if currentResourceIndex != pos.ResourceIndex {
resource = currentChunk.ResourceSpans().AppendEmpty()
td.ResourceSpans().At(pos.ResourceIndex).Resource().Attributes().CopyTo(resource.Resource().Attributes())
currentResourceIndex = pos.ResourceIndex
currentScopeIndex = -1
} else {
resource = currentChunk.ResourceSpans().At(currentChunk.ResourceSpans().Len() - 1)
}

if currentScopeIndex != pos.ScopeIndex {
scope = resource.ScopeSpans().AppendEmpty()
td.ResourceSpans().At(pos.ResourceIndex).ScopeSpans().At(pos.ScopeIndex).Scope().CopyTo(scope.Scope())
currentScopeIndex = pos.ScopeIndex
} else {
scope = resource.ScopeSpans().At(resource.ScopeSpans().Len() - 1)
}

span.CopyTo(scope.Spans().AppendEmpty())
spanCount++

return true
})

// write the last chunk if it has any spans
if spanCount > 0 {
err = w.exporter.ConsumeTraces(ctx, currentChunk)
}
return err
}
124 changes: 124 additions & 0 deletions cmd/jaeger/internal/integration/trace_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap/zaptest"
)

type MockExporter struct {
mock.Mock
chunks []ptrace.Traces
}

func (m *MockExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
m.chunks = append(m.chunks, td)
args := m.Called(ctx, td)
return args.Error(0)
}

func (m *MockExporter) Capabilities() consumer.Capabilities {
args := m.Called()
return args.Get(0).(consumer.Capabilities)
}

func (*MockExporter) Shutdown(context.Context) error {
return nil
}

func (*MockExporter) Start(context.Context, component.Host) error {
return nil
}

func TestWriteTraces(t *testing.T) {
td := ptrace.NewTraces()
resources := td.ResourceSpans()

// Create resources and scopes
for i := 1; i <= 3; i++ {
resource := resources.AppendEmpty()
resource.Resource().Attributes().PutStr("service.name", fmt.Sprintf("NoServiceName%d", i))
scope := resource.ScopeSpans().AppendEmpty()
scope.Scope().SetName(fmt.Sprintf("success-op-%d", i))
}

// Add span1 and span2
scope1 := resources.At(0).ScopeSpans().At(0)
for i := 1; i <= 2; i++ {
span := scope1.Spans().AppendEmpty()
span.SetSpanID(pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, byte(i)}))
span.SetName(fmt.Sprintf("span%d", i))
}

// span3
scope2 := resources.At(1).ScopeSpans().At(0)
span3 := scope2.Spans().AppendEmpty()
span3.SetSpanID(pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 3}))
span3.SetName("span3")

// span4 and span5
scope3 := resources.At(2).ScopeSpans().At(0)
for i := 1; i <= 2; i++ {
span := scope3.Spans().AppendEmpty()
span.SetSpanID(pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, byte(i + 3)}))
span.SetName(fmt.Sprintf("span%d", i+3))
}

mockExporter := &MockExporter{}
mockExporter.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil).Times(3)
tw := &traceWriter{
logger: zaptest.NewLogger(t),
exporter: mockExporter,
}
origMaxChunkSize := MaxChunkSize
MaxChunkSize = 2
err := tw.WriteTraces(context.Background(), td)
MaxChunkSize = origMaxChunkSize

require.NoError(t, err)
mockExporter.AssertNumberOfCalls(t, "ConsumeTraces", 3)
mockExporter.AssertExpectations(t)

chunks := mockExporter.chunks
require.Len(t, chunks, 3)

assert.Equal(t, 1, chunks[0].ResourceSpans().Len()) // First chunk has 2 spans from one resource
assert.Equal(t, 2, chunks[1].ResourceSpans().Len()) // Second chunk has 2 spans from two resources
assert.Equal(t, 1, chunks[2].ResourceSpans().Len()) // Third chunk has one span from one resource

// First chunk: NoServiceName1 span1 and 2
firstChunkResource := chunks[0].ResourceSpans().At(0)
assert.Equal(t, "NoServiceName1", firstChunkResource.Resource().Attributes().AsRaw()["service.name"])
firstChunkScope := firstChunkResource.ScopeSpans().At(0)
assert.Equal(t, "span1", firstChunkScope.Spans().At(0).Name())
assert.Equal(t, "span2", firstChunkScope.Spans().At(1).Name())

// Second chunk: NoServiceName2 span3 and 4
secondChunkResource := chunks[1].ResourceSpans().At(0)
assert.Equal(t, "NoServiceName2", secondChunkResource.Resource().Attributes().AsRaw()["service.name"])
secondChunkScope := secondChunkResource.ScopeSpans().At(0)
assert.Equal(t, "span3", secondChunkScope.Spans().At(0).Name())

secondChunkResource2 := chunks[1].ResourceSpans().At(1)
assert.Equal(t, "NoServiceName3", secondChunkResource2.Resource().Attributes().AsRaw()["service.name"])
secondChunkScope2 := secondChunkResource2.ScopeSpans().At(0)
assert.Equal(t, "span4", secondChunkScope2.Spans().At(0).Name())

// Third chunk: NoServiceName3 span5
thirdChunkResource := chunks[2].ResourceSpans().At(0)
assert.Equal(t, "NoServiceName3", thirdChunkResource.Resource().Attributes().AsRaw()["service.name"])
thirdChunkScope := thirdChunkResource.ScopeSpans().At(0)
assert.Equal(t, "span5", thirdChunkScope.Spans().At(0).Name())
}
3 changes: 2 additions & 1 deletion plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
s.factory.Close()
})

s.SpanWriter, err = s.factory.CreateSpanWriter()
spanWriter, err := s.factory.CreateSpanWriter()
require.NoError(t, err)
s.TraceWriter = v1adapter.NewTraceWriter(spanWriter)

spanReader, err := s.factory.CreateSpanReader()
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) {
})
s.factory = f
var err error
s.SpanWriter, err = f.CreateSpanWriter()
spanWriter, err := f.CreateSpanWriter()
require.NoError(t, err)
s.TraceWriter = v1adapter.NewTraceWriter(spanWriter)
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
f := s.initializeESFactory(t, allTagsAsFields)
s.factory = f
var err error
s.SpanWriter, err = f.CreateSpanWriter()
spanWriter, err := f.CreateSpanWriter()
require.NoError(t, err)
s.TraceWriter = v1adapter.NewTraceWriter(spanWriter)
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
require.NoError(t, f.Initialize(metrics.NullFactory, logger))
s.factory = f

s.SpanWriter, err = f.CreateSpanWriter()
spanWriter, err := f.CreateSpanWriter()
require.NoError(t, err)
s.TraceWriter = v1adapter.NewTraceWriter(spanWriter)
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
Expand Down
Loading

0 comments on commit 83b64d6

Please sign in to comment.