From 83b64d6617e7a844e298ecf09e2359f0d61fdc06 Mon Sep 17 00:00:00 2001 From: "Emmanuel E. Ebenezer" Date: Thu, 2 Jan 2025 01:15:37 +0100 Subject: [PATCH] Upgrade storage integration test: use `TraceWriter` (#6437) ## Which problem is this PR solving? - Part of #6366 ## Description of the changes - Incrementally swaps the fields of `StorageIntegration` to align with v2 storage api while supporting v1 api - [x] replaced `SpanWriter` with `TraceWriter` - Updates test functions accordingly to work with the updated fields ## How was this change tested? - CI ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Emmanuel Emonueje Ebenezer --- .../internal/integration/e2e_integration.go | 5 +- .../internal/integration/span_writer.go | 79 ----------- .../internal/integration/trace_writer.go | 121 +++++++++++++++++ .../internal/integration/trace_writer_test.go | 124 ++++++++++++++++++ .../storage/integration/badgerstore_test.go | 3 +- plugin/storage/integration/cassandra_test.go | 3 +- .../storage/integration/elasticsearch_test.go | 3 +- plugin/storage/integration/grpc_test.go | 3 +- plugin/storage/integration/integration.go | 10 +- plugin/storage/integration/kafka_test.go | 2 +- plugin/storage/integration/memstore_test.go | 5 +- storage_v2/v1adapter/translator.go | 18 +++ storage_v2/v1adapter/translator_test.go | 30 +++++ 13 files changed, 312 insertions(+), 94 deletions(-) delete mode 100644 cmd/jaeger/internal/integration/span_writer.go create mode 100644 cmd/jaeger/internal/integration/trace_writer.go create mode 100644 cmd/jaeger/internal/integration/trace_writer_test.go diff --git a/cmd/jaeger/internal/integration/e2e_integration.go 
b/cmd/jaeger/internal/integration/e2e_integration.go index ac85121478e..5882d1f42e0 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -92,14 +92,15 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { } cmd.Start(t) - s.SpanWriter, err = createSpanWriter(logger, otlpPort) + s.TraceWriter, err = createTraceWriter(logger, otlpPort) require.NoError(t, err) + s.TraceReader, err = createTraceReader(logger, ports.QueryGRPC) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, s.TraceReader.(io.Closer).Close()) - require.NoError(t, s.SpanWriter.(io.Closer).Close()) + require.NoError(t, s.TraceWriter.(io.Closer).Close()) }) } diff --git a/cmd/jaeger/internal/integration/span_writer.go b/cmd/jaeger/internal/integration/span_writer.go deleted file mode 100644 index 073eaec65ee..00000000000 --- a/cmd/jaeger/internal/integration/span_writer.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. 
-// SPDX-License-Identifier: Apache-2.0 - -package integration - -import ( - "context" - "fmt" - "io" - "time" - - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/exporter/otlpexporter" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/storage/spanstore" - "github.com/jaegertracing/jaeger/storage_v2/v1adapter" -) - -var ( - _ spanstore.Writer = (*spanWriter)(nil) - _ io.Closer = (*spanWriter)(nil) -) - -// SpanWriter utilizes the OTLP exporter to send span data to the Jaeger-v2 receiver -type spanWriter struct { - logger *zap.Logger - exporter exporter.Traces -} - -func createSpanWriter(logger *zap.Logger, port int) (*spanWriter, error) { - logger.Info("Creating the span writer", zap.Int("port", port)) - - factory := otlpexporter.NewFactory() - cfg := factory.CreateDefaultConfig().(*otlpexporter.Config) - cfg.Endpoint = fmt.Sprintf("localhost:%d", port) - cfg.Timeout = 30 * time.Second - cfg.RetryConfig.Enabled = false - cfg.QueueConfig.Enabled = false - cfg.TLSSetting = configtls.ClientConfig{ - Insecure: true, - } - - set := exportertest.NewNopSettings() - set.Logger = logger - - exp, err := factory.CreateTraces(context.Background(), set, cfg) - if err != nil { - return nil, err - } - if err := exp.Start(context.Background(), componenttest.NewNopHost()); err != nil { - return nil, err - } - - return &spanWriter{ - logger: logger, - exporter: exp, - }, nil -} - -func (w *spanWriter) Close() error { - w.logger.Info("Closing the span writer") - return w.exporter.Shutdown(context.Background()) -} - -func (w *spanWriter) WriteSpan(ctx context.Context, span *model.Span) error { - td := v1adapter.V1BatchesToTraces([]*model.Batch{ - { - Spans: []*model.Span{span}, - Process: span.Process, - }, - }) - - return 
w.exporter.ConsumeTraces(ctx, td) -} diff --git a/cmd/jaeger/internal/integration/trace_writer.go b/cmd/jaeger/internal/integration/trace_writer.go new file mode 100644 index 00000000000..c5286aa44d0 --- /dev/null +++ b/cmd/jaeger/internal/integration/trace_writer.go @@ -0,0 +1,121 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "context" + "fmt" + "io" + "time" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/internal/jptrace" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" +) + +var ( + _ tracestore.Writer = (*traceWriter)(nil) + _ io.Closer = (*traceWriter)(nil) + + MaxChunkSize = 35 // max chunk size otel kafka export can handle safely. 
+) + +// traceWriter utilizes the OTLP exporter to send span data to the Jaeger-v2 receiver +type traceWriter struct { + logger *zap.Logger + exporter exporter.Traces +} + +func createTraceWriter(logger *zap.Logger, port int) (*traceWriter, error) { + logger.Info("Creating the trace writer", zap.Int("port", port)) + + factory := otlpexporter.NewFactory() + cfg := factory.CreateDefaultConfig().(*otlpexporter.Config) + cfg.Endpoint = fmt.Sprintf("localhost:%d", port) + cfg.Timeout = 30 * time.Second + cfg.RetryConfig.Enabled = false + cfg.QueueConfig.Enabled = false + cfg.TLSSetting = configtls.ClientConfig{ + Insecure: true, + } + + set := exportertest.NewNopSettings() + set.Logger = logger + + exp, err := factory.CreateTraces(context.Background(), set, cfg) + if err != nil { + return nil, err + } + if err := exp.Start(context.Background(), componenttest.NewNopHost()); err != nil { + return nil, err + } + + return &traceWriter{ + logger: logger, + exporter: exp, + }, nil +} + +func (w *traceWriter) Close() error { + w.logger.Info("Closing the trace writer") + return w.exporter.Shutdown(context.Background()) +} + +func (w *traceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error { + var err error + currentChunk := ptrace.NewTraces() + currentResourceIndex := -1 + currentScopeIndex := -1 + spanCount := 0 + + jptrace.SpanIter(td)(func(pos jptrace.SpanIterPos, span ptrace.Span) bool { + var ( + scope ptrace.ScopeSpans + resource ptrace.ResourceSpans + ) + + if spanCount == MaxChunkSize { + err = w.exporter.ConsumeTraces(ctx, currentChunk) + currentChunk = ptrace.NewTraces() + spanCount = 0 + currentResourceIndex = -1 + currentScopeIndex = -1 + } + + if currentResourceIndex != pos.ResourceIndex { + resource = currentChunk.ResourceSpans().AppendEmpty() + td.ResourceSpans().At(pos.ResourceIndex).Resource().Attributes().CopyTo(resource.Resource().Attributes()) + currentResourceIndex = pos.ResourceIndex + currentScopeIndex = -1 + } else { + resource = 
currentChunk.ResourceSpans().At(currentChunk.ResourceSpans().Len() - 1) + } + + if currentScopeIndex != pos.ScopeIndex { + scope = resource.ScopeSpans().AppendEmpty() + td.ResourceSpans().At(pos.ResourceIndex).ScopeSpans().At(pos.ScopeIndex).Scope().CopyTo(scope.Scope()) + currentScopeIndex = pos.ScopeIndex + } else { + scope = resource.ScopeSpans().At(resource.ScopeSpans().Len() - 1) + } + + span.CopyTo(scope.Spans().AppendEmpty()) + spanCount++ + + return true + }) + + // write the last chunk if it has any spans + if spanCount > 0 { + err = w.exporter.ConsumeTraces(ctx, currentChunk) + } + return err +} diff --git a/cmd/jaeger/internal/integration/trace_writer_test.go b/cmd/jaeger/internal/integration/trace_writer_test.go new file mode 100644 index 00000000000..bfcaa38db86 --- /dev/null +++ b/cmd/jaeger/internal/integration/trace_writer_test.go @@ -0,0 +1,124 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap/zaptest" +) + +type MockExporter struct { + mock.Mock + chunks []ptrace.Traces +} + +func (m *MockExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + m.chunks = append(m.chunks, td) + args := m.Called(ctx, td) + return args.Error(0) +} + +func (m *MockExporter) Capabilities() consumer.Capabilities { + args := m.Called() + return args.Get(0).(consumer.Capabilities) +} + +func (*MockExporter) Shutdown(context.Context) error { + return nil +} + +func (*MockExporter) Start(context.Context, component.Host) error { + return nil +} + +func TestWriteTraces(t *testing.T) { + td := ptrace.NewTraces() + resources := td.ResourceSpans() 
+ + // Create resources and scopes + for i := 1; i <= 3; i++ { + resource := resources.AppendEmpty() + resource.Resource().Attributes().PutStr("service.name", fmt.Sprintf("NoServiceName%d", i)) + scope := resource.ScopeSpans().AppendEmpty() + scope.Scope().SetName(fmt.Sprintf("success-op-%d", i)) + } + + // Add span1 and span2 + scope1 := resources.At(0).ScopeSpans().At(0) + for i := 1; i <= 2; i++ { + span := scope1.Spans().AppendEmpty() + span.SetSpanID(pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, byte(i)})) + span.SetName(fmt.Sprintf("span%d", i)) + } + + // span3 + scope2 := resources.At(1).ScopeSpans().At(0) + span3 := scope2.Spans().AppendEmpty() + span3.SetSpanID(pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 3})) + span3.SetName("span3") + + // span4 and span5 + scope3 := resources.At(2).ScopeSpans().At(0) + for i := 1; i <= 2; i++ { + span := scope3.Spans().AppendEmpty() + span.SetSpanID(pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, byte(i + 3)})) + span.SetName(fmt.Sprintf("span%d", i+3)) + } + + mockExporter := &MockExporter{} + mockExporter.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil).Times(3) + tw := &traceWriter{ + logger: zaptest.NewLogger(t), + exporter: mockExporter, + } + origMaxChunkSize := MaxChunkSize + MaxChunkSize = 2 + err := tw.WriteTraces(context.Background(), td) + MaxChunkSize = origMaxChunkSize + + require.NoError(t, err) + mockExporter.AssertNumberOfCalls(t, "ConsumeTraces", 3) + mockExporter.AssertExpectations(t) + + chunks := mockExporter.chunks + require.Len(t, chunks, 3) + + assert.Equal(t, 1, chunks[0].ResourceSpans().Len()) // First chunk has 2 spans from one resource + assert.Equal(t, 2, chunks[1].ResourceSpans().Len()) // Second chunk has 2 spans from two resources + assert.Equal(t, 1, chunks[2].ResourceSpans().Len()) // Third chunk has one span from one resource + + // First chunk: NoServiceName1 span1 and 2 + firstChunkResource := chunks[0].ResourceSpans().At(0) + assert.Equal(t, "NoServiceName1", 
firstChunkResource.Resource().Attributes().AsRaw()["service.name"]) + firstChunkScope := firstChunkResource.ScopeSpans().At(0) + assert.Equal(t, "span1", firstChunkScope.Spans().At(0).Name()) + assert.Equal(t, "span2", firstChunkScope.Spans().At(1).Name()) + + // Second chunk: NoServiceName2 span3 and 4 + secondChunkResource := chunks[1].ResourceSpans().At(0) + assert.Equal(t, "NoServiceName2", secondChunkResource.Resource().Attributes().AsRaw()["service.name"]) + secondChunkScope := secondChunkResource.ScopeSpans().At(0) + assert.Equal(t, "span3", secondChunkScope.Spans().At(0).Name()) + + secondChunkResource2 := chunks[1].ResourceSpans().At(1) + assert.Equal(t, "NoServiceName3", secondChunkResource2.Resource().Attributes().AsRaw()["service.name"]) + secondChunkScope2 := secondChunkResource2.ScopeSpans().At(0) + assert.Equal(t, "span4", secondChunkScope2.Spans().At(0).Name()) + + // Third chunk: NoServiceName3 span5 + thirdChunkResource := chunks[2].ResourceSpans().At(0) + assert.Equal(t, "NoServiceName3", thirdChunkResource.Resource().Attributes().AsRaw()["service.name"]) + thirdChunkScope := thirdChunkResource.ScopeSpans().At(0) + assert.Equal(t, "span5", thirdChunkScope.Spans().At(0).Name()) +} diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 23778fcb87a..3a9f7c2f436 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -33,8 +33,9 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) { s.factory.Close() }) - s.SpanWriter, err = s.factory.CreateSpanWriter() + spanWriter, err := s.factory.CreateSpanWriter() require.NoError(t, err) + s.TraceWriter = v1adapter.NewTraceWriter(spanWriter) spanReader, err := s.factory.CreateSpanReader() require.NoError(t, err) diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index 559529498d9..a06ecdff681 100644 --- 
a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -73,8 +73,9 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { }) s.factory = f var err error - s.SpanWriter, err = f.CreateSpanWriter() + spanWriter, err := f.CreateSpanWriter() require.NoError(t, err) + s.TraceWriter = v1adapter.NewTraceWriter(spanWriter) spanReader, err := f.CreateSpanReader() require.NoError(t, err) s.TraceReader = v1adapter.NewTraceReader(spanReader) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 5bc8ea02b16..007380733d7 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -133,8 +133,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) f := s.initializeESFactory(t, allTagsAsFields) s.factory = f var err error - s.SpanWriter, err = f.CreateSpanWriter() + spanWriter, err := f.CreateSpanWriter() require.NoError(t, err) + s.TraceWriter = v1adapter.NewTraceWriter(spanWriter) spanReader, err := f.CreateSpanReader() require.NoError(t, err) s.TraceReader = v1adapter.NewTraceReader(spanReader) diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 17e58887a1b..9d8e85bc29e 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -37,8 +37,9 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) { require.NoError(t, f.Initialize(metrics.NullFactory, logger)) s.factory = f - s.SpanWriter, err = f.CreateSpanWriter() + spanWriter, err := f.CreateSpanWriter() require.NoError(t, err) + s.TraceWriter = v1adapter.NewTraceWriter(spanWriter) spanReader, err := f.CreateSpanReader() require.NoError(t, err) s.TraceReader = v1adapter.NewTraceReader(spanReader) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 
982eba3e9fc..3e3a377c630 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -43,7 +43,7 @@ var fixtures embed.FS // Some implementations may declare multiple tests, with different settings, // and RunAll() under different conditions. type StorageIntegration struct { - SpanWriter spanstore.Writer + TraceWriter tracestore.Writer TraceReader tracestore.Reader ArchiveSpanReader spanstore.Reader ArchiveSpanWriter spanstore.Writer @@ -383,10 +383,10 @@ func (s *StorageIntegration) writeTrace(t *testing.T, trace *model.Trace) { t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), len(trace.Spans)) ctx, cx := context.WithTimeout(context.Background(), 5*time.Minute) defer cx() - for _, span := range trace.Spans { - err := s.SpanWriter.WriteSpan(ctx, span) - require.NoError(t, err, "Not expecting error when writing trace to storage") - } + otelTraces := v1adapter.V1TraceToOtelTrace(trace) + err := s.TraceWriter.WriteTraces(ctx, otelTraces) + require.NoError(t, err, "Not expecting error when writing trace to storage") + t.Logf("%-23s Finished writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), len(trace.Spans)) } diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index 2b42773ca4a..dc434104657 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -91,9 +91,9 @@ func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) { }) spanConsumer.Start() - s.SpanWriter = spanWriter spanReader := &ingester{traceStore} s.TraceReader = v1adapter.NewTraceReader(spanReader) + s.TraceWriter = v1adapter.NewTraceWriter(spanWriter) s.CleanUp = func(_ *testing.T) {} s.SkipArchiveTest = true } diff --git a/plugin/storage/integration/memstore_test.go b/plugin/storage/integration/memstore_test.go index f910a2f6ed0..daa38551cb0 100644 --- a/plugin/storage/integration/memstore_test.go +++ 
b/plugin/storage/integration/memstore_test.go @@ -25,9 +25,8 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) { store := memory.NewStore() archiveStore := memory.NewStore() s.SamplingStore = memory.NewSamplingStore(2) - spanReader := store - s.TraceReader = v1adapter.NewTraceReader(spanReader) - s.SpanWriter = store + s.TraceReader = v1adapter.NewTraceReader(store) + s.TraceWriter = v1adapter.NewTraceWriter(store) s.ArchiveSpanReader = archiveStore s.ArchiveSpanWriter = archiveStore diff --git a/storage_v2/v1adapter/translator.go b/storage_v2/v1adapter/translator.go index 835bff80437..fe788cd87a7 100644 --- a/storage_v2/v1adapter/translator.go +++ b/storage_v2/v1adapter/translator.go @@ -61,6 +61,24 @@ func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace return jaegerTraces, nil } +// V1TraceToOtelTrace converts v1 traces (*model.Trace) to Otel traces (ptrace.Traces) +func V1TraceToOtelTrace(jTrace *model.Trace) ptrace.Traces { + batches := createBatchesFromModelTrace(jTrace) + return V1BatchesToTraces(batches) +} + +func createBatchesFromModelTrace(jTrace *model.Trace) []*model.Batch { + spans := jTrace.Spans + + if len(spans) == 0 { + return nil + } + batch := &model.Batch{ + Spans: jTrace.Spans, + } + return []*model.Batch{batch} +} + // modelTraceFromOtelTrace extracts spans from otel traces func modelTraceFromOtelTrace(otelTrace ptrace.Traces) *model.Trace { var spans []*model.Span diff --git a/storage_v2/v1adapter/translator_test.go b/storage_v2/v1adapter/translator_test.go index a3b5ca5ab17..8aa1fc6ca03 100644 --- a/storage_v2/v1adapter/translator_test.go +++ b/storage_v2/v1adapter/translator_test.go @@ -253,3 +253,33 @@ func TestV1TracesFromSeq2(t *testing.T) { }) } } + +func TestV1TraceToOtelTrace_ReturnsExptectedOtelTrace(t *testing.T) { + jTrace := &model.Trace{ + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(1), + Process: model.NewProcess("Service1", nil), + 
OperationName: "two-resources-1", + }, { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(2), + Process: model.NewProcess("service2", nil), + OperationName: "two-resources-2", + }, + }, + } + actualTrace := V1TraceToOtelTrace(jTrace) + + require.NotEmpty(t, actualTrace) + require.Equal(t, 2, actualTrace.ResourceSpans().Len()) +} + +func TestV1TraceToOtelTrace_ReturnEmptyOtelTrace(t *testing.T) { + jTrace := &model.Trace{} + eTrace := ptrace.NewTraces() + aTrace := V1TraceToOtelTrace(jTrace) + + require.Equal(t, eTrace.SpanCount(), aTrace.SpanCount()) +}