Commit d1dc659

Update signozspanmetricsprocessor to remove dependency on prometheus …

srikanthccv authored Nov 23, 2023
1 parent daa4c25 commit d1dc659

Showing 8 changed files with 154 additions and 48 deletions.
3 changes: 3 additions & 0 deletions processor/signozspanmetricsprocessor/config.go
@@ -80,6 +80,9 @@ type Config struct {

// skipSanitizeLabel if enabled, labels that start with _ are not sanitized
skipSanitizeLabel bool

// MetricsFlushInterval is the time period between when metrics are flushed or emitted to the configured MetricsExporter.
MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`
}

// GetAggregationTemporality converts the string value given in the config into a AggregationTemporality.
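The new `metrics_flush_interval` key is decoded into `MetricsFlushInterval` through the field's `mapstructure` tag. As a rough illustration of that mapping, here is a minimal sketch that uses mapstructure directly with a hypothetical stand-in struct; it is not the collector's actual confmap machinery, only a demonstration of how a duration string such as "30s" (the value config-2-pipelines.yaml is expected to carry in the test below) ends up in the field:

```go
package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

// flushConfig is a hypothetical stand-in for the processor Config,
// kept only to show how the metrics_flush_interval key decodes.
type flushConfig struct {
	MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`
}

func main() {
	raw := map[string]interface{}{"metrics_flush_interval": "30s"}

	var c flushConfig
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		// Convert duration strings such as "30s" into time.Duration.
		DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
		Result:     &c,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(raw); err != nil {
		panic(err)
	}
	fmt.Println(c.MetricsFlushInterval) // 30s
}
```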
5 changes: 5 additions & 0 deletions processor/signozspanmetricsprocessor/config_test.go
@@ -42,18 +42,21 @@ func TestLoadConfig(t *testing.T) {
wantDimensions []Dimension
wantDimensionsCacheSize int
wantAggregationTemporality string
wantMetricsFlushInterval time.Duration
}{
{
configFile: "config-2-pipelines.yaml",
wantMetricsExporter: "prometheus",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: 500,
wantMetricsFlushInterval: 30 * time.Second,
},
{
configFile: "config-3-pipelines.yaml",
wantMetricsExporter: "otlp/spanmetrics",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: defaultDimensionsCacheSize,
wantMetricsFlushInterval: 60 * time.Second,
},
{
configFile: "config-full.yaml",
@@ -73,6 +76,7 @@ },
},
wantDimensionsCacheSize: 1500,
wantAggregationTemporality: delta,
wantMetricsFlushInterval: 60 * time.Second,
},
}
for _, tc := range testcases {
@@ -104,6 +108,7 @@ func TestLoadConfig(t *testing.T) {
Dimensions: tc.wantDimensions,
DimensionsCacheSize: tc.wantDimensionsCacheSize,
AggregationTemporality: tc.wantAggregationTemporality,
MetricsFlushInterval: tc.wantMetricsFlushInterval,
},
cfg.Processors[component.NewID(typeStr)],
)
30 changes: 23 additions & 7 deletions processor/signozspanmetricsprocessor/factory.go
@@ -16,20 +16,21 @@ package signozspanmetricsprocessor

import (
"context"
"time"

"github.com/google/uuid"
"github.com/tilinna/clock"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
)

const (
// The value of "type" key in configuration.
typeStr = "signozspanmetrics"
// The stability level of the processor.
stability = component.StabilityLevelBeta

signozID = "signoz.collector.id"
)

// NewFactory creates a factory for the spanmetrics processor.
@@ -46,12 +47,27 @@ func createDefaultConfig() component.Config {
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
skipSanitizeLabel: dropSanitizationFeatureGate.IsEnabled(),
MetricsFlushInterval: 60 * time.Second,
}
}

func createTracesProcessor(ctx context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
var instanceID string
serviceInstanceId, ok := params.Resource.Attributes().Get(semconv.AttributeServiceInstanceID)
if ok {
instanceID = serviceInstanceId.AsString()
} else {
instanceUUID, _ := uuid.NewRandom()
instanceID = instanceUUID.String()
}
p, err := newProcessor(params.Logger, instanceID, cfg, metricsTicker(ctx, cfg))
if err != nil {
return nil, err
}
p.tracesConsumer = nextConsumer
return p, nil
}

func createTracesProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
// TODO(srikanthccv): use the instanceID from params when it is added
instanceUUID, _ := uuid.NewRandom()
instanceID := instanceUUID.String()
return newProcessor(params.Logger, instanceID, cfg, nextConsumer)
func metricsTicker(ctx context.Context, cfg component.Config) *clock.Ticker {
return clock.FromContext(ctx).NewTicker(cfg.(*Config).MetricsFlushInterval)
}
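metricsTicker derives the flush ticker from the clock stored in the context (github.com/tilinna/clock), so tests can install a mock clock and advance it manually instead of waiting in real time. A minimal sketch of that pattern, assuming the tilinna/clock API (FromContext falling back to the realtime clock when no clock is set on the context):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tilinna/clock"
)

func main() {
	// With a plain context, clock.FromContext falls back to the realtime
	// clock, so this mirrors metricsTicker(ctx, cfg) in production.
	ctx := context.Background()
	ticker := clock.FromContext(ctx).NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	// In the processor this receive happens in the goroutine launched by
	// Start and triggers exportMetrics.
	firedAt := <-ticker.C
	fmt.Println("flush tick at", firedAt)

	// A test could instead install a mock clock before building the ticker,
	// e.g. ctx = clock.Context(ctx, clock.NewMock(time.Now())), and then
	// advance the mock to force flushes without real waiting.
}
```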
11 changes: 11 additions & 0 deletions processor/signozspanmetricsprocessor/internal/cache/cache.go
@@ -65,9 +65,20 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
return val.(V), ok
}
val, ok := c.evictedItems[key]

// Revive from evicted items back into the main cache if a fetch was attempted.
if ok {
delete(c.evictedItems, key)
c.Add(key, val)
}

return val, ok
}

func (c *Cache[K, V]) Contains(key K) bool {
return c.lru.Contains(key)
}

// Len returns the number of items in the cache.
func (c *Cache[K, V]) Len() int {
return c.lru.Len()
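Get now revives evicted entries into the LRU, while Contains consults only the LRU; the histogram-pruning loops added to buildMetrics further down rely on exactly that. Below is an illustrative sketch of the interplay, written as if it sat inside the internal cache package (the package is internal and not importable from outside the module). It assumes, as the existing tests indicate, that entries evicted from the LRU are parked in evictedItems until RemoveEvictedItems is called:

```go
package cache

import "fmt"

// Example_reviveAndContains is an illustrative sketch, not part of the commit.
func Example_reviveAndContains() {
	c, _ := NewCache[string, int](1)
	c.Add("a", 1)
	c.Add("b", 2) // "a" is evicted from the LRU into evictedItems

	v, ok := c.Get("a") // revives "a" into the LRU, which in turn evicts "b"
	fmt.Println(v, ok)

	fmt.Println(c.Contains("a")) // true: Contains checks only the LRU
	fmt.Println(c.Contains("b")) // false: "b" now lives in evictedItems

	c.RemoveEvictedItems() // drops "b" for good
	_, ok = c.Get("b")
	fmt.Println(ok)

	// Output:
	// 1 true
	// true
	// false
	// false
}
```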
24 changes: 24 additions & 0 deletions processor/signozspanmetricsprocessor/internal/cache/cache_test.go
@@ -65,6 +65,30 @@ func TestNewCache(t *testing.T) {
}
}

func TestCache_GetReviveEvicted(t *testing.T) {
cache, _ := NewCache[string, string](1)
cache.Add("key0", "val_from_LRU")
cache.evictedItems["key1"] = "val_from_evicted_items"

gotValue, gotOk := cache.Get("key0")
assert.True(t, gotOk)
assert.Equal(t, "val_from_LRU", gotValue)

// Should revive the evicted key back into the main LRU cache.
gotValue, gotOk = cache.Get("key1")
assert.True(t, gotOk)
assert.Equal(t, "val_from_evicted_items", gotValue)

cache.RemoveEvictedItems()

_, gotOk = cache.Get("key0")
assert.False(t, gotOk, "key0 should be removed from evicted items")

gotValue, gotOk = cache.Get("key1")
assert.True(t, gotOk)
assert.Equal(t, "val_from_evicted_items", gotValue, "key1 should be in the main LRU cache")
}

func TestCache_Get(t *testing.T) {
tests := []struct {
name string
92 changes: 70 additions & 22 deletions processor/signozspanmetricsprocessor/processor.go
@@ -26,14 +26,15 @@ import (
"time"
"unicode"

"github.com/tilinna/clock"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/SigNoz/signoz-otel-collector/processor/signozspanmetricsprocessor/internal/cache"
@@ -72,8 +73,8 @@ type processorImp struct {
instanceID string
config Config

metricsExporter exporter.Metrics
nextConsumer consumer.Traces
metricsConsumer consumer.Metrics
tracesConsumer consumer.Traces

// Additional dimensions to add to metrics.
dimensions []dimension // signoz_latency metric
@@ -108,6 +109,12 @@ type processorImp struct {

attrsCardinality map[string]map[string]struct{}
excludePatternRegex map[string]*regexp.Regexp

ticker *clock.Ticker
done chan struct{}
started bool

shutdownOnce sync.Once
}

type dimension struct {
@@ -137,7 +144,7 @@ type histogramData struct {
exemplarsData []exemplarData
}

func newProcessor(logger *zap.Logger, instanceID string, config component.Config, nextConsumer consumer.Traces) (*processorImp, error) {
func newProcessor(logger *zap.Logger, instanceID string, config component.Config, ticker *clock.Ticker) (*processorImp, error) {
logger.Info("Building signozspanmetricsprocessor")
pConfig := config.(*Config)

@@ -207,7 +214,6 @@ func newProcessor(logger *zap.Logger, instanceID string, config component.Config
callLatencyBounds: bounds,
dbCallLatencyBounds: bounds,
externalCallLatencyBounds: bounds,
nextConsumer: nextConsumer,
dimensions: newDimensions(pConfig.Dimensions),
callDimensions: newDimensions(callDimensions),
dbCallDimensions: newDimensions(dbCallDimensions),
@@ -219,6 +225,8 @@ func newProcessor(logger *zap.Logger, instanceID string, config component.Config
externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensionsCache,
attrsCardinality: make(map[string]map[string]struct{}),
excludePatternRegex: excludePatternRegex,
ticker: ticker,
done: make(chan struct{}),
}, nil
}

@@ -305,7 +313,7 @@ func (p *processorImp) Start(ctx context.Context, host component.Host) error {
p.logger.Info("Starting signozspanmetricsprocessor with config", zap.Any("config", p.config))
exporters := host.GetExporters()

var availableMetricsExporters []string
availableMetricsExporters := make([]string, 0, len(exporters[component.DataTypeMetrics]))

// The available list of exporters come from any configured metrics pipelines' exporters.
for k, exp := range exporters[component.DataTypeMetrics] {
@@ -321,22 +329,43 @@ func (p *processorImp) Start(ctx context.Context, host component.Host) error {
zap.Any("available-exporters", availableMetricsExporters),
)
if k.String() == p.config.MetricsExporter {
p.metricsExporter = metricsExp
p.metricsConsumer = metricsExp
p.logger.Info("Found exporter", zap.String("signozspanmetrics-exporter", p.config.MetricsExporter))
break
}
}
if p.metricsExporter == nil {
if p.metricsConsumer == nil {
return fmt.Errorf("failed to find metrics exporter: '%s'; please configure metrics_exporter from one of: %+v",
p.config.MetricsExporter, availableMetricsExporters)
}
p.logger.Info("Started signozspanmetricsprocessor")

p.started = true
go func() {
for {
select {
case <-p.done:
return
case <-p.ticker.C:
p.exportMetrics(ctx)
}
}
}()

return nil
}

// Shutdown implements the component.Component interface.
func (p *processorImp) Shutdown(ctx context.Context) error {
p.logger.Info("Shutting down signozspanmetricsprocessor")
p.shutdownOnce.Do(func() {
if p.started {
p.logger.Info("Stopping ticker")
p.ticker.Stop()
p.done <- struct{}{}
p.started = false
}
})
return nil
}

@@ -349,15 +378,17 @@ func (p *processorImp) Capabilities() consumer.Capabilities {
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
// Forward trace data unmodified and propagate both metrics and trace pipeline errors, if any.
return multierr.Combine(p.tracesToMetrics(ctx, traces), p.nextConsumer.ConsumeTraces(ctx, traces))
p.lock.Lock()
p.aggregateMetrics(traces)
p.lock.Unlock()

// Forward trace data unmodified and propagate trace pipeline errors, if any.
return p.tracesConsumer.ConsumeTraces(ctx, traces)
}

func (p *processorImp) tracesToMetrics(ctx context.Context, traces ptrace.Traces) error {
func (p *processorImp) exportMetrics(ctx context.Context) {
p.lock.Lock()

p.aggregateMetrics(traces)

m, err := p.buildMetrics()

// Exemplars are only relevant to this batch of traces, so must be cleared within the lock,
@@ -369,14 +400,13 @@ func (p *processorImp) tracesToMetrics(ctx context.Context, traces ptrace.Traces

if err != nil {
p.logCardinalityInfo()
return err
p.logger.Error("Failed to build metrics", zap.Error(err))
}

if err = p.metricsExporter.ConsumeMetrics(ctx, m); err != nil {
return err
if err := p.metricsConsumer.ConsumeMetrics(ctx, m); err != nil {
p.logger.Error("Failed ConsumeMetrics", zap.Error(err))
return
}

return nil
}

// buildMetrics collects the computed raw metrics data, builds the metrics object and
@@ -404,6 +434,27 @@ func (p *processorImp) buildMetrics() (pmetric.Metrics, error) {
p.dbCallMetricKeyToDimensions.RemoveEvictedItems()
p.externalCallMetricKeyToDimensions.RemoveEvictedItems()

for key := range p.histograms {
if !p.metricKeyToDimensions.Contains(key) {
delete(p.histograms, key)
}
}
for key := range p.callHistograms {
if !p.callMetricKeyToDimensions.Contains(key) {
delete(p.callHistograms, key)
}
}
for key := range p.dbCallHistograms {
if !p.dbCallMetricKeyToDimensions.Contains(key) {
delete(p.dbCallHistograms, key)
}
}
for key := range p.externalCallHistograms {
if !p.externalCallMetricKeyToDimensions.Contains(key) {
delete(p.externalCallHistograms, key)
}
}

// If delta metrics, reset accumulated data
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
p.resetAccumulatedMetrics()
@@ -446,7 +497,6 @@ func (p *processorImp) collectLatencyMetrics(ilm pmetric.ScopeMetrics) error {

dimensions, err := p.getDimensionsByMetricKey(key)
if err != nil {
p.logger.Error(err.Error())
return err
}

@@ -489,7 +539,6 @@ func (p *processorImp) collectDBCallMetrics(ilm pmetric.ScopeMetrics) error {

dimensions, err := p.getDimensionsByDBCallMetricKey(key)
if err != nil {
p.logger.Error(err.Error())
return err
}

@@ -531,7 +580,6 @@ func (p *processorImp) collectExternalCallMetrics(ilm pmetric.ScopeMetrics) erro

dimensions, err := p.getDimensionsByExternalCallMetricKey(key)
if err != nil {
p.logger.Error(err.Error())
return err
}

@@ -748,7 +796,7 @@ func (p *processorImp) aggregateMetrics(traces ptrace.Traces) {
if !ok {
continue
}
resourceAttr.PutStr(signozID, p.instanceID)
resourceAttr.PutStr(semconv.AttributeServiceInstanceID, p.instanceID)
serviceName := serviceAttr.Str()
ilsSlice := rspans.ScopeSpans()
for j := 0; j < ilsSlice.Len(); j++ {
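The net effect of the processor.go changes above: ConsumeTraces now only aggregates under the lock, a goroutine launched in Start exports on every ticker fire, and Shutdown tears the loop down exactly once via sync.Once and the done channel. A stand-alone sketch of that lifecycle, using a plain time.Ticker in place of the clock-backed ticker for brevity:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// flusher mirrors the started/done/shutdownOnce fields added to processorImp.
type flusher struct {
	ticker       *time.Ticker
	done         chan struct{}
	started      bool
	shutdownOnce sync.Once
}

func (f *flusher) start() {
	f.started = true
	go func() {
		for {
			select {
			case <-f.done:
				return
			case <-f.ticker.C:
				fmt.Println("export metrics") // stands in for exportMetrics(ctx)
			}
		}
	}()
}

func (f *flusher) shutdown() {
	f.shutdownOnce.Do(func() {
		if f.started {
			f.ticker.Stop()
			f.done <- struct{}{} // the loop above receives this and returns
			f.started = false
		}
	})
}

func main() {
	f := &flusher{ticker: time.NewTicker(50 * time.Millisecond), done: make(chan struct{})}
	f.start()
	time.Sleep(120 * time.Millisecond) // allow a couple of flushes
	f.shutdown()
}
```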