Move from OpenCensus to OTel API #452

Merged · 7 commits · Dec 23, 2024
67 changes: 44 additions & 23 deletions exporter/clickhouselogsexporter/exporter.go
@@ -35,9 +35,12 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

@@ -63,10 +66,15 @@ type clickhouseLogsExporter struct {
wg *sync.WaitGroup
closeChan chan struct{}

durationHistogram metric.Float64Histogram

useNewSchema bool
}

func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) {
func newExporter(set exporter.Settings, cfg *Config) (*clickhouseLogsExporter, error) {
logger := set.Logger
meter := set.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter")

if err := cfg.Validate(); err != nil {
return nil, err
}
@@ -96,17 +104,28 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
return nil, err
}

durationHistogram, err := meter.Float64Histogram(
"exporter_db_write_latency",
metric.WithDescription("Time taken to write data to ClickHouse"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000, 15000, 25000, 30000),
)
if err != nil {
return nil, err
}

return &clickhouseLogsExporter{
id: id,
db: client,
insertLogsSQL: insertLogsSQL,
insertLogsSQLV2: insertLogsSQLV2,
logger: logger,
cfg: cfg,
usageCollector: collector,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
useNewSchema: cfg.UseNewSchema,
id: id,
db: client,
insertLogsSQL: insertLogsSQL,
insertLogsSQLV2: insertLogsSQLV2,
logger: logger,
cfg: cfg,
usageCollector: collector,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
useNewSchema: cfg.UseNewSchema,
durationHistogram: durationHistogram,
}, nil
}

@@ -439,12 +458,13 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
// store the duration for sending the data
for i := 0; i < chLen; i++ {
sendDuration := <-chDuration
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, pipeline.SignalLogs.String()),
tag.Upsert(tableKey, sendDuration.Name),
},
writeLatencyMillis.M(int64(sendDuration.duration.Milliseconds())),
e.durationHistogram.Record(
ctx,
float64(sendDuration.duration.Milliseconds()),
metric.WithAttributes(
attribute.String("table", sendDuration.Name),
attribute.String("exporter", pipeline.SignalLogs.String()),
),
)
}

@@ -466,12 +486,13 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
// push tag attributes
tagWriteStart := time.Now()
err = tagStatement.Send()
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, pipeline.SignalLogs.String()),
tag.Upsert(tableKey, DISTRIBUTED_TAG_ATTRIBUTES),
},
writeLatencyMillis.M(int64(time.Since(tagWriteStart).Milliseconds())),
e.durationHistogram.Record(
ctx,
float64(time.Since(tagWriteStart).Milliseconds()),
metric.WithAttributes(
attribute.String("table", DISTRIBUTED_TAG_ATTRIBUTES),
attribute.String("exporter", pipeline.SignalLogs.String()),
),
)
if err != nil {
return err
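Taken together, the exporter.go hunks replace the OpenCensus stats/tag recording with an OTel Float64Histogram created once in newExporter from the collector's MeterProvider and stored on the struct. Below is a minimal, self-contained sketch of that pattern; the global provider stands in for set.MeterProvider, and the meter name, table value, and bucket list are abbreviated or illustrative rather than copied from the PR.

```go
package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	ctx := context.Background()

	// The exporter obtains its meter from set.MeterProvider (exporter.Settings);
	// the global provider stands in for it here.
	meter := otel.GetMeterProvider().Meter("example/clickhouselogsexporter")

	// Bucket boundaries are declared at instrument creation, which is what
	// makes the package-level OpenCensus view registration unnecessary.
	hist, err := meter.Float64Histogram(
		"exporter_db_write_latency",
		metric.WithDescription("Time taken to write data to ClickHouse"),
		metric.WithUnit("ms"),
		metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 5000, 10000),
	)
	if err != nil {
		panic(err)
	}

	// Per-batch recording, mirroring the replacement of stats.RecordWithTags:
	// each tag.Upsert pair becomes an attribute.KeyValue option on Record.
	start := time.Now()
	// ... write the batch to ClickHouse ...
	hist.Record(
		ctx,
		float64(time.Since(start).Milliseconds()),
		metric.WithAttributes(
			attribute.String("exporter", "logs"),
			attribute.String("table", "distributed_logs"), // illustrative table name
		),
	)
}
```

Since the histogram is created once and Record is cheap, this is safe on the per-batch hot path, matching how the PR stores durationHistogram on the exporter struct.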
22 changes: 1 addition & 21 deletions exporter/clickhouselogsexporter/factory.go
@@ -18,9 +18,6 @@ import (
"context"
"fmt"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
@@ -36,25 +33,8 @@ const (
tableName = "logs"
)

var (
writeLatencyMillis = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms")
exporterKey = tag.MustNewKey("exporter")
tableKey = tag.MustNewKey("table")
)

// NewFactory creates a factory for Elastic exporter.
func NewFactory() exporter.Factory {
writeLatencyDistribution := view.Distribution(100, 250, 500, 750, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000)

writeLatencyView := &view.View{
Name: "exporter_db_write_latency",
Measure: writeLatencyMillis,
Description: writeLatencyMillis.Description(),
TagKeys: []tag.Key{exporterKey, tableKey},
Aggregation: writeLatencyDistribution,
}

view.Register(writeLatencyView)

return exporter.NewFactory(
component.MustNewType(typeStr),
@@ -79,7 +59,7 @@ func createLogsExporter(
cfg component.Config,
) (exporter.Logs, error) {
c := cfg.(*Config)
exporter, err := newExporter(set.Logger, c)
exporter, err := newExporter(set, c)
if err != nil {
return nil, fmt.Errorf("cannot configure clickhouse logs exporter: %w", err)
}
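factory.go drops the package-level OpenCensus measure, tag keys, and view registration entirely. If an operator later needs to override the instrument's default buckets, the OTel-side equivalent is an SDK View; the sketch below is not part of this PR, and the boundary values are illustrative.

```go
package main

import (
	"context"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// Match the instrument by name and swap in different boundaries,
	// much as the removed view.Register call used to.
	latencyView := sdkmetric.NewView(
		sdkmetric.Instrument{Name: "exporter_db_write_latency"},
		sdkmetric.Stream{
			Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
				Boundaries: []float64{100, 250, 500, 1000, 2000, 4000, 8000},
			},
		},
	)

	// A MeterProvider built with this view re-buckets the histogram for
	// every meter it hands out.
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithView(latencyView))
	defer func() { _ = mp.Shutdown(context.Background()) }()
}
```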
100 changes: 67 additions & 33 deletions exporter/clickhousemetricsexporter/clickhouse.go
@@ -31,11 +31,14 @@ import (
"github.com/jellydator/ttlcache/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pipeline"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base"
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries"
@@ -61,7 +64,7 @@ const (
// clickHouse implements storage interface for the ClickHouse.
type clickHouse struct {
conn clickhouse.Conn
l *logrus.Entry
l *zap.Logger
database string
maxTimeSeriesInQuery int

@@ -80,6 +83,8 @@ type clickHouse struct {
mWrittenTimeSeries prometheus.Counter

exporterID uuid.UUID

durationHistogram metric.Float64Histogram
}

type ClickHouseParams struct {
@@ -92,10 +97,13 @@ type ClickHouseParams struct {
WriteTSToV4 bool
DisableV2 bool
ExporterId uuid.UUID
Settings exporter.Settings
}

func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
l := logrus.WithField("component", "clickhouse")

logger := params.Settings.Logger
meter := params.Settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter")

options, err := clickhouse.ParseDSN(params.DSN)

@@ -124,9 +132,19 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
)
go cache.Start()

durationHistogram, err := meter.Float64Histogram(
"exporter_db_write_latency",
metric.WithDescription("Time taken to write data to ClickHouse"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000, 15000, 25000, 30000),
)
if err != nil {
return nil, err
}

ch := &clickHouse{
conn: conn,
l: l,
l: logger,
database: options.Auth.Database,
maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery,
cache: cache,
@@ -139,10 +157,11 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
Name: "written_time_series",
Help: "Number of written time series.",
}),
watcherInterval: params.WatcherInterval,
writeTSToV4: params.WriteTSToV4,
disableV2: params.DisableV2,
exporterID: params.ExporterId,
watcherInterval: params.WatcherInterval,
writeTSToV4: params.WriteTSToV4,
disableV2: params.DisableV2,
exporterID: params.ExporterId,
durationHistogram: durationHistogram,
}

go func() {
@@ -176,20 +195,20 @@ func (ch *clickHouse) shardCountWatcher(ctx context.Context) {

ch.timeSeriesRW.Lock()
if ch.prevShardCount != shardCount {
ch.l.Infof("Shard count changed from %d to %d. Resetting time series map.", ch.prevShardCount, shardCount)
ch.l.Info("Shard count changed. Resetting time series map.", zap.Uint64("prev", ch.prevShardCount), zap.Uint64("current", shardCount))
ch.timeSeries = make(map[uint64]struct{})
}
ch.prevShardCount = shardCount
ch.timeSeriesRW.Unlock()
return nil
}()
if err != nil {
ch.l.Error(err)
ch.l.Error("error getting shard count", zap.Error(err))
}

select {
case <-ctx.Done():
ch.l.Warn(ctx.Err())
ch.l.Warn("shard count watcher stopped", zap.Error(ctx.Err()))
return
case <-ticker.C:
}
@@ -254,7 +273,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
fingerprintToName[f][envLabel] = env
}
if len(fingerprints) != len(timeSeries) {
ch.l.Debugf("got %d fingerprints, but only %d of them were unique time series", len(fingerprints), len(timeSeries))
ch.l.Debug("got fingerprints, but only unique time series", zap.Int("fingerprints", len(fingerprints)), zap.Int("time series", len(timeSeries)))
}

// find new time series
Expand Down Expand Up @@ -299,11 +318,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_TIME_SERIES_TABLE),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

@@ -339,11 +361,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
}
start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_SAMPLES_TABLE),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_SAMPLES_TABLE),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()
if err != nil {
@@ -396,11 +421,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_SAMPLES_TABLE_V4),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_SAMPLES_TABLE_V4),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

@@ -451,11 +479,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V4),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_TIME_SERIES_TABLE_V4),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

@@ -467,7 +498,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
n := len(newTimeSeries)
if n != 0 {
ch.mWrittenTimeSeries.Add(float64(n))
ch.l.Debugf("Wrote %d new time series.", n)
ch.l.Debug("wrote new time series", zap.Int("count", n))
}

err = func() error {
Expand Down Expand Up @@ -540,11 +571,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_EXP_HIST_TABLE),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_EXP_HIST_TABLE),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()
if err != nil {
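Alongside the histogram change, clickhouse.go swaps logrus for the collector's zap logger: every printf-style call (Infof, Debugf, Error with a bare error) becomes a structured call with a constant message and typed fields. A minimal sketch of the before/after shape, using zap.NewExample() as a stand-in for params.Settings.Logger:

```go
package main

import "go.uber.org/zap"

func main() {
	l := zap.NewExample() // stand-in for params.Settings.Logger

	prev, cur := uint64(1), uint64(2)

	// Before (logrus):
	//   l.Infof("Shard count changed from %d to %d. Resetting time series map.", prev, cur)
	// After (zap): the message stays constant and the variables become
	// typed fields, which keeps log lines queryable by field name.
	l.Info("Shard count changed. Resetting time series map.",
		zap.Uint64("prev", prev),
		zap.Uint64("current", cur),
	)
}
```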
1 change: 1 addition & 0 deletions exporter/clickhousemetricsexporter/exporter.go
@@ -95,6 +95,7 @@ func NewPrwExporter(cfg *Config, set exporter.Settings) (*PrwExporter, error) {
WriteTSToV4: cfg.WriteTSToV4,
DisableV2: cfg.DisableV2,
ExporterId: id,
Settings: set,
}
ch, err := NewClickHouse(params)
if err != nil {