Skip to content

Commit

Permalink
Add warning message for invalid data and update TS write table (#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Dec 7, 2023
1 parent 9b7e415 commit 4db53a8
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 19 deletions.
17 changes: 3 additions & 14 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
ch.timeSeriesRW.Unlock()

err := func() error {
ctx := context.Background()
err := ch.conn.Exec(ctx, `SET allow_experimental_object_type = 1`)
if err != nil {
return err
}

statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
if err != nil {
return err
Expand Down Expand Up @@ -296,15 +290,10 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
return err
}

// Write to time_series_v3 table
// Write to distributed_time_series_v3 table
err = func() error {
ctx := context.Background()
err := ch.conn.Exec(ctx, `SET allow_experimental_object_type = 1`)
if err != nil {
return err
}

statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, TIME_SERIES_TABLE_V3))
statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE_V3))
if err != nil {
return err
}
Expand Down Expand Up @@ -333,7 +322,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics)),
tag.Upsert(tableKey, TIME_SERIES_TABLE_V3),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V3),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
Expand Down
7 changes: 6 additions & 1 deletion exporter/clickhousemetricsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"
"go.opencensus.io/stats/view"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/prometheus/prometheus/prompb"

Expand Down Expand Up @@ -60,6 +61,7 @@ type PrwExporter struct {
usageCollector *usage.UsageCollector
metricNameToMeta map[string]base.MetricMeta
mux *sync.Mutex
logger *zap.Logger
}

// NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
Expand Down Expand Up @@ -121,6 +123,7 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, err
usageCollector: collector,
metricNameToMeta: make(map[string]base.MetricMeta),
mux: new(sync.Mutex),
logger: set.Logger,
}, nil
}

Expand Down Expand Up @@ -236,6 +239,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
dataPoints := metric.Histogram().DataPoints()
if dataPoints.Len() == 0 {
dropped++
prwe.logger.Warn("Dropped histogram metric with no data points", zap.String("name", metric.Name()))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
Expand All @@ -244,6 +248,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
dataPoints := metric.Summary().DataPoints()
if dataPoints.Len() == 0 {
dropped++
prwe.logger.Warn("Dropped summary metric with no data points", zap.String("name", metric.Name()))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
Expand All @@ -254,7 +259,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
dropped++
name := metric.Name()
typ := metric.Type().String()
errs = multierr.Append(errs, consumererror.NewPermanent(errors.New(fmt.Sprintf("unsupported metric type %s for %s", typ, name))))
prwe.logger.Warn("Unsupported metric type", zap.String("name", name), zap.String("type", typ))
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions exporter/clickhousemetricsexporter/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,25 @@ var (
// Measures for usage
ExporterSigNozSentMetricPoints = stats.Int64(
SigNozSentMetricPointsKey,
"Number of signoz log records successfully sent to destination.",
"Number of signoz metric points successfully sent to destination.",
stats.UnitDimensionless)
ExporterSigNozSentMetricPointsBytes = stats.Int64(
SigNozSentMetricPointsBytesKey,
"Total size of signoz log records successfully sent to destination.",
"Total size of signoz metric points successfully sent to destination.",
stats.UnitDimensionless)

// Views for usage
MetricPointsCountView = &view.View{
Name: "signoz_metric_points_count",
Measure: ExporterSigNozSentMetricPoints,
Description: "The number of logs exported to signoz",
Description: "The number of metric points exported to signoz",
Aggregation: view.Sum(),
TagKeys: []tag.Key{usage.TagTenantKey},
}
MetricPointsBytesView = &view.View{
Name: "signoz_metric_points_bytes",
Measure: ExporterSigNozSentMetricPointsBytes,
Description: "The size of logs exported to signoz",
Description: "The size of metric points exported to signoz",
Aggregation: view.Sum(),
TagKeys: []tag.Key{usage.TagTenantKey},
}
Expand Down

0 comments on commit 4db53a8

Please sign in to comment.