diff --git a/exporter/signozclickhousemetrics/exporter.go b/exporter/signozclickhousemetrics/exporter.go index 7f9307da..590cde2f 100644 --- a/exporter/signozclickhousemetrics/exporter.go +++ b/exporter/signozclickhousemetrics/exporter.go @@ -8,9 +8,11 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" internalmetadata "github.com/SigNoz/signoz-otel-collector/exporter/signozclickhousemetrics/internal/metadata" + chutils "github.com/SigNoz/signoz-otel-collector/pkg/clickhouse" "github.com/SigNoz/signoz-otel-collector/usage" "github.com/google/uuid" "go.opencensus.io/stats" @@ -72,6 +74,10 @@ type clickhouseMetricsExporter struct { exporterID uuid.UUID closeChan chan struct{} + + minAcceptedTs atomic.Value + fetchShouldUpdateMinAcceptedTsTicker *time.Ticker + ttlParser *chutils.TTLParser } // sample represents a single metric sample @@ -259,6 +265,7 @@ func NewClickHouseExporter(opts ...ExporterOption) (*clickhouseMetricsExporter, } } chExporter.closeChan = make(chan struct{}) + chExporter.ttlParser = chutils.NewTTLParser(chExporter.logger) return chExporter, nil } @@ -266,6 +273,16 @@ func NewClickHouseExporter(opts ...ExporterOption) (*clickhouseMetricsExporter, func (c *clickhouseMetricsExporter) Start(ctx context.Context, host component.Host) error { go c.cache.Start() c.cacheRunning = true + + c.fetchShouldUpdateMinAcceptedTsTicker = time.NewTicker(10 * time.Minute) + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.updateMinAcceptedTs() + c.fetchShouldUpdateMinAcceptedTs() + }() + return nil } @@ -273,6 +290,9 @@ func (c *clickhouseMetricsExporter) Shutdown(ctx context.Context) error { if c.cacheRunning { c.cache.Stop() } + if c.fetchShouldUpdateMinAcceptedTsTicker != nil { + c.fetchShouldUpdateMinAcceptedTsTicker.Stop() + } if c.usageCollector != nil { err := c.usageCollector.Stop() if err != nil { @@ -284,6 +304,37 @@ func (c *clickhouseMetricsExporter) Shutdown(ctx context.Context) error { return c.conn.Close() } +func (c *clickhouseMetricsExporter) updateMinAcceptedTs() { + c.logger.Info("Updating min accepted ts") + + delTTL := c.ttlParser.GetTableTTLDays(context.Background(), c.conn, c.cfg.Database, c.cfg.SamplesTable) + + if delTTL == 0 { + // no ttl for whatever reason, we won't filter any data + // if it was because of error we would rather want no filter than make false + // assumption + c.logger.Info("no TTL for table, no filtering") + c.minAcceptedTs.Store(uint64(0)) + return + } + + seconds := delTTL * 24 * 60 * 60 + acceptedDateTime := time.Now().Add(-time.Duration(seconds) * time.Second) + c.minAcceptedTs.Store(uint64(acceptedDateTime.UnixNano())) + c.logger.Info("added min accepted ts", zap.Uint64("ttl_days", delTTL), zap.Time("min_accepted_time", acceptedDateTime)) +} + +func (c *clickhouseMetricsExporter) fetchShouldUpdateMinAcceptedTs() { + for { + select { + case <-c.closeChan: + return + case <-c.fetchShouldUpdateMinAcceptedTsTicker.C: + c.updateMinAcceptedTs() + } + } +} + // processGauge processes gauge metrics func (c *clickhouseMetricsExporter) processGauge(batch *batch, metric pmetric.Metric, env string, resourceFingerprint, scopeFingerprint *pkgfingerprint.Fingerprint) { name := metric.Name() @@ -316,6 +367,17 @@ func (c *clickhouseMetricsExporter) processGauge(batch *batch, metric pmetric.Me } unixMilli := dp.Timestamp().AsTime().UnixMilli() + if c.minAcceptedTs.Load() != nil { + minAcceptedNs := c.minAcceptedTs.Load().(uint64) + if minAcceptedNs > 0 { + oldestAllowedTs := int64(minAcceptedNs / 1e6) + if unixMilli < oldestAllowedTs { + c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs)) + continue + } + } + } + fingerprint := pkgfingerprint.NewFingerprint(pkgfingerprint.PointFingerprintType, scopeFingerprint.Hash(), dp.Attributes(), map[string]string{ "__temporality__": temporality.String(), }) @@ -379,6 +441,18 @@ func (c *clickhouseMetricsExporter) processSum(batch *batch, metric pmetric.Metr continue } unixMilli := dp.Timestamp().AsTime().UnixMilli() + + if c.minAcceptedTs.Load() != nil { + minAcceptedNs := c.minAcceptedTs.Load().(uint64) + if minAcceptedNs > 0 { + oldestAllowedTs := int64(minAcceptedNs / 1e6) + if unixMilli < oldestAllowedTs { + c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs)) + continue + } + } + } + fingerprint := pkgfingerprint.NewFingerprint(pkgfingerprint.PointFingerprintType, scopeFingerprint.Hash(), dp.Attributes(), map[string]string{ "__temporality__": temporality.String(), }) @@ -426,6 +500,18 @@ func (c *clickhouseMetricsExporter) processHistogram(b *batch, metric pmetric.Me addSample := func(batch *batch, dp pmetric.HistogramDataPoint, suffix string) { unixMilli := dp.Timestamp().AsTime().UnixMilli() + + if c.minAcceptedTs.Load() != nil { + minAcceptedNs := c.minAcceptedTs.Load().(uint64) + if minAcceptedNs > 0 { + oldestAllowedTs := int64(minAcceptedNs / 1e6) + if unixMilli < oldestAllowedTs { + c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs)) + return + } + } + } + sampleTyp := typ sampleTemporality := temporality var value float64 @@ -480,6 +566,18 @@ func (c *clickhouseMetricsExporter) processHistogram(b *batch, metric pmetric.Me addBucketSample := func(batch *batch, dp pmetric.HistogramDataPoint, suffix string) { var cumulativeCount uint64 unixMilli := dp.Timestamp().AsTime().UnixMilli() + + if c.minAcceptedTs.Load() != nil { + minAcceptedNs := c.minAcceptedTs.Load().(uint64) + if minAcceptedNs > 0 { + oldestAllowedTs := int64(minAcceptedNs / 1e6) + if unixMilli < oldestAllowedTs { + c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs)) + return + } + } + } + pointAttrs := dp.Attributes() for i := 0; i < dp.ExplicitBounds().Len() && i < dp.BucketCounts().Len(); i++ { @@ -611,6 +709,18 @@ func (c *clickhouseMetricsExporter) processSummary(b *batch, metric pmetric.Metr addSample := func(batch *batch, dp pmetric.SummaryDataPoint, suffix string) { unixMilli := dp.Timestamp().AsTime().UnixMilli() + + if c.minAcceptedTs.Load() != nil { + minAcceptedNs := c.minAcceptedTs.Load().(uint64) + if minAcceptedNs > 0 { + oldestAllowedTs := int64(minAcceptedNs / 1e6) + if unixMilli < oldestAllowedTs { + c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs)) + return + } + } + } + sampleTyp := typ var value float64 switch suffix { @@ -655,6 +765,18 @@ func (c *clickhouseMetricsExporter) processSummary(b *batch, metric pmetric.Metr addQuantileSample := func(batch *batch, dp pmetric.SummaryDataPoint, suffix string) { unixMilli := dp.Timestamp().AsTime().UnixMilli() + + if c.minAcceptedTs.Load() != nil { + minAcceptedNs := c.minAcceptedTs.Load().(uint64) + if minAcceptedNs > 0 { + oldestAllowedTs := int64(minAcceptedNs / 1e6) + if unixMilli < oldestAllowedTs { + c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs)) + return + } + } + } + for i := 0; i < dp.QuantileValues().Len(); i++ { quantile := dp.QuantileValues().At(i) quantileStr := strconv.FormatFloat(quantile.Quantile(), 'f', -1, 64) @@ -756,6 +878,18 @@ func (c *clickhouseMetricsExporter) processExponentialHistogram(b *batch, metric addSample := func(batch *batch, dp pmetric.ExponentialHistogramDataPoint, suffix string) { unixMilli := dp.Timestamp().AsTime().UnixMilli() + + if c.minAcceptedTs.Load() != nil { + minAcceptedNs := c.minAcceptedTs.Load().(uint64) + if minAcceptedNs > 0 { + oldestAllowedTs := int64(minAcceptedNs / 1e6) + if unixMilli < oldestAllowedTs { + c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs)) + return + } + } + } + sampleTyp := typ sampleTemporality := temporality var value float64 @@ -822,6 +956,18 @@ func (c *clickhouseMetricsExporter) processExponentialHistogram(b *batch, metric addDDSketchSample := func(batch *batch, dp pmetric.ExponentialHistogramDataPoint) { unixMilli := dp.Timestamp().AsTime().UnixMilli() + + if c.minAcceptedTs.Load() != nil { + minAcceptedNs := c.minAcceptedTs.Load().(uint64) + if minAcceptedNs > 0 { + oldestAllowedTs := int64(minAcceptedNs / 1e6) + if unixMilli < oldestAllowedTs { + c.logger.Debug("skipping old metric data point", zap.String("metric", name), zap.Int64("ts", unixMilli), zap.Int64("oldestAllowedTs", oldestAllowedTs)) + return + } + } + } + positive := toStore(dp.Positive()) negative := toStore(dp.Negative()) gamma := math.Pow(2, math.Pow(2, float64(-dp.Scale()))) @@ -964,7 +1110,19 @@ func (c *clickhouseMetricsExporter) PushMetrics(ctx context.Context, md pmetric. case <-c.closeChan: return errors.New("shutdown has been called") default: - return c.writeBatch(ctx, c.prepareBatch(ctx, md)) + err := c.writeBatch(ctx, c.prepareBatch(ctx, md)) + if err != nil { + // we try to remove metrics older than the retention period + // but if we get too many partitions for single INSERT block, we will drop the data + // Check if this is a ClickHouse exception with error code 252 (TOO_MANY_PARTS) + var chErr *clickhouse.Exception + if errors.As(err, &chErr) && chErr.Code == 252 { + c.logger.Warn("too many partitions for single INSERT block, dropping the batch") + return nil + } + return err + } + return nil } } diff --git a/go.mod b/go.mod index 160a6bef..3c1dfea8 100644 --- a/go.mod +++ b/go.mod @@ -272,6 +272,7 @@ require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.2 // indirect + github.com/AfterShip/clickhouse-sql-parser v0.4.13 // indirect github.com/AthenZ/athenz v1.12.13 // indirect github.com/Azure/azure-amqp-common-go/v4 v4.2.0 // indirect github.com/Azure/azure-event-hubs-go/v3 v3.6.2 // indirect diff --git a/go.sum b/go.sum index 3851f1f1..8f3a8e7f 100644 --- a/go.sum +++ b/go.sum @@ -631,6 +631,8 @@ github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMb github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= github.com/99designs/keyring v1.2.2 h1:pZd3neh/EmUzWONb35LxQfvuY7kiSXAq3HQd97+XBn0= github.com/99designs/keyring v1.2.2/go.mod h1:wes/FrByc8j7lFOAGLGSNEg8f/PaI3cgTBqhFkHUrPk= +github.com/AfterShip/clickhouse-sql-parser v0.4.13 h1:mu7oB33REsIKe5FQdFXbasftCR/a0lCwX+usCq/QwdQ= +github.com/AfterShip/clickhouse-sql-parser v0.4.13/go.mod h1:W0Z82wJWkJxz2RVun/RMwxue3g7ut47Xxl+SFqdJGus= github.com/AthenZ/athenz v1.12.13 h1:OhZNqZsoBXNrKBJobeUUEirPDnwt0HRo4kQMIO1UwwQ= github.com/AthenZ/athenz v1.12.13/go.mod h1:XXDXXgaQzXaBXnJX6x/bH4yF6eon2lkyzQZ0z/dxprE= github.com/Azure/azure-amqp-common-go/v4 v4.2.0 h1:q/jLx1KJ8xeI8XGfkOWMN9XrXzAfVTkyvCxPvHCjd2I= diff --git a/pkg/clickhouse/ttl.go b/pkg/clickhouse/ttl.go new file mode 100644 index 00000000..1caa6cc9 --- /dev/null +++ b/pkg/clickhouse/ttl.go @@ -0,0 +1,176 @@ +package clickhouse + +import ( + "context" + "fmt" + "strconv" + "strings" + + "github.com/AfterShip/clickhouse-sql-parser/parser" + "github.com/ClickHouse/clickhouse-go/v2" + "go.uber.org/zap" +) + +type TTLResponse struct { + EngineFull string `ch:"engine_full"` +} + +type TTLParser struct { + logger *zap.Logger +} + +func NewTTLParser(logger *zap.Logger) *TTLParser { + return &TTLParser{ + logger: logger, + } +} + +func (p *TTLParser) GetTableTTLDays(ctx context.Context, conn clickhouse.Conn, database, tableName string) uint64 { + if conn == nil { + p.logger.Debug("no database connection available, cannot determine TTL") + return 0 + } + + var dbResp []TTLResponse + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE database = '%s' AND name = '%s'", database, tableName) + + err := conn.Select(ctx, &dbResp, query) + if err != nil { + p.logger.Error("error while fetching ttl from database", + zap.Error(err), zap.String("database", database), zap.String("table", tableName)) + return 0 + } + + if len(dbResp) == 0 { + p.logger.Warn("table not found in system.tables", zap.String("database", database), zap.String("table", tableName)) + return 0 + } + + ttlDays := p.ParseTTLFromEngineFull(dbResp[0].EngineFull) + if ttlDays > 0 { + p.logger.Info("TTL configured for table", + zap.String("database", database), zap.String("table", tableName), zap.Uint64("ttl_days", ttlDays)) + return ttlDays + } + + p.logger.Info("TTL not configured for table", zap.String("database", database), zap.String("table", tableName)) + return 0 +} + +func (p *TTLParser) ParseTTLFromEngineFull(engineFull string) uint64 { + // engine_full for parsing + sqlStatement := "CREATE TABLE dummy_and_dumb (id Int32) ENGINE = " + engineFull + + clickhouseParser := parser.NewParser(sqlStatement) + stmts, err := clickhouseParser.ParseStmts() + if err != nil { + p.logger.Debug("failed to parse engine_full string", + zap.Error(err), + zap.String("engine_full", engineFull)) + return 0 + } + + if len(stmts) == 0 { + p.logger.Debug("no statements found in engine_full string", + zap.String("engine_full", engineFull)) + return 0 + } + + for _, stmt := range stmts { + if createTable, ok := stmt.(*parser.CreateTable); ok { + if createTable.Engine != nil && createTable.Engine.TTL != nil { + p.logger.Debug("TTL clause found in engine_full string", + zap.String("engine_full", engineFull)) + return p.extractTTLFromClause(createTable.Engine.TTL) + } + } + } + + return 0 +} + +func (p *TTLParser) extractTTLFromClause(ttlClause *parser.TTLClause) uint64 { + if ttlClause == nil || len(ttlClause.Items) == 0 { + return 0 + } + + ttlExpr := ttlClause.Items[0] + if ttlExpr == nil || ttlExpr.Expr == nil { + return 0 + } + + exprStr := ttlExpr.Expr.String() + return p.extractIntervalFromString(exprStr) +} + +func (p *TTLParser) extractIntervalFromString(exprStr string) uint64 { + // toIntervalXXX functions in the expression + intervalFunctions := []struct { + funcName string + intervalType string + }{ + {"toIntervalSecond", "Second"}, + {"toIntervalMinute", "Minute"}, + {"toIntervalHour", "Hour"}, + {"toIntervalDay", "Day"}, + {"toIntervalWeek", "Week"}, + {"toIntervalMonth", "Month"}, + {"toIntervalYear", "Year"}, + } + + for _, intervalFunc := range intervalFunctions { + if strings.Contains(exprStr, intervalFunc.funcName) { + if value := p.extractNumericValueFromFunction(exprStr, intervalFunc.funcName); value > 0 { + return p.convertToDays(intervalFunc.intervalType, value) + } + } + } + + return 0 +} + +func (p *TTLParser) extractNumericValueFromFunction(exprStr, funcName string) uint64 { + // dumb string manipulation + startIdx := strings.Index(exprStr, funcName+"(") + if startIdx == -1 { + return 0 + } + + startIdx += len(funcName) + 1 + endIdx := strings.Index(exprStr[startIdx:], ")") + if endIdx == -1 { + return 0 + } + + paramStr := strings.TrimSpace(exprStr[startIdx : startIdx+endIdx]) + if value, err := strconv.ParseUint(paramStr, 10, 64); err == nil { + return value + } + + return 0 +} + +func (p *TTLParser) convertToDays(intervalType string, value uint64) uint64 { + if value == 0 { + return 0 + } + + switch intervalType { + case "Second": + return value / (24 * 60 * 60) + case "Minute": + return value / (24 * 60) + case "Hour": + return value / 24 + case "Day": + return value + case "Week": + return value * 7 + case "Month": + return value * 30 + case "Year": + return value * 365 + default: + return 0 + } +} diff --git a/pkg/clickhouse/ttl_test.go b/pkg/clickhouse/ttl_test.go new file mode 100644 index 00000000..9548ffe5 --- /dev/null +++ b/pkg/clickhouse/ttl_test.go @@ -0,0 +1,225 @@ +package clickhouse + +import ( + "context" + "errors" + "testing" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/column" + cmock "github.com/srikanthccv/ClickHouse-go-mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" +) + +func TestTTLParser_ParseTTLFromEngineFull(t *testing.T) { + logger := zap.NewNop() + parser := NewTTLParser(logger) + + tests := []struct { + name string + engineFull string + expected uint64 + }{ + { + name: "toIntervalSecond - 30 days", + engineFull: "ReplicatedReplacingMergeTree('/clickhouse/tables/a84a78ab-54b5-4b61-ab73-8b92b96d6871/{shard}', '{replica}') PARTITION BY toDate(unix_milli / 1000) ORDER BY (env, temporality, metric_name, fingerprint, unix_milli) TTL toDateTime(unix_milli / 1000) + toIntervalSecond(2592000) SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1", + expected: 30, + }, + { + name: "toIntervalDay - 7 days", + engineFull: "MergeTree() PARTITION BY toDate(timestamp / 1000000000) ORDER BY (timestamp, id) TTL toDateTime(timestamp / 1000000000) + toIntervalDay(7) SETTINGS index_granularity = 8192", + expected: 7, + }, + { + name: "toIntervalHour - 48 hours = 2 days", + engineFull: "MergeTree() TTL toDateTime(ts) + toIntervalHour(48) SETTINGS index_granularity = 8192", + expected: 2, + }, + { + name: "toIntervalWeek - 2 weeks = 14 days", + engineFull: "MergeTree() TTL toDateTime(ts) + toIntervalWeek(2) SETTINGS index_granularity = 8192", + expected: 14, + }, + { + name: "toIntervalMonth - 3 months = 90 days", + engineFull: "MergeTree() TTL toDateTime(ts) + toIntervalMonth(3) SETTINGS index_granularity = 8192", + expected: 90, + }, + { + name: "no TTL", + engineFull: "MergeTree() PARTITION BY toDate(timestamp) ORDER BY (timestamp, id) SETTINGS index_granularity = 8192", + expected: 0, + }, + { + name: "empty string", + engineFull: "", + expected: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := parser.ParseTTLFromEngineFull(tt.engineFull) + if result != tt.expected { + t.Errorf("ParseTTLFromEngineFull() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestTTLParser_GetTableTTLDays_WithMock(t *testing.T) { + observedZapCore, observedLogs := observer.New(zap.DebugLevel) + observedLogger := zap.New(observedZapCore) + parser := NewTTLParser(observedLogger) + + tests := []struct { + name string + database string + tableName string + setupMock func(cmock.ClickConnMockCommon) + expected uint64 + expectedLogMessages []string + expectedLogLevel zapcore.Level + }{ + { + name: "successful TTL query with 7 days", + database: "test_db", + tableName: "test_table", + setupMock: func(mock cmock.ClickConnMockCommon) { + columns := []cmock.ColumnType{ + {Name: "engine_full", Type: column.Type("String")}, + } + values := [][]any{ + {"MergeTree() TTL toDateTime(ts) + toIntervalDay(7) SETTINGS index_granularity = 8192"}, + } + rows := cmock.NewRows(columns, values) + mock.ExpectSelect("SELECT engine_full FROM system.tables WHERE database = 'test_db' AND name = 'test_table'"). + WillReturnRows(rows) + }, + expected: 7, + expectedLogMessages: []string{"TTL configured for table"}, + expectedLogLevel: zapcore.InfoLevel, + }, + { + name: "successful TTL query with 30 days from seconds", + database: "metrics_db", + tableName: "samples", + setupMock: func(mock cmock.ClickConnMockCommon) { + columns := []cmock.ColumnType{ + {Name: "engine_full", Type: column.Type("String")}, + } + values := [][]any{ + {"ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}', '{replica}') PARTITION BY toDate(unix_milli / 1000) ORDER BY (env, temporality, metric_name, fingerprint, unix_milli) TTL toDateTime(unix_milli / 1000) + toIntervalSecond(2592000) SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1"}, + } + rows := cmock.NewRows(columns, values) + mock.ExpectSelect("SELECT engine_full FROM system.tables WHERE database = 'metrics_db' AND name = 'samples'"). + WillReturnRows(rows) + }, + expected: 30, // 2592000s = 30d + expectedLogMessages: []string{"TTL configured for table"}, + expectedLogLevel: zapcore.InfoLevel, + }, + { + name: "table not found", + database: "nonexistent_db", + tableName: "nonexistent_table", + setupMock: func(mock cmock.ClickConnMockCommon) { + columns := []cmock.ColumnType{ + {Name: "engine_full", Type: column.Type("String")}, + } + values := [][]any{} + rows := cmock.NewRows(columns, values) + mock.ExpectSelect("SELECT engine_full FROM system.tables WHERE database = 'nonexistent_db' AND name = 'nonexistent_table'"). + WillReturnRows(rows) + }, + expected: 0, + expectedLogMessages: []string{"table not found in system.tables"}, + expectedLogLevel: zapcore.WarnLevel, + }, + { + name: "no TTL configured", + database: "test_db", + tableName: "no_ttl_table", + setupMock: func(mock cmock.ClickConnMockCommon) { + columns := []cmock.ColumnType{ + {Name: "engine_full", Type: column.Type("String")}, + } + values := [][]any{ + {"MergeTree() PARTITION BY toDate(timestamp) ORDER BY (timestamp, id) SETTINGS index_granularity = 8192"}, + } + rows := cmock.NewRows(columns, values) + mock.ExpectSelect("SELECT engine_full FROM system.tables WHERE database = 'test_db' AND name = 'no_ttl_table'"). + WillReturnRows(rows) + }, + expected: 0, + expectedLogMessages: []string{"TTL not configured for table"}, + expectedLogLevel: zapcore.InfoLevel, + }, + { + name: "database error", + database: "error_db", + tableName: "error_table", + setupMock: func(mock cmock.ClickConnMockCommon) { + mock.ExpectSelect("SELECT engine_full FROM system.tables WHERE database = 'error_db' AND name = 'error_table'"). + WillReturnError(errors.New("database connection failed")) + }, + expected: 0, + expectedLogMessages: []string{"error while fetching ttl from database"}, + expectedLogLevel: zapcore.ErrorLevel, + }, + { + name: "nil connection", + database: "test_db", + tableName: "test_table", + setupMock: func(mock cmock.ClickConnMockCommon) {}, + expected: 0, + expectedLogMessages: []string{"no database connection available, cannot determine TTL"}, + expectedLogLevel: zapcore.DebugLevel, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + observedLogs.TakeAll() + + var conn clickhouse.Conn + + if tt.name != "nil connection" { + mockConn, err := cmock.NewClickHouseNative(nil) + require.NoError(t, err) + tt.setupMock(mockConn) + conn = mockConn + defer func() { + err := mockConn.ExpectationsWereMet() + require.NoError(t, err) + }() + } + + result := parser.GetTableTTLDays(context.Background(), conn, tt.database, tt.tableName) + require.Equal(t, tt.expected, result) + + logs := observedLogs.All() + require.NotEmpty(t, logs, "Expected logs but got none") + + foundExpectedLog := false + for _, log := range logs { + for _, expectedMsg := range tt.expectedLogMessages { + if log.Level == tt.expectedLogLevel && + len(log.Message) > 0 && + (log.Message == expectedMsg || (len(expectedMsg) > 0 && log.Message[:len(expectedMsg)] == expectedMsg)) { + foundExpectedLog = true + break + } + } + if foundExpectedLog { + break + } + } + require.True(t, foundExpectedLog, "Expected log message '%v' with level %v not found in logs: %v", + tt.expectedLogMessages, tt.expectedLogLevel, logs) + }) + } +}