Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed Jan 5, 2025
1 parent 8ddd301 commit 66f17e3
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 100 deletions.
8 changes: 5 additions & 3 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,11 @@ func TestAggregator(t *testing.T) {
},
},
}
_, err := c.spanProcessor.ProcessSpans(processor.Spans{
SpansV1: spans,
SpanFormat: processor.JaegerSpanFormat,
_, err := c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: spans,
Details: processor.Details{
SpanFormat: processor.JaegerSpanFormat,
},
})
require.NoError(t, err)
require.NoError(t, c.Close())
Expand Down
16 changes: 9 additions & 7 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.Spans // common settings for all spans
spanOptions processor.Details // common settings for all spans
tenancyMgr *tenancy.Manager
}

func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.Manager) batchConsumer {
return batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.Spans{
spanOptions: processor.Details{
InboundTransport: transport,
SpanFormat: spanFormat,
},
Expand All @@ -75,11 +75,13 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
span.Process = batch.Process
}
}
_, err = c.spanProcessor.ProcessSpans(processor.Spans{
SpansV1: batch.Spans,
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
_, err = c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: batch.Spans,
Details: processor.Details{
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
},
})
if err != nil {
if errors.Is(err, processor.ErrBusy) {
Expand Down
17 changes: 11 additions & 6 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -33,17 +34,21 @@ type mockSpanProcessor struct {
spanFormat processor.SpanFormat
}

func (p *mockSpanProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = append(p.spans, spans.SpansV1...)
oks := make([]bool, len(spans.SpansV1))
batch.GetSpans(func(spans []*model.Span) {
p.spans = append(p.spans, spans...)
}, func(traces ptrace.Traces) {
panic("not implemented")
})
oks := make([]bool, len(p.spans))
if p.tenants == nil {
p.tenants = make(map[string]bool)
}
p.tenants[spans.Tenant] = true
p.transport = spans.InboundTransport
p.spanFormat = spans.SpanFormat
p.tenants[batch.GetTenant()] = true
p.transport = batch.GetInboundTransport()
p.spanFormat = batch.GetSpanFormat()
return oks, p.expectedError
}

Expand Down
20 changes: 12 additions & 8 deletions cmd/collector/app/handler/thrift_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options
mSpan := jConv.ToDomainSpan(span, batch.Process)
mSpans = append(mSpans, mSpan)
}
oks, err := jbh.modelProcessor.ProcessSpans(processor.Spans{
SpansV1: mSpans,
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
},
})
if err != nil {
jbh.logger.Error("Collector failed to process span batch", zap.Error(err))
Expand Down Expand Up @@ -106,10 +108,12 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options
convCount[i] = len(converted)
mSpans = append(mSpans, converted...)
}
bools, err := h.modelProcessor.ProcessSpans(processor.Spans{
SpansV1: mSpans,
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
},
})
if err != nil {
h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err))
Expand Down
15 changes: 12 additions & 3 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)
Expand Down Expand Up @@ -57,12 +59,19 @@ type shouldIErrorProcessor struct {

var errTestError = errors.New("Whoops")

func (s *shouldIErrorProcessor) ProcessSpans(spans processor.Spans) ([]bool, error) {
func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
if s.shouldError {
return nil, errTestError
}
retMe := make([]bool, len(spans.SpansV1))
for i := range spans.SpansV1 {
var spans []*model.Span
batch.GetSpans(func(sp []*model.Span) {
spans = sp
}, func(traces ptrace.Traces) {
panic("not implemented")
})

retMe := make([]bool, len(spans))
for i := range spans {
retMe[i] = true
}
return retMe, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/model_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type ProcessSpan func(span *model.Span, tenant string)

// ProcessSpans processes a batch of Domain Model Spans
type ProcessSpans func(spans processor.Spans)
type ProcessSpans func(spans processor.Batch)

// FilterSpan decides whether to allow or disallow a span
type FilterSpan func(span *model.Span) bool
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (options) apply(opts ...Option) options {
ret.hostMetrics = metrics.NullFactory
}
if ret.preProcessSpans == nil {
ret.preProcessSpans = func(_ processor.Spans) {}
ret.preProcessSpans = func(_ processor.Batch) {}
}
if ret.sanitizer == nil {
ret.sanitizer = func(span *model.Span) *model.Span { return span }
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestAllOptionSet(t *testing.T) {
Options.ServiceMetrics(metrics.NullFactory),
Options.Logger(zap.NewNop()),
Options.NumWorkers(5),
Options.PreProcessSpans(func(_ processor.Spans) {}),
Options.PreProcessSpans(func(_ processor.Batch) {}),
Options.Sanitizer(func(span *model.Span) *model.Span { return span }),
Options.QueueSize(10),
Options.DynQueueSizeWarmup(1000),
Expand All @@ -53,7 +53,7 @@ func TestNoOptionsSet(t *testing.T) {
assert.Nil(t, opts.collectorTags)
assert.False(t, opts.reportBusy)
assert.False(t, opts.blockingSubmit)
assert.NotPanics(t, func() { opts.preProcessSpans(processor.Spans{}) })
assert.NotPanics(t, func() { opts.preProcessSpans(processor.SpansV1{}) })
assert.NotPanics(t, func() { opts.preSave(nil, "") })
assert.True(t, opts.spanFilter(nil))
span := model.Span{}
Expand Down
39 changes: 39 additions & 0 deletions cmd/collector/app/processor/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2020 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package processor

import (
"errors"
)

// ErrBusy signalizes that processor cannot process incoming data
var ErrBusy = errors.New("server busy")

// InboundTransport identifies the transport used to receive spans.
type InboundTransport string

const (
// GRPCTransport indicates spans received over gRPC.
GRPCTransport InboundTransport = "grpc"
// HTTPTransport indicates spans received over HTTP.
HTTPTransport InboundTransport = "http"
// UnknownTransport is the fallback/catch-all category.
UnknownTransport InboundTransport = "unknown"
)

// SpanFormat identifies the data format in which the span was originally received.
type SpanFormat string

const (
// JaegerSpanFormat is for Jaeger Thrift spans.
JaegerSpanFormat SpanFormat = "jaeger"
// ZipkinSpanFormat is for Zipkin Thrift spans.
ZipkinSpanFormat SpanFormat = "zipkin"
// ProtoSpanFormat is for Jaeger protobuf Spans.
ProtoSpanFormat SpanFormat = "proto"
// OTLPSpanFormat is for OpenTelemetry OTLP format.
OTLPSpanFormat SpanFormat = "otlp"
// UnknownSpanFormat is the fallback/catch-all category.
UnknownSpanFormat SpanFormat = "unknown"
)
85 changes: 48 additions & 37 deletions cmd/collector/app/processor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,68 @@
package processor

import (
"errors"
"io"

"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
)

// ErrBusy signalizes that processor cannot process incoming data
var ErrBusy = errors.New("server busy")
var (
_ Batch = (*SpansV1)(nil)
_ Batch = (*SpansV2)(nil)
)

// Batch is a batch of spans passed to the processor.
type Batch interface {
// Spans calls the appropriate function based on the type of spans being processed.
GetSpans(v1 func(spans []*model.Span), v2 func(traces ptrace.Traces))

GetSpanFormat() SpanFormat
GetInboundTransport() InboundTransport
GetTenant() string
}

// SpansOptions additional options passed to processor along with the spans.
type Spans struct {
SpansV1 []*model.Span
SpansV2 ptrace.Traces
// SpanProcessor handles spans
type SpanProcessor interface {
// ProcessSpans processes spans and return with either a list of true/false success or an error
ProcessSpans(spans Batch) ([]bool, error)
io.Closer
}

type Details struct {
SpanFormat SpanFormat
InboundTransport InboundTransport
Tenant string
}

// SpanProcessor handles model spans
type SpanProcessor interface {
// ProcessSpans processes model spans and return with either a list of true/false success or an error
ProcessSpans(spans Spans) ([]bool, error)
io.Closer
// Spans is a batch of spans passed to the processor.
type SpansV1 struct {
Spans []*model.Span
Details
}

// InboundTransport identifies the transport used to receive spans.
type InboundTransport string
type SpansV2 struct {
Traces ptrace.Traces
Details
}

const (
// GRPCTransport indicates spans received over gRPC.
GRPCTransport InboundTransport = "grpc"
// HTTPTransport indicates spans received over HTTP.
HTTPTransport InboundTransport = "http"
// UnknownTransport is the fallback/catch-all category.
UnknownTransport InboundTransport = "unknown"
)
func (s SpansV1) GetSpans(v1 func([]*model.Span), v2 func(ptrace.Traces)) {
v1(s.Spans)
}

// SpanFormat identifies the data format in which the span was originally received.
type SpanFormat string

const (
// JaegerSpanFormat is for Jaeger Thrift spans.
JaegerSpanFormat SpanFormat = "jaeger"
// ZipkinSpanFormat is for Zipkin Thrift spans.
ZipkinSpanFormat SpanFormat = "zipkin"
// ProtoSpanFormat is for Jaeger protobuf Spans.
ProtoSpanFormat SpanFormat = "proto"
// OTLPSpanFormat is for OpenTelemetry OTLP format.
OTLPSpanFormat SpanFormat = "otlp"
// UnknownSpanFormat is the fallback/catch-all category.
UnknownSpanFormat SpanFormat = "unknown"
)
func (s SpansV2) GetSpans(v1 func([]*model.Span), v2 func(ptrace.Traces)) {
v2(s.Traces)
}

func (d Details) GetSpanFormat() SpanFormat {
return d.SpanFormat
}

func (d Details) GetInboundTransport() InboundTransport {
return d.InboundTransport
}

func (d Details) GetTenant() string {
return d.Tenant
}
2 changes: 1 addition & 1 deletion cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ func (*mockSpanProcessor) Close() error {
return nil
}

func (*mockSpanProcessor) ProcessSpans(_ processor.Spans) ([]bool, error) {
func (*mockSpanProcessor) ProcessSpans(_ processor.Batch) ([]bool, error) {
return []bool{}, nil
}
Loading

0 comments on commit 66f17e3

Please sign in to comment.