Skip to content

Commit

Permalink
Add metric meta information (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Nov 30, 2023
1 parent d1dc659 commit ed70641
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 45 deletions.
11 changes: 10 additions & 1 deletion exporter/clickhousemetricsexporter/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,23 @@ import (
"github.com/prometheus/prometheus/prompb"
)

// MetricMeta holds per-metric metadata collected from the OTLP payload
// (temporality, description, unit, data type, monotonicity). It is keyed by
// metric name and passed alongside the remote-write request so the storage
// layer can persist these columns with each time series.
type MetricMeta struct {
	// Name is the metric name this metadata describes.
	Name string
	// Temporality is the aggregation temporality (cumulative, delta, or unspecified).
	Temporality pmetric.AggregationTemporality
	// Description is the metric's description/help text.
	Description string
	// Unit is the metric's unit string.
	Unit string
	// Typ is the OTLP metric data type (gauge, sum, histogram, ...).
	Typ pmetric.MetricType
	// IsMonotonic reports whether a Sum metric is monotonic; it is left
	// false for non-Sum types.
	IsMonotonic bool
}

// Storage represents generic storage.
type Storage interface {
// Read runs queries in the storage and returns the same number of matrices.
// Even if they are empty, they must be present in the returned slice.
// Read(context.Context, []Query) (*prompb.ReadResponse, error)

// Write puts data into storage.
Write(context.Context, *prompb.WriteRequest, map[string]pmetric.AggregationTemporality) error
Write(context.Context, *prompb.WriteRequest, map[string]MetricMeta) error

// Returns the DB conn.
GetDBConn() interface{}
Expand Down
25 changes: 17 additions & 8 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"

"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base"
Expand Down Expand Up @@ -191,7 +190,7 @@ func (ch *clickHouse) GetDBConn() interface{} {
return ch.conn
}

func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metricNameToTemporality map[string]pmetric.AggregationTemporality) error {
func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metricNameToMeta map[string]base.MetricMeta) error {
// calculate fingerprints, map them to time series
fingerprints := make([]uint64, len(data.Timeseries))
timeSeries := make(map[uint64][]*prompb.Label, len(data.Timeseries))
Expand Down Expand Up @@ -219,10 +218,10 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
}
// add temporality label
if metricName != "" {
if t, ok := metricNameToTemporality[metricName]; ok {
if t, ok := metricNameToMeta[metricName]; ok {
labels = append(labels, &prompb.Label{
Name: temporalityLabel,
Value: t.String(),
Value: t.Temporality.String(),
})
}
}
Expand Down Expand Up @@ -259,19 +258,24 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
return err
}

statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
if err != nil {
return err
}
timestamp := model.Now().Time().UnixMilli()
for fingerprint, labels := range newTimeSeries {
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
fingerprintToName[fingerprint][nameLabel],
metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
meta.Temporality.String(),
timestamp,
fingerprint,
encodedLabels,
meta.Description,
meta.Unit,
meta.Typ.String(),
meta.IsMonotonic,
)
if err != nil {
return err
Expand Down Expand Up @@ -300,20 +304,25 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
return err
}

statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels) VALUES (?, ?, ?, ?, ?, ?)", ch.database, TIME_SERIES_TABLE_V3))
statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, TIME_SERIES_TABLE_V3))
if err != nil {
return err
}
timestamp := model.Now().Time().UnixMilli()
for fingerprint, labels := range newTimeSeries {
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
fingerprintToName[fingerprint][envLabel],
metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
meta.Temporality.String(),
fingerprintToName[fingerprint][nameLabel],
fingerprint,
timestamp,
encodedLabels,
meta.Description,
meta.Unit,
meta.Typ.String(),
meta.IsMonotonic,
)
if err != nil {
return err
Expand Down
97 changes: 61 additions & 36 deletions exporter/clickhousemetricsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ const maxBatchByteSize = 3000000

// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and
// sends them to a remote endpoint via the configured base.Storage.
//
// NOTE: the scraped diff showed both the pre-change and post-change field
// sets concatenated (duplicate declarations, which do not compile); this is
// the post-change struct, where the former
// metricNameToTemporality map[string]pmetric.AggregationTemporality field is
// replaced by metricNameToMeta.
type PrwExporter struct {
	namespace       string            // optional prefix used when building Prometheus metric names
	externalLabels  map[string]string // sanitized labels attached to every exported series
	endpointURL     *url.URL
	client          *http.Client
	wg              *sync.WaitGroup
	closeChan       chan struct{}
	concurrency     int
	userAgentHeader string
	clientSettings  *confighttp.HTTPClientSettings
	settings        component.TelemetrySettings
	ch              base.Storage // storage backend the write requests are handed to
	usageCollector  *usage.UsageCollector
	// metricNameToMeta accumulates per-metric metadata (temporality,
	// description, unit, type, monotonicity) during PushMetrics; it is
	// snapshotted under mux in export() before being passed to ch.Write.
	metricNameToMeta map[string]base.MetricMeta
	mux              *sync.Mutex // guards metricNameToMeta
}

// NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
Expand Down Expand Up @@ -108,19 +108,19 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, err
}

return &PrwExporter{
namespace: cfg.Namespace,
externalLabels: sanitizedLabels,
endpointURL: endpointURL,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
settings: set.TelemetrySettings,
ch: ch,
usageCollector: collector,
metricNameToTemporality: make(map[string]pmetric.AggregationTemporality),
mux: new(sync.Mutex),
namespace: cfg.Namespace,
externalLabels: sanitizedLabels,
endpointURL: endpointURL,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
settings: set.TelemetrySettings,
ch: ch,
usageCollector: collector,
metricNameToMeta: make(map[string]base.MetricMeta),
mux: new(sync.Mutex),
}, nil
}

