Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 159 additions & 1 deletion exporter/signozclickhousemetrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

internalmetadata "github.com/SigNoz/signoz-otel-collector/exporter/signozclickhousemetrics/internal/metadata"
chutils "github.com/SigNoz/signoz-otel-collector/pkg/clickhouse"
"github.com/SigNoz/signoz-otel-collector/usage"
"github.com/google/uuid"
"go.opencensus.io/stats"
Expand Down Expand Up @@ -72,6 +74,10 @@ type clickhouseMetricsExporter struct {
exporterID uuid.UUID

closeChan chan struct{}

minAcceptedTs atomic.Value
fetchShouldUpdateMinAcceptedTsTicker *time.Ticker
ttlParser *chutils.TTLParser
}

// sample represents a single metric sample
Expand Down Expand Up @@ -259,20 +265,34 @@ func NewClickHouseExporter(opts ...ExporterOption) (*clickhouseMetricsExporter,
}
}
chExporter.closeChan = make(chan struct{})
chExporter.ttlParser = chutils.NewTTLParser(chExporter.logger)

return chExporter, nil
}

// Start launches the exporter's background work: the metric metadata cache
// and a goroutine that maintains the minimum accepted sample timestamp
// derived from the samples table TTL.
func (c *clickhouseMetricsExporter) Start(ctx context.Context, host component.Host) error {
	go c.cache.Start()
	c.cacheRunning = true

	// Refresh the retention cutoff every 10 minutes; the ticker is stopped
	// in Shutdown.
	c.fetchShouldUpdateMinAcceptedTsTicker = time.NewTicker(10 * time.Minute)

	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		// Compute the cutoff once up front so filtering works immediately,
		// then keep it fresh on the ticker until closeChan is closed.
		c.updateMinAcceptedTs()
		c.fetchShouldUpdateMinAcceptedTs()
	}()

	return nil
}

func (c *clickhouseMetricsExporter) Shutdown(ctx context.Context) error {
if c.cacheRunning {
c.cache.Stop()
}
if c.fetchShouldUpdateMinAcceptedTsTicker != nil {
c.fetchShouldUpdateMinAcceptedTsTicker.Stop()
}
if c.usageCollector != nil {
err := c.usageCollector.Stop()
if err != nil {
Expand All @@ -284,6 +304,37 @@ func (c *clickhouseMetricsExporter) Shutdown(ctx context.Context) error {
return c.conn.Close()
}

// updateMinAcceptedTs refreshes the minimum accepted sample timestamp
// (nanoseconds, stored in c.minAcceptedTs) from the delete-TTL configured on
// the samples table. Data points older than this cutoff can be skipped so we
// do not insert rows ClickHouse would immediately expire. A stored value of 0
// means "accept everything".
func (c *clickhouseMetricsExporter) updateMinAcceptedTs() {
	c.logger.Info("Updating min accepted ts")

	// Bound the TTL lookup so a slow or unreachable ClickHouse cannot stall
	// the refresh goroutine indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	delTTL := c.ttlParser.GetTableTTLDays(ctx, c.conn, c.cfg.Database, c.cfg.SamplesTable)

	if delTTL == 0 {
		// no ttl for whatever reason, we won't filter any data
		// if it was because of error we would rather want no filter than make false
		// assumption
		c.logger.Info("no TTL for table, no filtering")
		c.minAcceptedTs.Store(uint64(0))
		return
	}

	// Anything older than now minus the TTL window is already past retention.
	acceptedDateTime := time.Now().Add(-time.Duration(delTTL) * 24 * time.Hour)
	c.minAcceptedTs.Store(uint64(acceptedDateTime.UnixNano()))
	c.logger.Info("added min accepted ts", zap.Uint64("ttl_days", delTTL), zap.Time("min_accepted_time", acceptedDateTime))
}

// fetchShouldUpdateMinAcceptedTs periodically recomputes the minimum accepted
// timestamp on each ticker fire, exiting once the exporter begins shutdown.
func (c *clickhouseMetricsExporter) fetchShouldUpdateMinAcceptedTs() {
	for {
		select {
		case <-c.fetchShouldUpdateMinAcceptedTsTicker.C:
			// Ticker fired: refresh the retention cutoff.
			c.updateMinAcceptedTs()
		case <-c.closeChan:
			// Shutdown was requested; stop refreshing.
			return
		}
	}
}

// processGauge processes gauge metrics
func (c *clickhouseMetricsExporter) processGauge(batch *batch, metric pmetric.Metric, env string, resourceFingerprint, scopeFingerprint *pkgfingerprint.Fingerprint) {
name := metric.Name()
Expand Down Expand Up @@ -316,6 +367,17 @@ func (c *clickhouseMetricsExporter) processGauge(batch *batch, metric pmetric.Me
}
unixMilli := dp.Timestamp().AsTime().UnixMilli()

if c.minAcceptedTs.Load() != nil {
minAcceptedNs := c.minAcceptedTs.Load().(uint64)
if minAcceptedNs > 0 {
oldestAllowedTs := int64(minAcceptedNs / 1e6)
if unixMilli < oldestAllowedTs {
c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs))
continue
}
}
}

fingerprint := pkgfingerprint.NewFingerprint(pkgfingerprint.PointFingerprintType, scopeFingerprint.Hash(), dp.Attributes(), map[string]string{
"__temporality__": temporality.String(),
})
Expand Down Expand Up @@ -379,6 +441,18 @@ func (c *clickhouseMetricsExporter) processSum(batch *batch, metric pmetric.Metr
continue
}
unixMilli := dp.Timestamp().AsTime().UnixMilli()

if c.minAcceptedTs.Load() != nil {
minAcceptedNs := c.minAcceptedTs.Load().(uint64)
if minAcceptedNs > 0 {
oldestAllowedTs := int64(minAcceptedNs / 1e6)
if unixMilli < oldestAllowedTs {
c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs))
continue
}
}
}

