From a63cd19e425d4a5667b3920bb66207afddb7cdde Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 4 Jan 2025 21:17:32 -0500 Subject: [PATCH 1/6] Refactor collector pipeline to allow v1/v2 data model Signed-off-by: Yuri Shkuro --- cmd/collector/app/collector_test.go | 5 +- cmd/collector/app/handler/grpc_handler.go | 7 +- .../app/handler/grpc_handler_test.go | 12 +- .../app/handler/thrift_span_handler.go | 6 +- .../app/handler/thrift_span_handler_test.go | 7 +- cmd/collector/app/model_consumer.go | 3 +- cmd/collector/app/options.go | 2 +- cmd/collector/app/options_test.go | 4 +- cmd/collector/app/processor/interface.go | 7 +- cmd/collector/app/server/test.go | 3 +- cmd/collector/app/span_processor.go | 14 +-- cmd/collector/app/span_processor_test.go | 106 +++++++++++------- 12 files changed, 102 insertions(+), 74 deletions(-) diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 4211f88cbbe..1c2722494a7 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -262,7 +262,10 @@ func TestAggregator(t *testing.T) { }, }, } - _, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + _, err := c.spanProcessor.ProcessSpans(processor.Spans{ + SpansV1: spans, + SpanFormat: processor.JaegerSpanFormat, + }) require.NoError(t, err) require.NoError(t, c.Close()) diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index 853bd899f11..3cdf5150217 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -47,7 +47,7 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) type batchConsumer struct { logger *zap.Logger spanProcessor processor.SpanProcessor - spanOptions processor.SpansOptions + spanOptions processor.Spans // common settings for all spans tenancyMgr *tenancy.Manager } @@ -55,7 +55,7 @@ func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, return batchConsumer{ logger: logger, spanProcessor: spanProcessor, - spanOptions: processor.SpansOptions{ + spanOptions: processor.Spans{ InboundTransport: transport, SpanFormat: spanFormat, }, @@ -75,7 +75,8 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error { span.Process = batch.Process } } - _, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{ + _, err = c.spanProcessor.ProcessSpans(processor.Spans{ + SpansV1: batch.Spans, InboundTransport: c.spanOptions.InboundTransport, SpanFormat: c.spanOptions.SpanFormat, Tenant: tenant, diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 2501ced7461..64c43cf3546 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -33,17 +33,17 @@ type mockSpanProcessor struct { spanFormat processor.SpanFormat } -func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) { +func (p *mockSpanProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { p.mux.Lock() defer p.mux.Unlock() - p.spans = append(p.spans, spans...) - oks := make([]bool, len(spans)) + p.spans = append(p.spans, spans.SpansV1...) + oks := make([]bool, len(spans.SpansV1)) if p.tenants == nil { p.tenants = make(map[string]bool) } - p.tenants[opts.Tenant] = true - p.transport = opts.InboundTransport - p.spanFormat = opts.SpanFormat + p.tenants[spans.Tenant] = true + p.transport = spans.InboundTransport + p.spanFormat = spans.SpanFormat return oks, p.expectedError } diff --git a/cmd/collector/app/handler/thrift_span_handler.go b/cmd/collector/app/handler/thrift_span_handler.go index b6940b4bc3c..aea4370af2f 100644 --- a/cmd/collector/app/handler/thrift_span_handler.go +++ b/cmd/collector/app/handler/thrift_span_handler.go @@ -54,7 +54,8 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options mSpan := jConv.ToDomainSpan(span, batch.Process) mSpans = append(mSpans, mSpan) } - oks, err := jbh.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{ + oks, err := jbh.modelProcessor.ProcessSpans(processor.Spans{ + SpansV1: mSpans, InboundTransport: options.InboundTransport, SpanFormat: processor.JaegerSpanFormat, }) @@ -105,7 +106,8 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options convCount[i] = len(converted) mSpans = append(mSpans, converted...) } - bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{ + bools, err := h.modelProcessor.ProcessSpans(processor.Spans{ + SpansV1: mSpans, InboundTransport: options.InboundTransport, SpanFormat: processor.ZipkinSpanFormat, }) diff --git a/cmd/collector/app/handler/thrift_span_handler_test.go b/cmd/collector/app/handler/thrift_span_handler_test.go index 512944fc284..d926055cb57 100644 --- a/cmd/collector/app/handler/thrift_span_handler_test.go +++ b/cmd/collector/app/handler/thrift_span_handler_test.go @@ -16,7 +16,6 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" - "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -58,12 +57,12 @@ type shouldIErrorProcessor struct { var errTestError = errors.New("Whoops") -func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.SpansOptions) ([]bool, error) { +func (s *shouldIErrorProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { if s.shouldError { return nil, errTestError } - retMe := make([]bool, len(mSpans)) - for i := range mSpans { + retMe := make([]bool, len(spans.SpansV1)) + for i := range spans.SpansV1 { retMe[i] = true } return retMe, nil diff --git a/cmd/collector/app/model_consumer.go b/cmd/collector/app/model_consumer.go index a4a7636982e..0ad848f38b6 100644 --- a/cmd/collector/app/model_consumer.go +++ b/cmd/collector/app/model_consumer.go @@ -5,6 +5,7 @@ package app import ( + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" ) @@ -12,7 +13,7 @@ import ( type ProcessSpan func(span *model.Span, tenant string) // ProcessSpans processes a batch of Domain Model Spans -type ProcessSpans func(spans []*model.Span, tenant string) +type ProcessSpans func(spans processor.Spans) // FilterSpan decides whether to allow or disallow a span type FilterSpan func(span *model.Span) bool diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 03f4eb8c6f7..21b296a3e74 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -176,7 +176,7 @@ func (options) apply(opts ...Option) options { ret.hostMetrics = metrics.NullFactory } if ret.preProcessSpans == nil { - ret.preProcessSpans = func(_ []*model.Span, _ /* tenant */ string) {} + ret.preProcessSpans = func(_ processor.Spans) {} } if ret.sanitizer == nil { ret.sanitizer = func(span *model.Span) *model.Span { return span } diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index 94f388920ae..ed06f5a707d 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -27,7 +27,7 @@ func TestAllOptionSet(t *testing.T) { Options.ServiceMetrics(metrics.NullFactory), Options.Logger(zap.NewNop()), Options.NumWorkers(5), - Options.PreProcessSpans(func(_ []*model.Span, _ /* tenant */ string) {}), + Options.PreProcessSpans(func(_ processor.Spans) {}), Options.Sanitizer(func(span *model.Span) *model.Span { return span }), Options.QueueSize(10), Options.DynQueueSizeWarmup(1000), @@ -53,7 +53,7 @@ func TestNoOptionsSet(t *testing.T) { assert.Nil(t, opts.collectorTags) assert.False(t, opts.reportBusy) assert.False(t, opts.blockingSubmit) - assert.NotPanics(t, func() { opts.preProcessSpans(nil, "") }) + assert.NotPanics(t, func() { opts.preProcessSpans(processor.Spans{}) }) assert.NotPanics(t, func() { opts.preSave(nil, "") }) assert.True(t, opts.spanFilter(nil)) span := model.Span{} diff --git a/cmd/collector/app/processor/interface.go b/cmd/collector/app/processor/interface.go index 03fe67c0b57..7ef623a271e 100644 --- a/cmd/collector/app/processor/interface.go +++ b/cmd/collector/app/processor/interface.go @@ -8,13 +8,16 @@ import ( "io" "github.com/jaegertracing/jaeger/model" + "go.opentelemetry.io/collector/pdata/ptrace" ) // ErrBusy signalizes that processor cannot process incoming data var ErrBusy = errors.New("server busy") // SpansOptions additional options passed to processor along with the spans. -type SpansOptions struct { +type Spans struct { + SpansV1 []*model.Span + SpansV2 ptrace.Traces SpanFormat SpanFormat InboundTransport InboundTransport Tenant string @@ -23,7 +26,7 @@ type SpansOptions struct { // SpanProcessor handles model spans type SpanProcessor interface { // ProcessSpans processes model spans and return with either a list of true/false success or an error - ProcessSpans(mSpans []*model.Span, options SpansOptions) ([]bool, error) + ProcessSpans(spans Spans) ([]bool, error) io.Closer } diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go index 57dbebe48a4..fa92dbf0ab1 100644 --- a/cmd/collector/app/server/test.go +++ b/cmd/collector/app/server/test.go @@ -7,7 +7,6 @@ import ( "context" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" - "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -27,6 +26,6 @@ func (*mockSpanProcessor) Close() error { return nil } -func (*mockSpanProcessor) ProcessSpans([]*model.Span, processor.SpansOptions) ([]bool, error) { +func (*mockSpanProcessor) ProcessSpans(_ processor.Spans) ([]bool, error) { return []bool{}, nil } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 8377abb363a..abea3c9a19b 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -164,22 +164,22 @@ func (sp *spanProcessor) countSpan(span *model.Span, _ string /* tenant */) { sp.spansProcessed.Add(1) } -func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options processor.SpansOptions) ([]bool, error) { - sp.preProcessSpans(mSpans, options.Tenant) - sp.metrics.BatchSize.Update(int64(len(mSpans))) - retMe := make([]bool, len(mSpans)) +func (sp *spanProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { + sp.preProcessSpans(spans) + sp.metrics.BatchSize.Update(int64(len(spans.SpansV1))) + retMe := make([]bool, len(spans.SpansV1)) // Note: this is not the ideal place to do this because collector tags are added to Process.Tags, // and Process can be shared between different spans in the batch, but we no longer know that, // the relation is lost upstream and it's impossible in Go to dedupe pointers. But at least here // we have a single thread updating all spans that may share the same Process, before concurrency // kicks in. - for _, span := range mSpans { + for _, span := range spans.SpansV1 { sp.addCollectorTags(span) } - for i, mSpan := range mSpans { - ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport, options.Tenant) + for i, mSpan := range spans.SpansV1 { + ok := sp.enqueueSpan(mSpan, spans.SpanFormat, spans.InboundTransport, spans.Tenant) if !ok && sp.reportBusy { return nil, processor.ErrBusy } diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index c57f73b4bd8..e62cd41cc6d 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -238,8 +238,10 @@ func TestSpanProcessor(t *testing.T) { p := NewSpanProcessor(w, nil, Options.QueueSize(1)).(*spanProcessor) res, err := p.ProcessSpans( - []*model.Span{{}}, // empty span should be enriched by sanitizers - processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + processor.Spans{ + SpansV1: []*model.Span{{}}, // empty span should be enriched by sanitizers + SpanFormat: processor.JaegerSpanFormat, + }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) require.NoError(t, p.Close()) @@ -263,13 +265,16 @@ func TestSpanProcessorErrors(t *testing.T) { Options.QueueSize(1), ).(*spanProcessor) - res, err := p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err := p.ProcessSpans(processor.Spans{ + SpansV1: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + SpanFormat: processor.JaegerSpanFormat, + }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -315,23 +320,26 @@ func TestSpanProcessorBusy(t *testing.T) { w.Lock() defer w.Unlock() - res, err := p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err := p.ProcessSpans(processor.Spans{ + SpansV1: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, - }, - { - Process: &model.Process{ - ServiceName: "x", + { + Process: &model.Process{ + ServiceName: "x", + }, }, - }, - { - Process: &model.Process{ - ServiceName: "x", + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + SpanFormat: processor.JaegerSpanFormat, + }) require.Error(t, err, "expecting busy error") assert.Nil(t, res) @@ -612,13 +620,16 @@ func TestAdditionalProcessors(t *testing.T) { // nil doesn't fail p := NewSpanProcessor(w, nil, Options.QueueSize(1)) - res, err := p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err := p.ProcessSpans(processor.Spans{ + SpansV1: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + SpanFormat: processor.JaegerSpanFormat, + }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) require.NoError(t, p.Close()) @@ -629,13 +640,16 @@ func TestAdditionalProcessors(t *testing.T) { count++ } p = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1)) - res, err = p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err = p.ProcessSpans(processor.Spans{ + SpansV1: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + SpanFormat: processor.JaegerSpanFormat, + }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) require.NoError(t, p.Close()) @@ -648,13 +662,14 @@ func TestSpanProcessorContextPropagation(t *testing.T) { dummyTenant := "context-prop-test-tenant" - res, err := p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err := p.ProcessSpans(processor.Spans{ + SpansV1: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{ Tenant: dummyTenant, }) require.NoError(t, err) @@ -687,10 +702,12 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { w.Lock() defer w.Unlock() - opts := processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat} - _, err := p.ProcessSpans([]*model.Span{ - {OperationName: "op1"}, - }, opts) + _, err := p.ProcessSpans(processor.Spans{ + SpansV1: []*model.Span{ + {OperationName: "op1"}, + }, + SpanFormat: processor.JaegerSpanFormat, + }) require.NoError(t, err) // Wait for the sole worker to pick the item from the queue and block @@ -700,10 +717,13 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { // Now the queue is empty again and can accept one more item, but no workers available. // If we send two items, the last one will have to be dropped. - _, err = p.ProcessSpans([]*model.Span{ - {OperationName: "op2"}, - {OperationName: "op3"}, - }, opts) + _, err = p.ProcessSpans(processor.Spans{ + SpansV1: []*model.Span{ + {OperationName: "op2"}, + {OperationName: "op3"}, + }, + SpanFormat: processor.JaegerSpanFormat, + }) require.EqualError(t, err, processor.ErrBusy.Error()) assert.Equal(t, []string{"op3"}, droppedOperations) } From 8ddd3014dee5ffafae1543d9cd2da5578da1d1b7 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 4 Jan 2025 21:29:58 -0500 Subject: [PATCH 2/6] fix Signed-off-by: Yuri Shkuro --- cmd/collector/app/processor/interface.go | 3 ++- cmd/jaeger/internal/integration/trace_writer.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/collector/app/processor/interface.go b/cmd/collector/app/processor/interface.go index 7ef623a271e..2ef0b9a99f7 100644 --- a/cmd/collector/app/processor/interface.go +++ b/cmd/collector/app/processor/interface.go @@ -7,8 +7,9 @@ import ( "errors" "io" - "github.com/jaegertracing/jaeger/model" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" ) // ErrBusy signalizes that processor cannot process incoming data diff --git a/cmd/jaeger/internal/integration/trace_writer.go b/cmd/jaeger/internal/integration/trace_writer.go index c5286aa44d0..999ce34fb4d 100644 --- a/cmd/jaeger/internal/integration/trace_writer.go +++ b/cmd/jaeger/internal/integration/trace_writer.go @@ -81,7 +81,7 @@ func (w *traceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error { scope ptrace.ScopeSpans resource ptrace.ResourceSpans ) - + if spanCount == MaxChunkSize { err = w.exporter.ConsumeTraces(ctx, currentChunk) currentChunk = ptrace.NewTraces() From 66f17e3da2c13f0cc0ea15ad285c10e48f19635f Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 4 Jan 2025 21:57:48 -0500 Subject: [PATCH 3/6] fix Signed-off-by: Yuri Shkuro --- cmd/collector/app/collector_test.go | 8 +- cmd/collector/app/handler/grpc_handler.go | 16 ++-- .../app/handler/grpc_handler_test.go | 17 ++-- .../app/handler/thrift_span_handler.go | 20 +++-- .../app/handler/thrift_span_handler_test.go | 15 +++- cmd/collector/app/model_consumer.go | 2 +- cmd/collector/app/options.go | 2 +- cmd/collector/app/options_test.go | 4 +- cmd/collector/app/processor/constants.go | 39 +++++++++ cmd/collector/app/processor/interface.go | 85 +++++++++++-------- cmd/collector/app/server/test.go | 2 +- cmd/collector/app/span_processor.go | 23 +++-- cmd/collector/app/span_processor_test.go | 64 ++++++++------ 13 files changed, 197 insertions(+), 100 deletions(-) create mode 100644 cmd/collector/app/processor/constants.go diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 1c2722494a7..4e42109a2e0 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -262,9 +262,11 @@ func TestAggregator(t *testing.T) { }, }, } - _, err := c.spanProcessor.ProcessSpans(processor.Spans{ - SpansV1: spans, - SpanFormat: processor.JaegerSpanFormat, + _, err := c.spanProcessor.ProcessSpans(processor.SpansV1{ + Spans: spans, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) require.NoError(t, c.Close()) diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index 3cdf5150217..f88343a3de1 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -47,7 +47,7 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) type batchConsumer struct { logger *zap.Logger spanProcessor processor.SpanProcessor - spanOptions processor.Spans // common settings for all spans + spanOptions processor.Details // common settings for all spans tenancyMgr *tenancy.Manager } @@ -55,7 +55,7 @@ func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, return batchConsumer{ logger: logger, spanProcessor: spanProcessor, - spanOptions: processor.Spans{ + spanOptions: processor.Details{ InboundTransport: transport, SpanFormat: spanFormat, }, @@ -75,11 +75,13 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error { span.Process = batch.Process } } - _, err = c.spanProcessor.ProcessSpans(processor.Spans{ - SpansV1: batch.Spans, - InboundTransport: c.spanOptions.InboundTransport, - SpanFormat: c.spanOptions.SpanFormat, - Tenant: tenant, + _, err = c.spanProcessor.ProcessSpans(processor.SpansV1{ + Spans: batch.Spans, + Details: processor.Details{ + InboundTransport: c.spanOptions.InboundTransport, + SpanFormat: c.spanOptions.SpanFormat, + Tenant: tenant, + }, }) if err != nil { if errors.Is(err, processor.ErrBusy) { diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 64c43cf3546..44289995b1f 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -33,17 +34,21 @@ type mockSpanProcessor struct { spanFormat processor.SpanFormat } -func (p *mockSpanProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { +func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { p.mux.Lock() defer p.mux.Unlock() - p.spans = append(p.spans, spans.SpansV1...) - oks := make([]bool, len(spans.SpansV1)) + batch.GetSpans(func(spans []*model.Span) { + p.spans = append(p.spans, spans...) + }, func(traces ptrace.Traces) { + panic("not implemented") + }) + oks := make([]bool, len(p.spans)) if p.tenants == nil { p.tenants = make(map[string]bool) } - p.tenants[spans.Tenant] = true - p.transport = spans.InboundTransport - p.spanFormat = spans.SpanFormat + p.tenants[batch.GetTenant()] = true + p.transport = batch.GetInboundTransport() + p.spanFormat = batch.GetSpanFormat() return oks, p.expectedError } diff --git a/cmd/collector/app/handler/thrift_span_handler.go b/cmd/collector/app/handler/thrift_span_handler.go index aea4370af2f..24f4061d1c2 100644 --- a/cmd/collector/app/handler/thrift_span_handler.go +++ b/cmd/collector/app/handler/thrift_span_handler.go @@ -54,10 +54,12 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options mSpan := jConv.ToDomainSpan(span, batch.Process) mSpans = append(mSpans, mSpan) } - oks, err := jbh.modelProcessor.ProcessSpans(processor.Spans{ - SpansV1: mSpans, - InboundTransport: options.InboundTransport, - SpanFormat: processor.JaegerSpanFormat, + oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{ + Spans: mSpans, + Details: processor.Details{ + InboundTransport: options.InboundTransport, + SpanFormat: processor.JaegerSpanFormat, + }, }) if err != nil { jbh.logger.Error("Collector failed to process span batch", zap.Error(err)) @@ -106,10 +108,12 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options convCount[i] = len(converted) mSpans = append(mSpans, converted...) } - bools, err := h.modelProcessor.ProcessSpans(processor.Spans{ - SpansV1: mSpans, - InboundTransport: options.InboundTransport, - SpanFormat: processor.ZipkinSpanFormat, + bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{ + Spans: mSpans, + Details: processor.Details{ + InboundTransport: options.InboundTransport, + SpanFormat: processor.ZipkinSpanFormat, + }, }) if err != nil { h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err)) diff --git a/cmd/collector/app/handler/thrift_span_handler_test.go b/cmd/collector/app/handler/thrift_span_handler_test.go index d926055cb57..f59f221bd3b 100644 --- a/cmd/collector/app/handler/thrift_span_handler_test.go +++ b/cmd/collector/app/handler/thrift_span_handler_test.go @@ -12,10 +12,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -57,12 +59,19 @@ type shouldIErrorProcessor struct { var errTestError = errors.New("Whoops") -func (s *shouldIErrorProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { +func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { if s.shouldError { return nil, errTestError } - retMe := make([]bool, len(spans.SpansV1)) - for i := range spans.SpansV1 { + var spans []*model.Span + batch.GetSpans(func(sp []*model.Span) { + spans = sp + }, func(traces ptrace.Traces) { + panic("not implemented") + }) + + retMe := make([]bool, len(spans)) + for i := range spans { retMe[i] = true } return retMe, nil diff --git a/cmd/collector/app/model_consumer.go b/cmd/collector/app/model_consumer.go index 0ad848f38b6..e94377d49ae 100644 --- a/cmd/collector/app/model_consumer.go +++ b/cmd/collector/app/model_consumer.go @@ -13,7 +13,7 @@ import ( type ProcessSpan func(span *model.Span, tenant string) // ProcessSpans processes a batch of Domain Model Spans -type ProcessSpans func(spans processor.Spans) +type ProcessSpans func(spans processor.Batch) // FilterSpan decides whether to allow or disallow a span type FilterSpan func(span *model.Span) bool diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 21b296a3e74..3d8f745ae50 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -176,7 +176,7 @@ func (options) apply(opts ...Option) options { ret.hostMetrics = metrics.NullFactory } if ret.preProcessSpans == nil { - ret.preProcessSpans = func(_ processor.Spans) {} + ret.preProcessSpans = func(_ processor.Batch) {} } if ret.sanitizer == nil { ret.sanitizer = func(span *model.Span) *model.Span { return span } diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index ed06f5a707d..1bd4791e8b2 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -27,7 +27,7 @@ func TestAllOptionSet(t *testing.T) { Options.ServiceMetrics(metrics.NullFactory), Options.Logger(zap.NewNop()), Options.NumWorkers(5), - Options.PreProcessSpans(func(_ processor.Spans) {}), + Options.PreProcessSpans(func(_ processor.Batch) {}), Options.Sanitizer(func(span *model.Span) *model.Span { return span }), Options.QueueSize(10), Options.DynQueueSizeWarmup(1000), @@ -53,7 +53,7 @@ func TestNoOptionsSet(t *testing.T) { assert.Nil(t, opts.collectorTags) assert.False(t, opts.reportBusy) assert.False(t, opts.blockingSubmit) - assert.NotPanics(t, func() { opts.preProcessSpans(processor.Spans{}) }) + assert.NotPanics(t, func() { opts.preProcessSpans(processor.SpansV1{}) }) assert.NotPanics(t, func() { opts.preSave(nil, "") }) assert.True(t, opts.spanFilter(nil)) span := model.Span{} diff --git a/cmd/collector/app/processor/constants.go b/cmd/collector/app/processor/constants.go new file mode 100644 index 00000000000..e639bc5a539 --- /dev/null +++ b/cmd/collector/app/processor/constants.go @@ -0,0 +1,39 @@ +// Copyright (c) 2020 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package processor + +import ( + "errors" +) + +// ErrBusy signalizes that processor cannot process incoming data +var ErrBusy = errors.New("server busy") + +// InboundTransport identifies the transport used to receive spans. +type InboundTransport string + +const ( + // GRPCTransport indicates spans received over gRPC. + GRPCTransport InboundTransport = "grpc" + // HTTPTransport indicates spans received over HTTP. + HTTPTransport InboundTransport = "http" + // UnknownTransport is the fallback/catch-all category. + UnknownTransport InboundTransport = "unknown" +) + +// SpanFormat identifies the data format in which the span was originally received. +type SpanFormat string + +const ( + // JaegerSpanFormat is for Jaeger Thrift spans. + JaegerSpanFormat SpanFormat = "jaeger" + // ZipkinSpanFormat is for Zipkin Thrift spans. + ZipkinSpanFormat SpanFormat = "zipkin" + // ProtoSpanFormat is for Jaeger protobuf Spans. + ProtoSpanFormat SpanFormat = "proto" + // OTLPSpanFormat is for OpenTelemetry OTLP format. + OTLPSpanFormat SpanFormat = "otlp" + // UnknownSpanFormat is the fallback/catch-all category. + UnknownSpanFormat SpanFormat = "unknown" +) diff --git a/cmd/collector/app/processor/interface.go b/cmd/collector/app/processor/interface.go index 2ef0b9a99f7..3ec3fae4c23 100644 --- a/cmd/collector/app/processor/interface.go +++ b/cmd/collector/app/processor/interface.go @@ -4,7 +4,6 @@ package processor import ( - "errors" "io" "go.opentelemetry.io/collector/pdata/ptrace" @@ -12,49 +11,61 @@ import ( "github.com/jaegertracing/jaeger/model" ) -// ErrBusy signalizes that processor cannot process incoming data -var ErrBusy = errors.New("server busy") +var ( + _ Batch = (*SpansV1)(nil) + _ Batch = (*SpansV2)(nil) +) + +// Batch is a batch of spans passed to the processor. +type Batch interface { + // Spans calls the appropriate function based on the type of spans being processed. + GetSpans(v1 func(spans []*model.Span), v2 func(traces ptrace.Traces)) + + GetSpanFormat() SpanFormat + GetInboundTransport() InboundTransport + GetTenant() string +} -// SpansOptions additional options passed to processor along with the spans. -type Spans struct { - SpansV1 []*model.Span - SpansV2 ptrace.Traces +// SpanProcessor handles spans +type SpanProcessor interface { + // ProcessSpans processes spans and return with either a list of true/false success or an error + ProcessSpans(spans Batch) ([]bool, error) + io.Closer +} + +type Details struct { SpanFormat SpanFormat InboundTransport InboundTransport Tenant string } -// SpanProcessor handles model spans -type SpanProcessor interface { - // ProcessSpans processes model spans and return with either a list of true/false success or an error - ProcessSpans(spans Spans) ([]bool, error) - io.Closer +// Spans is a batch of spans passed to the processor. +type SpansV1 struct { + Spans []*model.Span + Details } -// InboundTransport identifies the transport used to receive spans. -type InboundTransport string +type SpansV2 struct { + Traces ptrace.Traces + Details +} -const ( - // GRPCTransport indicates spans received over gRPC. - GRPCTransport InboundTransport = "grpc" - // HTTPTransport indicates spans received over HTTP. - HTTPTransport InboundTransport = "http" - // UnknownTransport is the fallback/catch-all category. - UnknownTransport InboundTransport = "unknown" -) +func (s SpansV1) GetSpans(v1 func([]*model.Span), v2 func(ptrace.Traces)) { + v1(s.Spans) +} -// SpanFormat identifies the data format in which the span was originally received. -type SpanFormat string - -const ( - // JaegerSpanFormat is for Jaeger Thrift spans. - JaegerSpanFormat SpanFormat = "jaeger" - // ZipkinSpanFormat is for Zipkin Thrift spans. - ZipkinSpanFormat SpanFormat = "zipkin" - // ProtoSpanFormat is for Jaeger protobuf Spans. - ProtoSpanFormat SpanFormat = "proto" - // OTLPSpanFormat is for OpenTelemetry OTLP format. - OTLPSpanFormat SpanFormat = "otlp" - // UnknownSpanFormat is the fallback/catch-all category. - UnknownSpanFormat SpanFormat = "unknown" -) +func (s SpansV2) GetSpans(v1 func([]*model.Span), v2 func(ptrace.Traces)) { + v2(s.Traces) +} + +func (d Details) GetSpanFormat() SpanFormat { + return d.SpanFormat +} + +func (d Details) GetInboundTransport() InboundTransport { + return d.InboundTransport +} + +func (d Details) GetTenant() string { + return d.Tenant +} diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go index fa92dbf0ab1..a047191f6b1 100644 --- a/cmd/collector/app/server/test.go +++ b/cmd/collector/app/server/test.go @@ -26,6 +26,6 @@ func (*mockSpanProcessor) Close() error { return nil } -func (*mockSpanProcessor) ProcessSpans(_ processor.Spans) ([]bool, error) { +func (*mockSpanProcessor) ProcessSpans(_ processor.Batch) ([]bool, error) { return []bool{}, nil } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index abea3c9a19b..f866b009e7d 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" @@ -164,22 +165,29 @@ func (sp *spanProcessor) countSpan(span *model.Span, _ string /* tenant */) { sp.spansProcessed.Add(1) } -func (sp *spanProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { - sp.preProcessSpans(spans) - sp.metrics.BatchSize.Update(int64(len(spans.SpansV1))) - retMe := make([]bool, len(spans.SpansV1)) +func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { + sp.preProcessSpans(batch) + var spans []*model.Span + batch.GetSpans(func(spansV1 []*model.Span) { + spans = spansV1 + }, func(traces ptrace.Traces) { + panic("not implemented") + }) + sp.metrics.BatchSize.Update(int64(len(spans))) + retMe := make([]bool, len(spans)) // Note: this is not the ideal place to do this because collector tags are added to Process.Tags, // and Process can be shared between different spans in the batch, but we no longer know that, // the relation is lost upstream and it's impossible in Go to dedupe pointers. But at least here // we have a single thread updating all spans that may share the same Process, before concurrency // kicks in. - for _, span := range spans.SpansV1 { + for _, span := range spans { sp.addCollectorTags(span) } - for i, mSpan := range spans.SpansV1 { - ok := sp.enqueueSpan(mSpan, spans.SpanFormat, spans.InboundTransport, spans.Tenant) + for i, mSpan := range spans { + // TODO does this have to be one span at a time? + ok := sp.enqueueSpan(mSpan, batch.GetSpanFormat(), batch.GetInboundTransport(), batch.GetTenant()) if !ok && sp.reportBusy { return nil, processor.ErrBusy } @@ -189,6 +197,7 @@ func (sp *spanProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { } func (sp *spanProcessor) processItemFromQueue(item *queueItem) { + // TODO calling sanitized here contradicts the comment in enqueueSpan that warns about shared Process. sp.processSpan(sp.sanitizer(item.span), item.tenant) sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime)) } diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index e62cd41cc6d..f52494098e9 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -238,9 +238,11 @@ func TestSpanProcessor(t *testing.T) { p := NewSpanProcessor(w, nil, Options.QueueSize(1)).(*spanProcessor) res, err := p.ProcessSpans( - processor.Spans{ - SpansV1: []*model.Span{{}}, // empty span should be enriched by sanitizers - SpanFormat: processor.JaegerSpanFormat, + processor.SpansV1{ + Spans: []*model.Span{{}}, // empty span should be enriched by sanitizers + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -265,15 +267,17 @@ func TestSpanProcessorErrors(t *testing.T) { Options.QueueSize(1), ).(*spanProcessor) - res, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", }, }, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -320,8 +324,8 @@ func TestSpanProcessorBusy(t *testing.T) { w.Lock() defer w.Unlock() - res, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", @@ -338,7 +342,9 @@ func TestSpanProcessorBusy(t *testing.T) { }, }, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.Error(t, err, "expecting busy error") @@ -620,15 +626,17 @@ func TestAdditionalProcessors(t *testing.T) { // nil doesn't fail p := NewSpanProcessor(w, nil, Options.QueueSize(1)) - res, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", }, }, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -640,15 +648,17 @@ func TestAdditionalProcessors(t *testing.T) { count++ } p = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1)) - res, err = p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err = p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", }, }, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -662,15 +672,17 @@ func TestSpanProcessorContextPropagation(t *testing.T) { dummyTenant := "context-prop-test-tenant" - res, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", }, }, }, - Tenant: dummyTenant, + Details: processor.Details{ + Tenant: dummyTenant, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -702,11 +714,13 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { w.Lock() defer w.Unlock() - _, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + _, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ {OperationName: "op1"}, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) @@ -717,12 +731,14 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { // Now the queue is empty again and can accept one more item, but no workers available. // If we send two items, the last one will have to be dropped. - _, err = p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + _, err = p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ {OperationName: "op2"}, {OperationName: "op3"}, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.EqualError(t, err, processor.ErrBusy.Error()) assert.Equal(t, []string{"op3"}, droppedOperations) From 4a370fb04b0efaa975fce89446ffccf5128d677c Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 4 Jan 2025 22:04:38 -0500 Subject: [PATCH 4/6] delint Signed-off-by: Yuri Shkuro --- cmd/collector/app/handler/grpc_handler_test.go | 2 +- cmd/collector/app/handler/thrift_span_handler_test.go | 2 +- cmd/collector/app/processor/interface.go | 6 +++--- cmd/collector/app/span_processor.go | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 44289995b1f..e7940e916ed 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -39,7 +39,7 @@ func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) defer p.mux.Unlock() batch.GetSpans(func(spans []*model.Span) { p.spans = append(p.spans, spans...) - }, func(traces ptrace.Traces) { + }, func(_ ptrace.Traces) { panic("not implemented") }) oks := make([]bool, len(p.spans)) diff --git a/cmd/collector/app/handler/thrift_span_handler_test.go b/cmd/collector/app/handler/thrift_span_handler_test.go index f59f221bd3b..16e5f5fe7c2 100644 --- a/cmd/collector/app/handler/thrift_span_handler_test.go +++ b/cmd/collector/app/handler/thrift_span_handler_test.go @@ -66,7 +66,7 @@ func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, err var spans []*model.Span batch.GetSpans(func(sp []*model.Span) { spans = sp - }, func(traces ptrace.Traces) { + }, func(_ ptrace.Traces) { panic("not implemented") }) diff --git a/cmd/collector/app/processor/interface.go b/cmd/collector/app/processor/interface.go index 3ec3fae4c23..363a27d20de 100644 --- a/cmd/collector/app/processor/interface.go +++ b/cmd/collector/app/processor/interface.go @@ -18,7 +18,7 @@ var ( // Batch is a batch of spans passed to the processor. type Batch interface { - // Spans calls the appropriate function based on the type of spans being processed. + // GetSpans delegates to the appropriate function based on the data model version. GetSpans(v1 func(spans []*model.Span), v2 func(traces ptrace.Traces)) GetSpanFormat() SpanFormat @@ -50,11 +50,11 @@ type SpansV2 struct { Details } -func (s SpansV1) GetSpans(v1 func([]*model.Span), v2 func(ptrace.Traces)) { +func (s SpansV1) GetSpans(v1 func([]*model.Span), _ func(ptrace.Traces)) { v1(s.Spans) } -func (s SpansV2) GetSpans(v1 func([]*model.Span), v2 func(ptrace.Traces)) { +func (s SpansV2) GetSpans(_ func([]*model.Span), v2 func(ptrace.Traces)) { v2(s.Traces) } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index f866b009e7d..a3541cd3c6c 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -170,7 +170,7 @@ func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { var spans []*model.Span batch.GetSpans(func(spansV1 []*model.Span) { spans = spansV1 - }, func(traces ptrace.Traces) { + }, func(_ ptrace.Traces) { panic("not implemented") }) sp.metrics.BatchSize.Update(int64(len(spans))) @@ -197,7 +197,7 @@ func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { } func (sp *spanProcessor) processItemFromQueue(item *queueItem) { - // TODO calling sanitized here contradicts the comment in enqueueSpan that warns about shared Process. + // TODO calling sanitizer here contradicts the comment in enqueueSpan about immutable Process. sp.processSpan(sp.sanitizer(item.span), item.tenant) sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime)) } From 85d9c20abb0bd6ed56b1c34f23939453702c70e1 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 4 Jan 2025 22:14:19 -0500 Subject: [PATCH 5/6] fix Signed-off-by: Yuri Shkuro --- .../{empty_test.go => package_test.go} | 0 .../processor/{interface.go => processor.go} | 0 cmd/collector/app/processor/processor_test.go | 66 +++++++++++++++++++ 3 files changed, 66 insertions(+) rename cmd/collector/app/processor/{empty_test.go => package_test.go} (100%) rename cmd/collector/app/processor/{interface.go => processor.go} (100%) create mode 100644 cmd/collector/app/processor/processor_test.go diff --git a/cmd/collector/app/processor/empty_test.go b/cmd/collector/app/processor/package_test.go similarity index 100% rename from cmd/collector/app/processor/empty_test.go rename to cmd/collector/app/processor/package_test.go diff --git a/cmd/collector/app/processor/interface.go b/cmd/collector/app/processor/processor.go similarity index 100% rename from cmd/collector/app/processor/interface.go rename to cmd/collector/app/processor/processor.go diff --git a/cmd/collector/app/processor/processor_test.go b/cmd/collector/app/processor/processor_test.go new file mode 100644 index 00000000000..5391fea887d --- /dev/null +++ b/cmd/collector/app/processor/processor_test.go @@ -0,0 +1,66 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package processor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" +) + +func TestDetails(t *testing.T) { + d := Details{ + SpanFormat: JaegerSpanFormat, + InboundTransport: InboundTransport("grpc"), + Tenant: "tenant", + } + assert.Equal(t, JaegerSpanFormat, d.GetSpanFormat()) + assert.Equal(t, InboundTransport("grpc"), d.GetInboundTransport()) + assert.Equal(t, "tenant", d.GetTenant()) +} + +func TestSpansV1(t *testing.T) { + s := SpansV1{ + Spans: []*model.Span{{}}, + Details: Details{ + SpanFormat: JaegerSpanFormat, + InboundTransport: InboundTransport("grpc"), + Tenant: "tenant", + }, + } + var spans []*model.Span + s.GetSpans(func(s []*model.Span) { + spans = s + }, func(_ ptrace.Traces) { + panic("not implemented") + }) + assert.Equal(t, []*model.Span{{}}, spans) + assert.Equal(t, JaegerSpanFormat, s.GetSpanFormat()) + assert.Equal(t, InboundTransport("grpc"), s.GetInboundTransport()) + assert.Equal(t, "tenant", s.GetTenant()) +} + +func TestSpansV2(t *testing.T) { + s := SpansV2{ + Traces: ptrace.NewTraces(), + Details: Details{ + SpanFormat: JaegerSpanFormat, + InboundTransport: InboundTransport("grpc"), + Tenant: "tenant", + }, + } + var traces ptrace.Traces + s.GetSpans(func(_ []*model.Span) { + panic("not implemented") + }, func(t ptrace.Traces) { + traces = t + }) + assert.Equal(t, ptrace.NewTraces(), traces) + assert.Equal(t, JaegerSpanFormat, s.GetSpanFormat()) + assert.Equal(t, InboundTransport("grpc"), s.GetInboundTransport()) + assert.Equal(t, "tenant", s.GetTenant()) +} From 809abdb296ea7daf2170f2adbf030d9eb1ae5e1b Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 4 Jan 2025 22:16:19 -0500 Subject: [PATCH 6/6] fix Signed-off-by: Yuri Shkuro --- cmd/collector/app/processor/processor_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/collector/app/processor/processor_test.go b/cmd/collector/app/processor/processor_test.go index 5391fea887d..d187a23c96e 100644 --- a/cmd/collector/app/processor/processor_test.go +++ b/cmd/collector/app/processor/processor_test.go @@ -15,11 +15,11 @@ import ( func TestDetails(t *testing.T) { d := Details{ SpanFormat: JaegerSpanFormat, - InboundTransport: InboundTransport("grpc"), + InboundTransport: GRPCTransport, Tenant: "tenant", } assert.Equal(t, JaegerSpanFormat, d.GetSpanFormat()) - assert.Equal(t, InboundTransport("grpc"), d.GetInboundTransport()) + assert.Equal(t, GRPCTransport, d.GetInboundTransport()) assert.Equal(t, "tenant", d.GetTenant()) } @@ -28,7 +28,7 @@ func TestSpansV1(t *testing.T) { Spans: []*model.Span{{}}, Details: Details{ SpanFormat: JaegerSpanFormat, - InboundTransport: InboundTransport("grpc"), + InboundTransport: GRPCTransport, Tenant: "tenant", }, } @@ -40,7 +40,7 @@ func TestSpansV1(t *testing.T) { }) assert.Equal(t, []*model.Span{{}}, spans) assert.Equal(t, JaegerSpanFormat, s.GetSpanFormat()) - assert.Equal(t, InboundTransport("grpc"), s.GetInboundTransport()) + assert.Equal(t, GRPCTransport, s.GetInboundTransport()) assert.Equal(t, "tenant", s.GetTenant()) } @@ -49,7 +49,7 @@ func TestSpansV2(t *testing.T) { Traces: ptrace.NewTraces(), Details: Details{ SpanFormat: JaegerSpanFormat, - InboundTransport: InboundTransport("grpc"), + InboundTransport: GRPCTransport, Tenant: "tenant", }, } @@ -61,6 +61,6 @@ func TestSpansV2(t *testing.T) { }) assert.Equal(t, ptrace.NewTraces(), traces) assert.Equal(t, JaegerSpanFormat, s.GetSpanFormat()) - assert.Equal(t, InboundTransport("grpc"), s.GetInboundTransport()) + assert.Equal(t, GRPCTransport, s.GetInboundTransport()) assert.Equal(t, "tenant", s.GetTenant()) }