Expand Down Expand Up @@ -185,12 +185,37 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
temporality = pmetric.AggregationTemporalityUnspecified
default:
}
prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)] = temporality
metricName := getPromMetricName(metric, prwe.namespace)
meta := base.MetricMeta{
Name: metricName,
Temporality: temporality,
Description: metric.Description(),
Unit: metric.Unit(),
Typ: metricType,
}
if metricType == pmetric.MetricTypeSum {
meta.IsMonotonic = metric.Sum().IsMonotonic()
}
prwe.metricNameToMeta[metricName] = meta

if metricType == pmetric.MetricTypeHistogram || metricType == pmetric.MetricTypeSummary {
prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+bucketStr] = temporality
prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+countStr] = temporality
prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+sumStr] = temporality
prwe.metricNameToMeta[metricName+bucketStr] = meta
prwe.metricNameToMeta[metricName+countStr] = base.MetricMeta{
Name: metricName,
Temporality: temporality,
Description: metric.Description(),
Unit: metric.Unit(),
Typ: pmetric.MetricTypeSum,
IsMonotonic: temporality == pmetric.AggregationTemporalityCumulative,
}
prwe.metricNameToMeta[metricName+sumStr] = base.MetricMeta{
Name: metricName,
Temporality: temporality,
Description: metric.Description(),
Unit: metric.Unit(),
Typ: pmetric.MetricTypeSum,
IsMonotonic: temporality == pmetric.AggregationTemporalityCumulative,
}
}

// handle individual metric based on type
Expand Down Expand Up @@ -277,10 +302,10 @@ func (prwe *PrwExporter) addNumberDataPointSlice(dataPoints pmetric.NumberDataPo
// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
prwe.mux.Lock()
// make a copy of metricNameToTemporality
metricNameToTemporality := make(map[string]pmetric.AggregationTemporality)
for k, v := range prwe.metricNameToTemporality {
metricNameToTemporality[k] = v
// make a copy of metricNameToMeta
metricNameToMeta := make(map[string]base.MetricMeta)
for k, v := range prwe.metricNameToMeta {
metricNameToMeta[k] = v
}
prwe.mux.Unlock()
var errs []error
Expand Down Expand Up @@ -310,7 +335,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
defer wg.Done()

for request := range input {
err := prwe.ch.Write(ctx, request, metricNameToTemporality)
err := prwe.ch.Write(ctx, request, metricNameToMeta)
if err != nil {
mu.Lock()
errs = append(errs, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Down migration: remove the metric metadata columns (description, unit,
-- type, is_monotonic) from every time-series table, on all cluster nodes.
-- Rolls back the corresponding up migration; IF EXISTS makes it idempotent.

-- v3 local table
ALTER TABLE signoz_metrics.time_series_v3 ON CLUSTER {{.SIGNOZ_CLUSTER}}
DROP COLUMN IF EXISTS description,
DROP COLUMN IF EXISTS unit,
DROP COLUMN IF EXISTS type,
DROP COLUMN IF EXISTS is_monotonic;

-- v3 distributed wrapper table (schema must stay in sync with the local table)
ALTER TABLE signoz_metrics.distributed_time_series_v3 ON CLUSTER {{.SIGNOZ_CLUSTER}}
DROP COLUMN IF EXISTS description,
DROP COLUMN IF EXISTS unit,
DROP COLUMN IF EXISTS type,
DROP COLUMN IF EXISTS is_monotonic;

-- v2 local table
ALTER TABLE signoz_metrics.time_series_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
DROP COLUMN IF EXISTS description,
DROP COLUMN IF EXISTS unit,
DROP COLUMN IF EXISTS type,
DROP COLUMN IF EXISTS is_monotonic;

-- v2 distributed wrapper table
ALTER TABLE signoz_metrics.distributed_time_series_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
DROP COLUMN IF EXISTS description,
DROP COLUMN IF EXISTS unit,
DROP COLUMN IF EXISTS type,
DROP COLUMN IF EXISTS is_monotonic;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Up migration: add metric metadata columns (description, unit, type,
-- is_monotonic) to every time-series table, on all cluster nodes.
-- LowCardinality(String) suits the low-distinct-value nature of these fields;
-- ZSTD(1) compression and '' / false defaults keep existing rows valid.
-- IF NOT EXISTS makes re-runs idempotent.

-- v3 local table
ALTER TABLE signoz_metrics.time_series_v3 ON CLUSTER {{.SIGNOZ_CLUSTER}}
ADD COLUMN IF NOT EXISTS description LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS unit LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS type LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS is_monotonic Bool DEFAULT false CODEC(ZSTD(1));

-- v3 distributed wrapper table (schema must stay in sync with the local table)
ALTER TABLE signoz_metrics.distributed_time_series_v3 ON CLUSTER {{.SIGNOZ_CLUSTER}}
ADD COLUMN IF NOT EXISTS description LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS unit LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS type LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS is_monotonic Bool DEFAULT false CODEC(ZSTD(1));

-- v2 local table
ALTER TABLE signoz_metrics.time_series_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
ADD COLUMN IF NOT EXISTS description LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS unit LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS type LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS is_monotonic Bool DEFAULT false CODEC(ZSTD(1));

-- v2 distributed wrapper table
ALTER TABLE signoz_metrics.distributed_time_series_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
ADD COLUMN IF NOT EXISTS description LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS unit LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS type LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
ADD COLUMN IF NOT EXISTS is_monotonic Bool DEFAULT false CODEC(ZSTD(1));

0 comments on commit ed70641

Please sign in to comment.