fingerprint := pkgfingerprint.NewFingerprint(pkgfingerprint.PointFingerprintType, scopeFingerprint.Hash(), dp.Attributes(), map[string]string{
"__temporality__": temporality.String(),
})
Expand Down Expand Up @@ -426,6 +500,18 @@ func (c *clickhouseMetricsExporter) processHistogram(b *batch, metric pmetric.Me

addSample := func(batch *batch, dp pmetric.HistogramDataPoint, suffix string) {
unixMilli := dp.Timestamp().AsTime().UnixMilli()

if c.minAcceptedTs.Load() != nil {
minAcceptedNs := c.minAcceptedTs.Load().(uint64)
if minAcceptedNs > 0 {
oldestAllowedTs := int64(minAcceptedNs / 1e6)
if unixMilli < oldestAllowedTs {
c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs))
return
}
}
}

sampleTyp := typ
sampleTemporality := temporality
var value float64
Expand Down Expand Up @@ -480,6 +566,18 @@ func (c *clickhouseMetricsExporter) processHistogram(b *batch, metric pmetric.Me
addBucketSample := func(batch *batch, dp pmetric.HistogramDataPoint, suffix string) {
var cumulativeCount uint64
unixMilli := dp.Timestamp().AsTime().UnixMilli()

if c.minAcceptedTs.Load() != nil {
minAcceptedNs := c.minAcceptedTs.Load().(uint64)
if minAcceptedNs > 0 {
oldestAllowedTs := int64(minAcceptedNs / 1e6)
if unixMilli < oldestAllowedTs {
c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs))
return
}
}
}

pointAttrs := dp.Attributes()

