Add traces tag attributes v2 with max distinct values limit (#478)
srikanthccv authored Dec 12, 2024
1 parent aff7ae4 commit 63faaf0
Showing 8 changed files with 210 additions and 23 deletions.
51 changes: 51 additions & 0 deletions cmd/signozschemamigrator/schema_migrator/traces_migrations.go
@@ -536,4 +536,55 @@ var TracesMigrations = []SchemaMigrationRecord{
// no point in a down migration here as we don't use these
},
},
{
MigrationID: 1004,
UpItems: []Operation{
CreateTableOperation{
Database: "signoz_traces",
Table: "tag_attributes_v2",
Columns: []Column{
{Name: "unix_milli", Type: ColumnTypeInt64, Codec: "Delta(8), ZSTD(1)"},
{Name: "tag_key", Type: ColumnTypeString, Codec: "ZSTD(1)"},
{Name: "tag_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"},
{Name: "tag_data_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"},
{Name: "string_value", Type: ColumnTypeString, Codec: "ZSTD(1)"},
{Name: "number_value", Type: NullableColumnType{ColumnTypeFloat64}, Codec: "ZSTD(1)"},
},
Indexes: []Index{
{Name: "string_value_index", Expression: "string_value", Type: "ngrambf_v1(4, 1024, 3, 0)", Granularity: 1},
{Name: "number_value_index", Expression: "number_value", Type: "minmax", Granularity: 1},
},
Engine: ReplacingMergeTree{
MergeTree: MergeTree{
PartitionBy: "toDate(unix_milli / 1000)",
OrderBy: "(tag_key, tag_type, tag_data_type, string_value, number_value)",
TTL: "toDateTime(unix_milli / 1000) + toIntervalSecond(1296000)",
Settings: TableSettings{
{Name: "index_granularity", Value: "8192"},
{Name: "ttl_only_drop_parts", Value: "1"},
{Name: "allow_nullable_key", Value: "1"},
},
},
},
},
CreateTableOperation{
Database: "signoz_traces",
Table: "distributed_tag_attributes_v2",
Columns: []Column{
{Name: "unix_milli", Type: ColumnTypeInt64, Codec: "Delta(8), ZSTD(1)"},
{Name: "tag_key", Type: ColumnTypeString, Codec: "ZSTD(1)"},
{Name: "tag_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"},
{Name: "tag_data_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"},
{Name: "string_value", Type: ColumnTypeString, Codec: "ZSTD(1)"},
{Name: "number_value", Type: NullableColumnType{ColumnTypeFloat64}, Codec: "ZSTD(1)"},
},
Engine: Distributed{
Database: "signoz_traces",
Table: "tag_attributes_v2",
ShardingKey: "cityHash64(rand())",
},
},
},
DownItems: []Operation{},
},
}
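Here the local tag_attributes_v2 table dedupes rows sharing the sorting key (tag_key, tag_type, tag_data_type, string_value, number_value) via ReplacingMergeTree and expires parts after 15 days (1296000 s / 86400 = 15), while writes fan out through distributed_tag_attributes_v2. For orientation only — not part of this commit — a minimal clickhouse-go v2 sketch that inserts one row into the new table; the address and the literal tag_type/tag_data_type values are assumptions:

// Illustrative sketch, not part of commit 63faaf0.
package main

import (
	"context"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	ctx := context.Background()
	// Assumes a local ClickHouse with the signoz_traces schema already migrated.
	conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"127.0.0.1:9000"}})
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	batch, err := conn.PrepareBatch(ctx, "INSERT INTO signoz_traces.distributed_tag_attributes_v2")
	if err != nil {
		panic(err)
	}
	// A string-valued attribute: number_value is Nullable(Float64), so nil is valid.
	err = batch.Append(
		time.Now().UnixMilli(), // unix_milli
		"http.method",          // tag_key
		"tag",                  // tag_type (illustrative value)
		"string",               // tag_data_type (illustrative value)
		"GET",                  // string_value
		nil,                    // number_value
	)
	if err != nil {
		panic(err)
	}
	if err := batch.Send(); err != nil {
		panic(err)
	}
}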
18 changes: 12 additions & 6 deletions exporter/clickhousetracesexporter/clickhouse_factory.go
@@ -117,13 +117,16 @@ func (f *Factory) CreateSpanWriter() (Writer, error) {
indexTable: cfg.IndexTable,
errorTable: cfg.ErrorTable,
attributeTable: cfg.AttributeTable,
attributeTableV2: cfg.AttributeTableV2,
attributeKeyTable: cfg.AttributeKeyTable,
encoding: cfg.Encoding,
exporterId: cfg.ExporterId,

useNewSchema: cfg.UseNewSchema,
indexTableV3: cfg.IndexTableV3,
resourceTableV3: cfg.ResourceTableV3,
maxDistinctValues: cfg.MaxDistinctValues,
fetchKeysInterval: cfg.FetchKeysInterval,
})
}

@@ -141,13 +144,16 @@ func (f *Factory) CreateArchiveSpanWriter() (Writer, error) {
indexTable: cfg.IndexTable,
errorTable: cfg.ErrorTable,
attributeTable: cfg.AttributeTable,
attributeTableV2: cfg.AttributeTableV2,
attributeKeyTable: cfg.AttributeKeyTable,
encoding: cfg.Encoding,
exporterId: cfg.ExporterId,

useNewSchema: cfg.UseNewSchema,
indexTableV3: cfg.IndexTableV3,
resourceTableV3: cfg.ResourceTableV3,
maxDistinctValues: cfg.MaxDistinctValues,
fetchKeysInterval: cfg.FetchKeysInterval,
})
}

8 changes: 8 additions & 0 deletions exporter/clickhousetracesexporter/config.go
@@ -16,12 +16,18 @@ package clickhousetracesexporter

import (
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

type AttributesLimits struct {
FetchKeysInterval time.Duration `mapstructure:"fetch_keys_interval" default:"10m"`
MaxDistinctValues int `mapstructure:"max_distinct_values" default:"25000"`
}

// Config defines configuration for tracing exporter.
type Config struct {
Options `mapstructure:",squash"`
@@ -32,6 +38,8 @@ type Config struct {
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueConfig `mapstructure:"sending_queue"`
UseNewSchema bool `mapstructure:"use_new_schema" default:"false"`

AttributesLimits AttributesLimits `mapstructure:"attributes_limits"`
}

var _ component.Config = (*Config)(nil)
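The two knobs land under a new attributes_limits block on the exporter config. How such a block decodes can be shown in isolation; a standalone sketch using github.com/mitchellh/mapstructure — the collector's confmap does the real work in practice, and the 2m/50000 override values here are invented for illustration:

// Standalone illustration, not part of this commit.
package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

type AttributesLimits struct {
	FetchKeysInterval time.Duration `mapstructure:"fetch_keys_interval"`
	MaxDistinctValues int           `mapstructure:"max_distinct_values"`
}

func main() {
	// Equivalent of a YAML fragment:
	//   attributes_limits:
	//     fetch_keys_interval: 2m
	//     max_distinct_values: 50000
	raw := map[string]interface{}{
		"fetch_keys_interval": "2m",
		"max_distinct_values": 50000,
	}

	var limits AttributesLimits
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		DecodeHook: mapstructure.StringToTimeDurationHookFunc(), // parse "2m" into a time.Duration
		Result:     &limits,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(raw); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", limits) // {FetchKeysInterval:2m0s MaxDistinctValues:50000}
}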
4 changes: 4 additions & 0 deletions exporter/clickhousetracesexporter/config_test.go
@@ -52,5 +52,9 @@ func Test_loadConfig(t *testing.T) {
NumConsumers: 5,
QueueSize: 100,
},
AttributesLimits: AttributesLimits{
FetchKeysInterval: 10 * time.Minute,
MaxDistinctValues: 25000,
},
})
}
5 changes: 5 additions & 0 deletions exporter/clickhousetracesexporter/factory.go
@@ -16,6 +16,7 @@ package clickhousetracesexporter

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
@@ -35,6 +36,10 @@ func createDefaultConfig() component.Config {
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
AttributesLimits: AttributesLimits{
FetchKeysInterval: 10 * time.Minute,
MaxDistinctValues: 25000,
},
}
}

33 changes: 22 additions & 11 deletions exporter/clickhousetracesexporter/options.go
@@ -16,6 +16,7 @@ package clickhousetracesexporter

import (
"context"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/google/uuid"
@@ -30,6 +31,7 @@ const (
defaultErrorTable string = "distributed_signoz_error_index_v2"
defaultSpansTable string = "distributed_signoz_spans"
defaultAttributeTable string = "distributed_span_attributes"
defaultAttributeTableV2 string = "distributed_tag_attributes_v2"
defaultAttributeKeyTable string = "distributed_span_attributes_keys"
DefaultDurationSortTable string = "durationSort"
DefaultDurationSortMVTable string = "durationSortMV"
@@ -55,6 +57,7 @@ type namespaceConfig struct {
SpansTable string
ErrorTable string
AttributeTable string
AttributeTableV2 string
AttributeKeyTable string
DurationSortTable string
DurationSortMVTable string
@@ -69,6 +72,8 @@
UseNewSchema bool
IndexTableV3 string
ResourceTableV3 string
MaxDistinctValues int
FetchKeysInterval time.Duration
}

// Connector defines how to connect to the database
@@ -129,6 +134,7 @@ func NewOptions(exporterId uuid.UUID, config Config, primaryNamespace string, us
ErrorTable: defaultErrorTable,
SpansTable: defaultSpansTable,
AttributeTable: defaultAttributeTable,
AttributeTableV2: defaultAttributeTableV2,
AttributeKeyTable: defaultAttributeKeyTable,
DurationSortTable: DefaultDurationSortTable,
DurationSortMVTable: DefaultDurationSortMVTable,
@@ -143,24 +149,29 @@
UseNewSchema: useNewSchema,
IndexTableV3: defaultIndexTableV3,
ResourceTableV3: defaultResourceTableV3,
MaxDistinctValues: config.AttributesLimits.MaxDistinctValues,
FetchKeysInterval: config.AttributesLimits.FetchKeysInterval,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
}

for _, namespace := range otherNamespaces {
if namespace == archiveNamespace {
options.others[namespace] = &namespaceConfig{
namespace: namespace,
Datasource: datasource,
OperationsTable: "",
IndexTable: "",
SpansTable: defaultArchiveSpansTable,
Encoding: defaultEncoding,
Connector: defaultConnector,
ExporterId: exporterId,
UseNewSchema: useNewSchema,
IndexTableV3: defaultIndexTableV3,
ResourceTableV3: defaultResourceTableV3,
AttributeTableV2: defaultAttributeTableV2,
MaxDistinctValues: config.AttributesLimits.MaxDistinctValues,
FetchKeysInterval: config.AttributesLimits.FetchKeysInterval,
}
} else {
options.others[namespace] = &namespaceConfig{namespace: namespace}
59 changes: 56 additions & 3 deletions exporter/clickhousetracesexporter/writer.go
@@ -19,11 +19,13 @@ import (
"encoding/json"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/SigNoz/signoz-otel-collector/usage"
"github.com/SigNoz/signoz-otel-collector/utils"
"github.com/google/uuid"
"github.com/jellydator/ttlcache/v3"
"go.opencensus.io/stats"
@@ -46,6 +48,14 @@
EncodingProto Encoding = "protobuf"
)

type shouldSkipKey struct {
TagKey string `ch:"tag_key"`
TagType string `ch:"tag_type"`
TagDataType string `ch:"tag_data_type"`
StringCount uint64 `ch:"string_count"`
NumberCount uint64 `ch:"number_count"`
}

// SpanWriter for writing spans to ClickHouse
type SpanWriter struct {
logger *zap.Logger
@@ -55,6 +65,7 @@ type SpanWriter struct {
errorTable string
spansTable string
attributeTable string
attributeTableV2 string
attributeKeyTable string
encoding Encoding
exporterId uuid.UUID
@@ -65,6 +76,12 @@ type SpanWriter struct {

keysCache *ttlcache.Cache[string, struct{}]
rfCache *ttlcache.Cache[string, struct{}]

shouldSkipKeyValue atomic.Value // stores map[string]shouldSkipKey

maxDistinctValues int
fetchKeysInterval time.Duration
fetchShouldSkipKeysTicker *time.Ticker
}

type WriterOptions struct {
@@ -75,13 +92,16 @@ type WriterOptions struct {
indexTable string
errorTable string
attributeTable string
attributeTableV2 string
attributeKeyTable string
encoding Encoding
exporterId uuid.UUID

indexTableV3 string
resourceTableV3 string
useNewSchema bool
maxDistinctValues int
fetchKeysInterval time.Duration
}

// NewSpanWriter returns a SpanWriter for the database
@@ -115,6 +135,7 @@ func NewSpanWriter(options WriterOptions) *SpanWriter {
errorTable: options.errorTable,
spansTable: options.spansTable,
attributeTable: options.attributeTable,
attributeTableV2: options.attributeTableV2,
attributeKeyTable: options.attributeKeyTable,
encoding: options.encoding,
exporterId: options.exporterId,
@@ -124,11 +145,43 @@ func NewSpanWriter(options WriterOptions) *SpanWriter {
useNewSchema: options.useNewSchema,
keysCache: keysCache,
rfCache: rfCache,

maxDistinctValues: options.maxDistinctValues,
fetchKeysInterval: options.fetchKeysInterval,
fetchShouldSkipKeysTicker: time.NewTicker(options.fetchKeysInterval),
}

go writer.fetchShouldSkipKeys()
return writer
}

func (e *SpanWriter) fetchShouldSkipKeys() {
for range e.fetchShouldSkipKeysTicker.C {
query := fmt.Sprintf(`
SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count
FROM %s.%s
GROUP BY tag_key, tag_type, tag_data_type
HAVING string_count > %d OR number_count > %d`, e.traceDatabase, e.attributeTableV2, e.maxDistinctValues, e.maxDistinctValues)

e.logger.Info("fetching should skip keys", zap.String("query", query))

keys := []shouldSkipKey{}

err := e.db.Select(context.Background(), &keys, query)
if err != nil {
e.logger.Error("error while fetching should skip keys", zap.Error(err))
}

shouldSkipKeys := make(map[string]shouldSkipKey)
for _, key := range keys {
mapKey := utils.MakeKeyForAttributeKeys(key.TagKey, utils.TagType(key.TagType), utils.TagDataType(key.TagDataType))
e.logger.Debug("adding to should skip keys", zap.String("key", mapKey), zap.Any("string_count", key.StringCount), zap.Any("number_count", key.NumberCount))
shouldSkipKeys[mapKey] = key
}
e.shouldSkipKeyValue.Store(shouldSkipKeys)
}
}

func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error
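The consuming side of shouldSkipKeyValue sits outside this excerpt; presumably the attribute write path loads the latest map and drops keys that have already exceeded max_distinct_values. A hedged sketch of that lookup — the shouldSkipAttribute helper name is hypothetical, only the shouldSkipKeyValue field and utils.MakeKeyForAttributeKeys come from this diff:

// Hypothetical helper, not part of this commit: gate writes to
// tag_attributes_v2 on the skip map populated by fetchShouldSkipKeys.
func (e *SpanWriter) shouldSkipAttribute(tagKey string, tagType utils.TagType, tagDataType utils.TagDataType) bool {
	v := e.shouldSkipKeyValue.Load()
	if v == nil {
		return false // no fetch has completed yet; write everything
	}
	skipKeys := v.(map[string]shouldSkipKey)
	_, found := skipKeys[utils.MakeKeyForAttributeKeys(tagKey, tagType, tagDataType)]
	return found // true once the key has blown past max_distinct_values
}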
(diff for the remaining changed file not loaded)
