Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
f51abc5
feat(aggregating-merge): added migration for aggregating merge tree i…
aniketio-ctrl Jun 2, 2025
ae4872f
Merge branch 'main' of github.com:SigNoz/signoz-otel-collector
aniketio-ctrl Jun 2, 2025
653746e
feat(aggregating-merge): added migration for aggregating merge tree i…
aniketio-ctrl Jun 2, 2025
419b7ef
feat(aggregating-merge): added migration for aggregating merge tree i…
aniketio-ctrl Jun 2, 2025
0a734a5
feat(aggregating-merge): added migration for aggregating merge tree i…
aniketio-ctrl Jun 3, 2025
edbab46
feat(aggregating-merge): added disable ttl cache to disable ttl cache
aniketio-ctrl Jun 5, 2025
a05c148
feat(aggregating-merge): added migration for aggregating merge tree i…
aniketio-ctrl Jun 8, 2025
953a21d
feat(aggregating-merge): added migration for aggregating merge tree i…
aniketio-ctrl Jun 8, 2025
0cf169b
Merge branch 'main' of github.com:SigNoz/signoz-otel-collector into f…
aniketio-ctrl Jun 20, 2025
5c0ed93
feat(agg-merge-tree): added agg merge tree migration
aniketio-ctrl Jun 20, 2025
72ec3f8
feat(agg-merge-tree): added agg merge tree migration
aniketio-ctrl Jun 20, 2025
30ff5a0
feat(agg-merge-tree): added agg merge tree migration
aniketio-ctrl Jun 21, 2025
44367de
Merge branch 'main' of github.com:SigNoz/signoz-otel-collector into f…
aniketio-ctrl Jun 27, 2025
a07a4ac
chore(magnetic-flea): removed bool for sync jobs
aniketio-ctrl Jun 27, 2025
d8d140c
chore(magnetic-flea): added 2 shards for clickhouse
aniketio-ctrl Jun 27, 2025
90b376e
chore(agg-merge-tree): added aggregation necessary
aniketio-ctrl Jun 29, 2025
003fa41
chore(agg-merge-tree): added aggregation necessary
aniketio-ctrl Jun 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/signozschemamigrator/schema_migrator/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type SchemaMigrationRecord struct {
MigrationID uint64
UpItems []Operation
DownItems []Operation
IsNecessary bool
}