for i := 0; i < dp.ExplicitBounds().Len() && i < dp.BucketCounts().Len(); i++ {
Expand Down Expand Up @@ -611,6 +709,18 @@ func (c *clickhouseMetricsExporter) processSummary(b *batch, metric pmetric.Metr

addSample := func(batch *batch, dp pmetric.SummaryDataPoint, suffix string) {
unixMilli := dp.Timestamp().AsTime().UnixMilli()

if c.minAcceptedTs.Load() != nil {
minAcceptedNs := c.minAcceptedTs.Load().(uint64)
if minAcceptedNs > 0 {
oldestAllowedTs := int64(minAcceptedNs / 1e6)
if unixMilli < oldestAllowedTs {
c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs))
return
}
}
}

sampleTyp := typ
var value float64
switch suffix {
Expand Down Expand Up @@ -655,6 +765,18 @@ func (c *clickhouseMetricsExporter) processSummary(b *batch, metric pmetric.Metr

addQuantileSample := func(batch *batch, dp pmetric.SummaryDataPoint, suffix string) {
unixMilli := dp.Timestamp().AsTime().UnixMilli()

if c.minAcceptedTs.Load() != nil {
minAcceptedNs := c.minAcceptedTs.Load().(uint64)
if minAcceptedNs > 0 {
oldestAllowedTs := int64(minAcceptedNs / 1e6)
if unixMilli < oldestAllowedTs {
c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs))
return
}
}
}

for i := 0; i < dp.QuantileValues().Len(); i++ {
quantile := dp.QuantileValues().At(i)
quantileStr := strconv.FormatFloat(quantile.Quantile(), 'f', -1, 64)
Expand Down Expand Up @@ -756,6 +878,18 @@ func (c *clickhouseMetricsExporter) processExponentialHistogram(b *batch, metric

addSample := func(batch *batch, dp pmetric.ExponentialHistogramDataPoint, suffix string) {
unixMilli := dp.Timestamp().AsTime().UnixMilli()

if c.minAcceptedTs.Load() != nil {
minAcceptedNs := c.minAcceptedTs.Load().(uint64)
if minAcceptedNs > 0 {
oldestAllowedTs := int64(minAcceptedNs / 1e6)
if unixMilli < oldestAllowedTs {
c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs))
return
}
}
}

sampleTyp := typ
sampleTemporality := temporality
var value float64
Expand Down Expand Up @@ -822,6 +956,18 @@ func (c *clickhouseMetricsExporter) processExponentialHistogram(b *batch, metric

addDDSketchSample := func(batch *batch, dp pmetric.ExponentialHistogramDataPoint) {
unixMilli := dp.Timestamp().AsTime().UnixMilli()

if c.minAcceptedTs.Load() != nil {
minAcceptedNs := c.minAcceptedTs.Load().(uint64)
if minAcceptedNs > 0 {
oldestAllowedTs := int64(minAcceptedNs / 1e6)
if unixMilli < oldestAllowedTs {
c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs))
return
}
}
}

positive := toStore(dp.Positive())
negative := toStore(dp.Negative())
gamma := math.Pow(2, math.Pow(2, float64(-dp.Scale())))
Expand Down Expand Up @@ -964,7 +1110,19 @@ func (c *clickhouseMetricsExporter) PushMetrics(ctx context.Context, md pmetric.
case <-c.closeChan:
return errors.New("shutdown has been called")
default:
return c.writeBatch(ctx, c.prepareBatch(ctx, md))
err := c.writeBatch(ctx, c.prepareBatch(ctx, md))
if err != nil {
// we try to remove metrics older than the retention period
// but if we get too many partitions for single INSERT block, we will drop the data
// Check if this is a ClickHouse exception with error code 252 (TOO_MANY_PARTS)
var chErr *clickhouse.Exception
if errors.As(err, &chErr) && chErr.Code == 252 {
c.logger.Warn("too many partitions for single INSERT block, dropping the batch")
return nil
}
return err
}
return nil
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
github.com/AfterShip/clickhouse-sql-parser v0.4.13 // indirect
github.com/AthenZ/athenz v1.12.13 // indirect
github.com/Azure/azure-amqp-common-go/v4 v4.2.0 // indirect
github.com/Azure/azure-event-hubs-go/v3 v3.6.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMb
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4=
github.com/99designs/keyring v1.2.2 h1:pZd3neh/EmUzWONb35LxQfvuY7kiSXAq3HQd97+XBn0=
github.com/99designs/keyring v1.2.2/go.mod h1:wes/FrByc8j7lFOAGLGSNEg8f/PaI3cgTBqhFkHUrPk=
github.com/AfterShip/clickhouse-sql-parser v0.4.13 h1:mu7oB33REsIKe5FQdFXbasftCR/a0lCwX+usCq/QwdQ=
github.com/AfterShip/clickhouse-sql-parser v0.4.13/go.mod h1:W0Z82wJWkJxz2RVun/RMwxue3g7ut47Xxl+SFqdJGus=
github.com/AthenZ/athenz v1.12.13 h1:OhZNqZsoBXNrKBJobeUUEirPDnwt0HRo4kQMIO1UwwQ=
github.com/AthenZ/athenz v1.12.13/go.mod h1:XXDXXgaQzXaBXnJX6x/bH4yF6eon2lkyzQZ0z/dxprE=
github.com/Azure/azure-amqp-common-go/v4 v4.2.0 h1:q/jLx1KJ8xeI8XGfkOWMN9XrXzAfVTkyvCxPvHCjd2I=
Expand Down
Loading
Loading