From 66f17e3da2c13f0cc0ea15ad285c10e48f19635f Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 4 Jan 2025 21:57:48 -0500 Subject: [PATCH] fix Signed-off-by: Yuri Shkuro --- cmd/collector/app/collector_test.go | 8 +- cmd/collector/app/handler/grpc_handler.go | 16 ++-- .../app/handler/grpc_handler_test.go | 17 ++-- .../app/handler/thrift_span_handler.go | 20 +++-- .../app/handler/thrift_span_handler_test.go | 15 +++- cmd/collector/app/model_consumer.go | 2 +- cmd/collector/app/options.go | 2 +- cmd/collector/app/options_test.go | 4 +- cmd/collector/app/processor/constants.go | 39 +++++++++ cmd/collector/app/processor/interface.go | 85 +++++++++++-------- cmd/collector/app/server/test.go | 2 +- cmd/collector/app/span_processor.go | 23 +++-- cmd/collector/app/span_processor_test.go | 64 ++++++++------ 13 files changed, 197 insertions(+), 100 deletions(-) create mode 100644 cmd/collector/app/processor/constants.go diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 1c2722494a7..4e42109a2e0 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -262,9 +262,11 @@ func TestAggregator(t *testing.T) { }, }, } - _, err := c.spanProcessor.ProcessSpans(processor.Spans{ - SpansV1: spans, - SpanFormat: processor.JaegerSpanFormat, + _, err := c.spanProcessor.ProcessSpans(processor.SpansV1{ + Spans: spans, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) require.NoError(t, c.Close()) diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index 3cdf5150217..f88343a3de1 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -47,7 +47,7 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) type batchConsumer struct { logger *zap.Logger spanProcessor processor.SpanProcessor - spanOptions processor.Spans // common settings for 
all spans + spanOptions processor.Details // common settings for all spans tenancyMgr *tenancy.Manager } @@ -55,7 +55,7 @@ func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, return batchConsumer{ logger: logger, spanProcessor: spanProcessor, - spanOptions: processor.Spans{ + spanOptions: processor.Details{ InboundTransport: transport, SpanFormat: spanFormat, }, @@ -75,11 +75,13 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error { span.Process = batch.Process } } - _, err = c.spanProcessor.ProcessSpans(processor.Spans{ - SpansV1: batch.Spans, - InboundTransport: c.spanOptions.InboundTransport, - SpanFormat: c.spanOptions.SpanFormat, - Tenant: tenant, + _, err = c.spanProcessor.ProcessSpans(processor.SpansV1{ + Spans: batch.Spans, + Details: processor.Details{ + InboundTransport: c.spanOptions.InboundTransport, + SpanFormat: c.spanOptions.SpanFormat, + Tenant: tenant, + }, }) if err != nil { if errors.Is(err, processor.ErrBusy) { diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 64c43cf3546..44289995b1f 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -33,17 +34,21 @@ type mockSpanProcessor struct { spanFormat processor.SpanFormat } -func (p *mockSpanProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { +func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { p.mux.Lock() defer p.mux.Unlock() - p.spans = append(p.spans, spans.SpansV1...) - oks := make([]bool, len(spans.SpansV1)) + batch.GetSpans(func(spans []*model.Span) { + p.spans = append(p.spans, spans...) 
+ }, func(traces ptrace.Traces) { + panic("not implemented") + }) + oks := make([]bool, len(p.spans)) if p.tenants == nil { p.tenants = make(map[string]bool) } - p.tenants[spans.Tenant] = true - p.transport = spans.InboundTransport - p.spanFormat = spans.SpanFormat + p.tenants[batch.GetTenant()] = true + p.transport = batch.GetInboundTransport() + p.spanFormat = batch.GetSpanFormat() return oks, p.expectedError } diff --git a/cmd/collector/app/handler/thrift_span_handler.go b/cmd/collector/app/handler/thrift_span_handler.go index aea4370af2f..24f4061d1c2 100644 --- a/cmd/collector/app/handler/thrift_span_handler.go +++ b/cmd/collector/app/handler/thrift_span_handler.go @@ -54,10 +54,12 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options mSpan := jConv.ToDomainSpan(span, batch.Process) mSpans = append(mSpans, mSpan) } - oks, err := jbh.modelProcessor.ProcessSpans(processor.Spans{ - SpansV1: mSpans, - InboundTransport: options.InboundTransport, - SpanFormat: processor.JaegerSpanFormat, + oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{ + Spans: mSpans, + Details: processor.Details{ + InboundTransport: options.InboundTransport, + SpanFormat: processor.JaegerSpanFormat, + }, }) if err != nil { jbh.logger.Error("Collector failed to process span batch", zap.Error(err)) @@ -106,10 +108,12 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options convCount[i] = len(converted) mSpans = append(mSpans, converted...) 
} - bools, err := h.modelProcessor.ProcessSpans(processor.Spans{ - SpansV1: mSpans, - InboundTransport: options.InboundTransport, - SpanFormat: processor.ZipkinSpanFormat, + bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{ + Spans: mSpans, + Details: processor.Details{ + InboundTransport: options.InboundTransport, + SpanFormat: processor.ZipkinSpanFormat, + }, }) if err != nil { h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err)) diff --git a/cmd/collector/app/handler/thrift_span_handler_test.go b/cmd/collector/app/handler/thrift_span_handler_test.go index d926055cb57..f59f221bd3b 100644 --- a/cmd/collector/app/handler/thrift_span_handler_test.go +++ b/cmd/collector/app/handler/thrift_span_handler_test.go @@ -12,10 +12,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -57,12 +59,19 @@ type shouldIErrorProcessor struct { var errTestError = errors.New("Whoops") -func (s *shouldIErrorProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { +func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { if s.shouldError { return nil, errTestError } - retMe := make([]bool, len(spans.SpansV1)) - for i := range spans.SpansV1 { + var spans []*model.Span + batch.GetSpans(func(sp []*model.Span) { + spans = sp + }, func(traces ptrace.Traces) { + panic("not implemented") + }) + + retMe := make([]bool, len(spans)) + for i := range spans { retMe[i] = true } return retMe, nil diff --git a/cmd/collector/app/model_consumer.go b/cmd/collector/app/model_consumer.go index 0ad848f38b6..e94377d49ae 100644 --- 
a/cmd/collector/app/model_consumer.go +++ b/cmd/collector/app/model_consumer.go @@ -13,7 +13,7 @@ import ( type ProcessSpan func(span *model.Span, tenant string) // ProcessSpans processes a batch of Domain Model Spans -type ProcessSpans func(spans processor.Spans) +type ProcessSpans func(spans processor.Batch) // FilterSpan decides whether to allow or disallow a span type FilterSpan func(span *model.Span) bool diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 21b296a3e74..3d8f745ae50 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -176,7 +176,7 @@ func (options) apply(opts ...Option) options { ret.hostMetrics = metrics.NullFactory } if ret.preProcessSpans == nil { - ret.preProcessSpans = func(_ processor.Spans) {} + ret.preProcessSpans = func(_ processor.Batch) {} } if ret.sanitizer == nil { ret.sanitizer = func(span *model.Span) *model.Span { return span } diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index ed06f5a707d..1bd4791e8b2 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -27,7 +27,7 @@ func TestAllOptionSet(t *testing.T) { Options.ServiceMetrics(metrics.NullFactory), Options.Logger(zap.NewNop()), Options.NumWorkers(5), - Options.PreProcessSpans(func(_ processor.Spans) {}), + Options.PreProcessSpans(func(_ processor.Batch) {}), Options.Sanitizer(func(span *model.Span) *model.Span { return span }), Options.QueueSize(10), Options.DynQueueSizeWarmup(1000), @@ -53,7 +53,7 @@ func TestNoOptionsSet(t *testing.T) { assert.Nil(t, opts.collectorTags) assert.False(t, opts.reportBusy) assert.False(t, opts.blockingSubmit) - assert.NotPanics(t, func() { opts.preProcessSpans(processor.Spans{}) }) + assert.NotPanics(t, func() { opts.preProcessSpans(processor.SpansV1{}) }) assert.NotPanics(t, func() { opts.preSave(nil, "") }) assert.True(t, opts.spanFilter(nil)) span := model.Span{} diff --git a/cmd/collector/app/processor/constants.go 
b/cmd/collector/app/processor/constants.go new file mode 100644 index 00000000000..e639bc5a539 --- /dev/null +++ b/cmd/collector/app/processor/constants.go @@ -0,0 +1,39 @@ +// Copyright (c) 2020 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package processor + +import ( + "errors" +) + +// ErrBusy signalizes that processor cannot process incoming data +var ErrBusy = errors.New("server busy") + +// InboundTransport identifies the transport used to receive spans. +type InboundTransport string + +const ( + // GRPCTransport indicates spans received over gRPC. + GRPCTransport InboundTransport = "grpc" + // HTTPTransport indicates spans received over HTTP. + HTTPTransport InboundTransport = "http" + // UnknownTransport is the fallback/catch-all category. + UnknownTransport InboundTransport = "unknown" +) + +// SpanFormat identifies the data format in which the span was originally received. +type SpanFormat string + +const ( + // JaegerSpanFormat is for Jaeger Thrift spans. + JaegerSpanFormat SpanFormat = "jaeger" + // ZipkinSpanFormat is for Zipkin Thrift spans. + ZipkinSpanFormat SpanFormat = "zipkin" + // ProtoSpanFormat is for Jaeger protobuf Spans. + ProtoSpanFormat SpanFormat = "proto" + // OTLPSpanFormat is for OpenTelemetry OTLP format. + OTLPSpanFormat SpanFormat = "otlp" + // UnknownSpanFormat is the fallback/catch-all category. 
+ UnknownSpanFormat SpanFormat = "unknown" +) diff --git a/cmd/collector/app/processor/interface.go b/cmd/collector/app/processor/interface.go index 2ef0b9a99f7..3ec3fae4c23 100644 --- a/cmd/collector/app/processor/interface.go +++ b/cmd/collector/app/processor/interface.go @@ -4,7 +4,6 @@ package processor import ( - "errors" "io" "go.opentelemetry.io/collector/pdata/ptrace" @@ -12,49 +11,61 @@ import ( "github.com/jaegertracing/jaeger/model" ) -// ErrBusy signalizes that processor cannot process incoming data -var ErrBusy = errors.New("server busy") +var ( + _ Batch = (*SpansV1)(nil) + _ Batch = (*SpansV2)(nil) +) + +// Batch is a batch of spans passed to the processor. +type Batch interface { + // GetSpans calls the appropriate function based on the type of spans being processed. + GetSpans(v1 func(spans []*model.Span), v2 func(traces ptrace.Traces)) + + GetSpanFormat() SpanFormat + GetInboundTransport() InboundTransport + GetTenant() string +} -// SpansOptions additional options passed to processor along with the spans. -type Spans struct { - SpansV1 []*model.Span - SpansV2 ptrace.Traces +// SpanProcessor handles spans +type SpanProcessor interface { + // ProcessSpans processes spans and returns either a list of true/false success flags or an error + ProcessSpans(spans Batch) ([]bool, error) + io.Closer +} + +type Details struct { SpanFormat SpanFormat InboundTransport InboundTransport Tenant string } -// SpanProcessor handles model spans -type SpanProcessor interface { - // ProcessSpans processes model spans and return with either a list of true/false success or an error - ProcessSpans(spans Spans) ([]bool, error) - io.Closer +// SpansV1 is a batch of spans passed to the processor. +type SpansV1 struct { + Spans []*model.Span + Details } -// InboundTransport identifies the transport used to receive spans. -type InboundTransport string +type SpansV2 struct { + Traces ptrace.Traces + Details +} -const ( - // GRPCTransport indicates spans received over gRPC.
- GRPCTransport InboundTransport = "grpc" - // HTTPTransport indicates spans received over HTTP. - HTTPTransport InboundTransport = "http" - // UnknownTransport is the fallback/catch-all category. - UnknownTransport InboundTransport = "unknown" -) +func (s SpansV1) GetSpans(v1 func([]*model.Span), v2 func(ptrace.Traces)) { + v1(s.Spans) +} -// SpanFormat identifies the data format in which the span was originally received. -type SpanFormat string - -const ( - // JaegerSpanFormat is for Jaeger Thrift spans. - JaegerSpanFormat SpanFormat = "jaeger" - // ZipkinSpanFormat is for Zipkin Thrift spans. - ZipkinSpanFormat SpanFormat = "zipkin" - // ProtoSpanFormat is for Jaeger protobuf Spans. - ProtoSpanFormat SpanFormat = "proto" - // OTLPSpanFormat is for OpenTelemetry OTLP format. - OTLPSpanFormat SpanFormat = "otlp" - // UnknownSpanFormat is the fallback/catch-all category. - UnknownSpanFormat SpanFormat = "unknown" -) +func (s SpansV2) GetSpans(v1 func([]*model.Span), v2 func(ptrace.Traces)) { + v2(s.Traces) +} + +func (d Details) GetSpanFormat() SpanFormat { + return d.SpanFormat +} + +func (d Details) GetInboundTransport() InboundTransport { + return d.InboundTransport +} + +func (d Details) GetTenant() string { + return d.Tenant +} diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go index fa92dbf0ab1..a047191f6b1 100644 --- a/cmd/collector/app/server/test.go +++ b/cmd/collector/app/server/test.go @@ -26,6 +26,6 @@ func (*mockSpanProcessor) Close() error { return nil } -func (*mockSpanProcessor) ProcessSpans(_ processor.Spans) ([]bool, error) { +func (*mockSpanProcessor) ProcessSpans(_ processor.Batch) ([]bool, error) { return []bool{}, nil } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index abea3c9a19b..f866b009e7d 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + 
"go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" @@ -164,22 +165,29 @@ func (sp *spanProcessor) countSpan(span *model.Span, _ string /* tenant */) { sp.spansProcessed.Add(1) } -func (sp *spanProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { - sp.preProcessSpans(spans) - sp.metrics.BatchSize.Update(int64(len(spans.SpansV1))) - retMe := make([]bool, len(spans.SpansV1)) +func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { + sp.preProcessSpans(batch) + var spans []*model.Span + batch.GetSpans(func(spansV1 []*model.Span) { + spans = spansV1 + }, func(traces ptrace.Traces) { + panic("not implemented") + }) + sp.metrics.BatchSize.Update(int64(len(spans))) + retMe := make([]bool, len(spans)) // Note: this is not the ideal place to do this because collector tags are added to Process.Tags, // and Process can be shared between different spans in the batch, but we no longer know that, // the relation is lost upstream and it's impossible in Go to dedupe pointers. But at least here // we have a single thread updating all spans that may share the same Process, before concurrency // kicks in. - for _, span := range spans.SpansV1 { + for _, span := range spans { sp.addCollectorTags(span) } - for i, mSpan := range spans.SpansV1 { - ok := sp.enqueueSpan(mSpan, spans.SpanFormat, spans.InboundTransport, spans.Tenant) + for i, mSpan := range spans { + // TODO does this have to be one span at a time? + ok := sp.enqueueSpan(mSpan, batch.GetSpanFormat(), batch.GetInboundTransport(), batch.GetTenant()) if !ok && sp.reportBusy { return nil, processor.ErrBusy } @@ -189,6 +197,7 @@ func (sp *spanProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) { } func (sp *spanProcessor) processItemFromQueue(item *queueItem) { + // TODO calling the sanitizer here contradicts the comment in enqueueSpan that warns about shared Process.
sp.processSpan(sp.sanitizer(item.span), item.tenant) sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime)) } diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index e62cd41cc6d..f52494098e9 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -238,9 +238,11 @@ func TestSpanProcessor(t *testing.T) { p := NewSpanProcessor(w, nil, Options.QueueSize(1)).(*spanProcessor) res, err := p.ProcessSpans( - processor.Spans{ - SpansV1: []*model.Span{{}}, // empty span should be enriched by sanitizers - SpanFormat: processor.JaegerSpanFormat, + processor.SpansV1{ + Spans: []*model.Span{{}}, // empty span should be enriched by sanitizers + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -265,15 +267,17 @@ func TestSpanProcessorErrors(t *testing.T) { Options.QueueSize(1), ).(*spanProcessor) - res, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", }, }, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -320,8 +324,8 @@ func TestSpanProcessorBusy(t *testing.T) { w.Lock() defer w.Unlock() - res, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", @@ -338,7 +342,9 @@ func TestSpanProcessorBusy(t *testing.T) { }, }, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.Error(t, err, "expecting busy error") @@ -620,15 +626,17 @@ func TestAdditionalProcessors(t *testing.T) { // nil doesn't fail p := NewSpanProcessor(w, nil, 
Options.QueueSize(1)) - res, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", }, }, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -640,15 +648,17 @@ func TestAdditionalProcessors(t *testing.T) { count++ } p = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1)) - res, err = p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err = p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", }, }, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -662,15 +672,17 @@ func TestSpanProcessorContextPropagation(t *testing.T) { dummyTenant := "context-prop-test-tenant" - res, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ { Process: &model.Process{ ServiceName: "x", }, }, }, - Tenant: dummyTenant, + Details: processor.Details{ + Tenant: dummyTenant, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -702,11 +714,13 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { w.Lock() defer w.Unlock() - _, err := p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + _, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ {OperationName: "op1"}, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.NoError(t, err) @@ -717,12 +731,14 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { // Now the queue is empty again and can accept one more item, but no workers available. 
// If we send two items, the last one will have to be dropped. - _, err = p.ProcessSpans(processor.Spans{ - SpansV1: []*model.Span{ + _, err = p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ {OperationName: "op2"}, {OperationName: "op3"}, }, - SpanFormat: processor.JaegerSpanFormat, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, }) require.EqualError(t, err, processor.ErrBusy.Error()) assert.Equal(t, []string{"op3"}, droppedOperations)