// MigrationManager is the manager for the schema migrations.
Expand Down Expand Up @@ -637,7 +638,7 @@ func (m *MigrationManager) MigrateUpSync(ctx context.Context, upVersions []uint6
continue
}
for _, item := range migration.UpItems {
if !item.IsMutation() && item.IsIdempotent() && item.IsLightweight() {
if (!item.IsMutation() && item.IsIdempotent() && item.IsLightweight()) || migration.IsNecessary {
if err := m.RunOperation(ctx, item, migration.MigrationID, signozMetricsDB, false); err != nil {
return err
}
Expand Down
400 changes: 400 additions & 0 deletions cmd/signozschemamigrator/schema_migrator/metrics_migrations.go

Large diffs are not rendered by default.

26 changes: 17 additions & 9 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type ClickHouseParams struct {
WatcherInterval time.Duration
WriteTSToV4 bool
DisableV2 bool
DisableTTLCache bool
ExporterId uuid.UUID
Settings exporter.Settings
}
Expand Down Expand Up @@ -129,11 +130,14 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
return nil, fmt.Errorf("could not connect to clickhouse: %s", err)
}

cache := ttlcache.New[string, bool](
ttlcache.WithTTL[string, bool](45*time.Minute),
ttlcache.WithDisableTouchOnHit[string, bool](),
)
go cache.Start()
var cache *ttlcache.Cache[string, bool]
if !params.DisableTTLCache {
cache = ttlcache.New[string, bool](
ttlcache.WithTTL[string, bool](45*time.Minute),
ttlcache.WithDisableTouchOnHit[string, bool](),
)
go cache.Start()
}

durationHistogram, err := meter.Float64Histogram(
"exporter_db_write_latency",
Expand Down Expand Up @@ -467,9 +471,11 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr

for fingerprint, labels := range timeSeries {
key := fmt.Sprintf("%d:%d", fingerprint, unixMilli)
if item := ch.cache.Get(key); item != nil {
if value := item.Value(); value {
continue
if ch.cache != nil {
if item := ch.cache.Get(key); item != nil {
if value := item.Value(); value {
continue
}
}
}
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
Expand All @@ -489,7 +495,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
if err != nil {
return err
}
ch.cache.Set(key, true, ttlcache.DefaultTTL)
if ch.cache != nil {
ch.cache.Set(key, true, ttlcache.DefaultTTL)
}
}

start := time.Now()
Expand Down
7 changes: 4 additions & 3 deletions exporter/clickhousemetricsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ type Config struct {

WatcherInterval time.Duration `mapstructure:"watcher_interval"`

WriteTSToV4 bool `mapstructure:"write_ts_to_v4"`
DisableV2 bool `mapstructure:"disable_v2"`
EnableExpHist bool `mapstructure:"enable_exp_hist"`
WriteTSToV4 bool `mapstructure:"write_ts_to_v4"`
DisableV2 bool `mapstructure:"disable_v2"`
EnableExpHist bool `mapstructure:"enable_exp_hist"`
DisableTTLCache bool `mapstructure:"disable_ttl_cache"`
}

// RemoteWriteQueue allows to configure the remote write queue.
Expand Down
1 change: 1 addition & 0 deletions exporter/clickhousemetricsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
WatcherInterval: cfg.WatcherInterval,
WriteTSToV4: cfg.WriteTSToV4,
DisableV2: cfg.DisableV2,
DisableTTLCache: cfg.DisableTTLCache,
ExporterId: id,
Settings: set,
}
Expand All @@ -117,7 +118,7 @@
log.Fatalf("Error creating usage collector for metrics: %v", err)
}

collector.Start()

Check failure on line 121 in exporter/clickhousemetricsexporter/exporter.go

View workflow job for this annotation

GitHub Actions / lint / go

Error return value of `collector.Start` is not checked (errcheck)

if err := view.Register(MetricPointsCountView, MetricPointsBytesView); err != nil {
return nil, err
Expand Down Expand Up @@ -154,7 +155,7 @@
func (prwe *PrwExporter) Shutdown(context.Context) error {
// shutdown usage reporting.
if prwe.usageCollector != nil {
prwe.usageCollector.Stop()

Check failure on line 158 in exporter/clickhousemetricsexporter/exporter.go

View workflow job for this annotation

GitHub Actions / lint / go

Error return value of `prwe.usageCollector.Stop` is not checked (errcheck)
}

close(prwe.closeChan)
Expand Down
1 change: 1 addition & 0 deletions exporter/clickhousemetricsexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,6 @@ func createDefaultConfig() component.Config {
WriteTSToV4: true,
DisableV2: false,
EnableExpHist: false,
DisableTTLCache: false,
}
}
1 change: 1 addition & 0 deletions exporter/signozclickhousemetrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
TimeSeriesTable string `mapstructure:"time_series_table"`
ExpHistTable string `mapstructure:"exp_hist_table"`
MetadataTable string `mapstructure:"metadata_table"`
DisableTtlCache bool `mapstructure:"disable_ttl_cache"`
}

var _ component.Config = (*Config)(nil)
Expand Down
33 changes: 20 additions & 13 deletions exporter/signozclickhousemetrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type clickhouseMetricsExporter struct {
processMetricsDuration metricapi.Float64Histogram
exportMetricsDuration metricapi.Float64Histogram
settings exporter.Settings
disableTtlCache bool
}

// sample represents a single metric sample
Expand Down Expand Up @@ -147,9 +148,14 @@ func WithEnableExpHist(enableExpHist bool) ExporterOption {
}
}

func WithCache(cache *ttlcache.Cache[string, bool]) ExporterOption {
func WithCache(cache *ttlcache.Cache[string, bool], enabled bool) ExporterOption {
return func(e *clickhouseMetricsExporter) error {
e.cache = cache
e.disableTtlCache = enabled
if e.disableTtlCache {
e.cache = nil
} else {
e.cache = cache
}
return nil
}
}
Expand All @@ -176,13 +182,8 @@ func WithSettings(settings exporter.Settings) ExporterOption {
}

func defaultOptions() []ExporterOption {
cache := ttlcache.New(
ttlcache.WithTTL[string, bool](45*time.Minute),
ttlcache.WithDisableTouchOnHit[string, bool](),
)

return []ExporterOption{
WithCache(cache),
WithLogger(zap.NewNop()),
WithEnableExpHist(false),
WithMeter(noop.NewMeterProvider().Meter(internalmetadata.ScopeName)),
Expand Down Expand Up @@ -228,8 +229,10 @@ func NewClickHouseExporter(opts ...ExporterOption) (*clickhouseMetricsExporter,
}

func (c *clickhouseMetricsExporter) Start(ctx context.Context, host component.Host) error {
go c.cache.Start()
c.cacheRunning = true
if c.cache != nil {
go c.cache.Start()
c.cacheRunning = true
}
return nil
}

Expand Down Expand Up @@ -949,9 +952,11 @@ func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *batch
for _, ts := range timeSeries {
roundedUnixMilli := ts.unixMilli / 3600000 * 3600000
cacheKey := makeCacheKey(ts.fingerprint, uint64(roundedUnixMilli))
if item := c.cache.Get(cacheKey); item != nil {
if value := item.Value(); value {
continue
if c.cache != nil {
if item := c.cache.Get(cacheKey); item != nil {
if value := item.Value(); value {
continue
}
}
}
err = statement.Append(
Expand All @@ -973,7 +978,9 @@ func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *batch
if err != nil {
return err
}
c.cache.Set(cacheKey, true, ttlcache.DefaultTTL)
if c.cache != nil {
c.cache.Set(cacheKey, true, ttlcache.DefaultTTL)
}
}
return statement.Send()
}
Expand Down
9 changes: 9 additions & 0 deletions exporter/signozclickhousemetrics/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package signozclickhousemetrics
import (
"context"
"errors"
"github.com/jellydator/ttlcache/v3"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -39,13 +41,19 @@ func createMetricsExporter(ctx context.Context, set exporter.Settings,
return nil, err
}

cache := ttlcache.New[string, bool](
ttlcache.WithTTL[string, bool](45*time.Minute),
ttlcache.WithDisableTouchOnHit[string, bool](),
)

chExporter, err := NewClickHouseExporter(
WithConfig(chCfg),
WithConn(conn),
WithLogger(set.Logger),
WithMeter(set.MeterProvider.Meter(internalmetadata.ScopeName)),
WithEnableExpHist(chCfg.EnableExpHist),
WithSettings(set),
WithCache(cache, chCfg.DisableTtlCache),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -82,5 +90,6 @@ func createDefaultConfig() component.Config {
TimeSeriesTable: "distributed_time_series_v4",
ExpHistTable: "distributed_exp_hist",
MetadataTable: "distributed_metadata",
DisableTtlCache: false,
}
}
Loading