diff --git a/cmd/jaeger/internal/integration/clickhouse_test.go b/cmd/jaeger/internal/integration/clickhouse_test.go index fe96e57243b..d8068366647 100644 --- a/cmd/jaeger/internal/integration/clickhouse_test.go +++ b/cmd/jaeger/internal/integration/clickhouse_test.go @@ -16,17 +16,15 @@ func TestClickHouseStorage(t *testing.T) { StorageIntegration: integration.StorageIntegration{ CleanUp: purge, SkipList: []string{ - // Tag-related tests are temporarily skipped pending the redesign of - // attribute handling in ClickHouse storage to support typed attributes. - "Tags_in_one_spot_-_Tags", + // The following tests are skipped because ClickHouse does not support + // querying event attributes yet. "Tags_in_one_spot_-_Logs", - "Tags_in_one_spot_-_Process", "Tags_in_different_spots", - "Tags_+_Operation_name", - "Tags_+_Operation_name_+_max_Duration", "Tags_+_Operation_name_+_Duration_range", - "Tags_+_Duration_range", - "Tags_+_max_Duration", + "Multi-spot_Tags_+_Operation_name", + "Multi-spot_Tags_+_Operation_name_+_max_Duration", + "Multi-spot_Tags_+_Duration_range", + "Multi-spot_Tags_+_max_Duration", }, }, } diff --git a/internal/storage/v2/clickhouse/sql/queries.go b/internal/storage/v2/clickhouse/sql/queries.go index a9827359f19..51b05d3f877 100644 --- a/internal/storage/v2/clickhouse/sql/queries.go +++ b/internal/storage/v2/clickhouse/sql/queries.go @@ -254,6 +254,14 @@ WHERE GROUP BY name, span_kind ` +const SelectAttributeMetadata = ` +SELECT + attribute_key, + type, + level +FROM + attribute_metadata` + const TruncateSpans = `TRUNCATE TABLE spans` const TruncateServices = `TRUNCATE TABLE services` diff --git a/internal/storage/v2/clickhouse/tracestore/attribute_metadata.go b/internal/storage/v2/clickhouse/tracestore/attribute_metadata.go new file mode 100644 index 00000000000..09fdf38265f --- /dev/null +++ b/internal/storage/v2/clickhouse/tracestore/attribute_metadata.go @@ -0,0 +1,84 @@ +// Copyright (c) 2026 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package tracestore + +import ( + "context" + "fmt" + "strings" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/jaegertracing/jaeger/internal/storage/v2/clickhouse/sql" + "github.com/jaegertracing/jaeger/internal/storage/v2/clickhouse/tracestore/dbmodel" +) + +// attributeMetadata maps attribute keys to levels, and each level to a list of types. +// Structure: attributeMetadata[key][level] = []type +// Example: attributeMetadata["http.status"]["span"] = ["int", "str"] +type attributeMetadata map[string]map[string][]string + +// getAttributeMetadata retrieves the types stored in ClickHouse for string attributes. +// +// The query service forwards all attribute filters as strings (via AsString()), regardless +// of their actual type. For example: +// - A bool attribute stored as true becomes the string "true" +// - An int attribute stored as 123 becomes the string "123" +// +// To query ClickHouse correctly, we need to: +// 1. Look up the actual type(s) from the attribute_metadata table +// 2. Convert the string back to the original type +// 3. Query the appropriate typed column (bool_attributes, int_attributes, etc.) +// +// Since attributes can be stored with different types across different spans +// (e.g. "http.status" could be an int in one span and a string in another), +// the metadata can return multiple types for a single key. We build OR conditions +// to match any of the possible types. +// +// Only string-typed attributes from the query are looked up, since other types +// (bool, int, double, etc.) are already correctly typed in the query parameters. +func (r *Reader) getAttributeMetadata(ctx context.Context, attributes pcommon.Map) (attributeMetadata, error) { + query, args := buildSelectAttributeMetadataQuery(attributes) + rows, err := r.conn.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to query attribute metadata: %w", err) + } + defer rows.Close() + + metadata := make(attributeMetadata) + for rows.Next() { + var attrMeta dbmodel.AttributeMetadata + if err := rows.ScanStruct(&attrMeta); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + if metadata[attrMeta.AttributeKey] == nil { + metadata[attrMeta.AttributeKey] = make(map[string][]string) + } + metadata[attrMeta.AttributeKey][attrMeta.Level] = append(metadata[attrMeta.AttributeKey][attrMeta.Level], attrMeta.Type) + } + return metadata, nil +} + +func buildSelectAttributeMetadataQuery(attributes pcommon.Map) (string, []any) { + var q strings.Builder + q.WriteString(sql.SelectAttributeMetadata) + args := []any{} + + var placeholders []string + for key, attr := range attributes.All() { + if attr.Type() == pcommon.ValueTypeStr { + placeholders = append(placeholders, "?") + args = append(args, key) + } + } + + if len(placeholders) > 0 { + q.WriteString(" WHERE attribute_key IN (") + q.WriteString(strings.Join(placeholders, ", ")) + q.WriteString(")") + } + q.WriteString(" GROUP BY attribute_key, type, level") + return q.String(), args +} diff --git a/internal/storage/v2/clickhouse/tracestore/attribute_metadata_test.go b/internal/storage/v2/clickhouse/tracestore/attribute_metadata_test.go new file mode 100644 index 00000000000..9048c8f8eea --- /dev/null +++ b/internal/storage/v2/clickhouse/tracestore/attribute_metadata_test.go @@ -0,0 +1,69 @@ +// Copyright (c) 2026 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package tracestore + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/jaegertracing/jaeger/internal/storage/v2/clickhouse/sql" + "github.com/jaegertracing/jaeger/internal/storage/v2/clickhouse/tracestore/dbmodel" +) + +func TestGetAttributeMetadata_ErrorCases(t *testing.T) { + attrs := pcommon.NewMap() + attrs.PutStr("http.method", "GET") + + tests := []struct { + name string + driver *testDriver + expectedErr string + }{ + { + name: "QueryError", + driver: &testDriver{ + t: t, + queryResponses: map[string]*testQueryResponse{ + sql.SelectAttributeMetadata: { + rows: nil, + err: assert.AnError, + }, + }, + }, + expectedErr: "failed to query attribute metadata", + }, + { + name: "ScanStructError", + driver: &testDriver{ + t: t, + queryResponses: map[string]*testQueryResponse{ + sql.SelectAttributeMetadata: { + rows: &testRows[dbmodel.AttributeMetadata]{ + data: []dbmodel.AttributeMetadata{{ + AttributeKey: "http.method", + Type: "str", + Level: "span", + }}, + scanErr: assert.AnError, + }, + err: nil, + }, + }, + }, + expectedErr: "failed to scan row", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reader := NewReader(tt.driver, ReaderConfig{}) + _, err := reader.getAttributeMetadata(t.Context(), attrs) + require.Error(t, err) + assert.ErrorContains(t, err, tt.expectedErr) + }) + } +} diff --git a/internal/storage/v2/clickhouse/tracestore/dbmodel/attribute_metadata.go b/internal/storage/v2/clickhouse/tracestore/dbmodel/attribute_metadata.go new file mode 100644 index 00000000000..4187ba5c7b6 --- /dev/null +++ b/internal/storage/v2/clickhouse/tracestore/dbmodel/attribute_metadata.go @@ -0,0 +1,22 @@ +// Copyright (c) 2026 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dbmodel + +// AttributeMetadata represents metadata about an attribute stored in ClickHouse. +// This is populated by the attribute_metadata materialized view which tracks +// all unique (attribute_key, type, level) tuples observed in the spans table. +// +// The same attribute key can have multiple entries with different types or levels. +// For example, "http.status" might appear as both type="int" and type="str" if +// different spans store it with different types. +type AttributeMetadata struct { + // AttributeKey is the name of the attribute (e.g., "http.status", "service.name") + AttributeKey string `ch:"attribute_key"` + // Type is the data type of the attribute value. + // One of: "bool", "double", "int", "str", "bytes", "map", "slice" + Type string `ch:"type"` + // Level is the scope level where this attribute appears. + // One of: "span", "resource", "scope" + Level string `ch:"level"` +} diff --git a/internal/storage/v2/clickhouse/tracestore/reader.go b/internal/storage/v2/clickhouse/tracestore/reader.go index 15b03f3e11e..438431b5315 100644 --- a/internal/storage/v2/clickhouse/tracestore/reader.go +++ b/internal/storage/v2/clickhouse/tracestore/reader.go @@ -9,6 +9,7 @@ import ( "encoding/hex" "fmt" "iter" + "strconv" "strings" "time" @@ -143,7 +144,7 @@ func (r *Reader) FindTraces( query tracestore.TraceQueryParams, ) iter.Seq2[[]ptrace.Traces, error] { return func(yield func([]ptrace.Traces, error) bool) { - traceIDsQuery, args, err := r.buildFindTraceIDsQuery(query) + traceIDsQuery, args, err := r.buildFindTraceIDsQuery(ctx, query) if err != nil { yield(nil, fmt.Errorf("failed to build query: %w", err)) return @@ -206,7 +207,7 @@ func (r *Reader) FindTraceIDs( query tracestore.TraceQueryParams, ) iter.Seq2[[]tracestore.FoundTraceID, error] { return func(yield func([]tracestore.FoundTraceID, error) bool) { - q, args, err := r.buildFindTraceIDsQuery(query) + q, args, err := r.buildFindTraceIDsQuery(ctx, query) if err != nil { yield(nil, fmt.Errorf("failed to build query: %w", err)) return @@ -243,7 +244,10 @@ func buildFindTracesQuery(traceIDsQuery string) string { return sql.SelectSpansQuery + " WHERE s.trace_id IN (SELECT trace_id FROM (" + traceIDsQuery + ")) ORDER BY s.trace_id" } -func (r *Reader) buildFindTraceIDsQuery(query tracestore.TraceQueryParams) (string, []any, error) { +func (r *Reader) buildFindTraceIDsQuery( + ctx context.Context, + query tracestore.TraceQueryParams, +) (string, []any, error) { limit := query.SearchDepth if limit == 0 { limit = r.config.DefaultSearchDepth @@ -281,57 +285,213 @@ func (r *Reader) buildFindTraceIDsQuery(query tracestore.TraceQueryParams) (stri args = append(args, query.StartTimeMax) } - for key, attr := range query.Attributes.All() { - var attrType string - var val any + // Only query attribute metadata if requested and string attributes are present. + // Non-string attributes (bool/double/int/bytes/slice/map) don't require metadata. + var attributeMetadata attributeMetadata + if hasStringAttributes(query.Attributes) { + am, err := r.getAttributeMetadata(ctx, query.Attributes) + if err != nil { + return "", nil, fmt.Errorf("failed to get attribute metadata: %w", err) + } + attributeMetadata = am + } + + if err := buildAttributeConditions(&q, &args, query.Attributes, attributeMetadata); err != nil { + return "", nil, err + } + + q.WriteString(" LIMIT ?") + args = append(args, limit) + + return q.String(), args, nil +} + +// hasStringAttributes returns true if any attribute in the map is of string type. +func hasStringAttributes(attributes pcommon.Map) bool { + for _, attr := range attributes.All() { + if attr.Type() == pcommon.ValueTypeStr { + return true + } + } + return false +} + +func buildAttributeConditions(q *strings.Builder, args *[]any, attributes pcommon.Map, metadata attributeMetadata) error { + for key, attr := range attributes.All() { + q.WriteString(" AND (") switch attr.Type() { case pcommon.ValueTypeBool: - attrType = "bool" - val = attr.Bool() + buildBoolAttributeCondition(q, args, key, attr) case pcommon.ValueTypeDouble: - attrType = "double" - val = attr.Double() + buildDoubleAttributeCondition(q, args, key, attr) case pcommon.ValueTypeInt: - attrType = "int" - val = attr.Int() + buildIntAttributeCondition(q, args, key, attr) case pcommon.ValueTypeStr: - attrType = "str" - val = attr.Str() + if err := buildStringAttributeCondition(q, args, key, attr, metadata); err != nil { + return err + } case pcommon.ValueTypeBytes: - attrType = "complex" - key = "@bytes@" + key - val = base64.StdEncoding.EncodeToString(attr.Bytes().AsRaw()) + buildBytesAttributeCondition(q, args, key, attr) case pcommon.ValueTypeSlice: - attrType = "complex" - key = "@slice@" + key - b, err := marshalValueForQuery(attr) - if err != nil { - return "", nil, fmt.Errorf("failed to marshal slice attribute %q: %w", key, err) + if err := buildSliceAttributeCondition(q, args, key, attr); err != nil { + return err } - val = b case pcommon.ValueTypeMap: - attrType = "complex" - key = "@map@" + key - b, err := marshalValueForQuery(attr) - if err != nil { - return "", nil, fmt.Errorf("failed to marshal map attribute %q: %w", key, err) + if err := buildMapAttributeCondition(q, args, key, attr); err != nil { + return err } - val = b default: - return "", nil, fmt.Errorf("unsupported attribute type %v for key %s", attr.Type(), key) + return fmt.Errorf("unsupported attribute type %v for key %s", attr.Type(), key) } - q.WriteString(" AND (") - q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s." + attrType + "_attributes.key, s." + attrType + "_attributes.value)") - q.WriteString(" OR ") - q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.resource_" + attrType + "_attributes.key, s.resource_" + attrType + "_attributes.value)") q.WriteString(")") - args = append(args, key, val, key, val) } - q.WriteString(" LIMIT ?") - args = append(args, limit) + return nil +} - return q.String(), args, nil +func buildBoolAttributeCondition(q *strings.Builder, args *[]any, key string, attr pcommon.Value) { + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.bool_attributes.key, s.bool_attributes.value)") + q.WriteString(" OR ") + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.resource_bool_attributes.key, s.resource_bool_attributes.value)") + *args = append(*args, key, attr.Bool(), key, attr.Bool()) +} + +func buildDoubleAttributeCondition(q *strings.Builder, args *[]any, key string, attr pcommon.Value) { + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.double_attributes.key, s.double_attributes.value)") + q.WriteString(" OR ") + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.resource_double_attributes.key, s.resource_double_attributes.value)") + *args = append(*args, key, attr.Double(), key, attr.Double()) +} + +func buildIntAttributeCondition(q *strings.Builder, args *[]any, key string, attr pcommon.Value) { + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.int_attributes.key, s.int_attributes.value)") + q.WriteString(" OR ") + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.resource_int_attributes.key, s.resource_int_attributes.value)") + *args = append(*args, key, attr.Int(), key, attr.Int()) +} + +func buildBytesAttributeCondition(q *strings.Builder, args *[]any, key string, attr pcommon.Value) { + attrKey := "@bytes@" + key + val := base64.StdEncoding.EncodeToString(attr.Bytes().AsRaw()) + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value)") + q.WriteString(" OR ") + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)") + *args = append(*args, attrKey, val, attrKey, val) +} + +func buildSliceAttributeCondition(q *strings.Builder, args *[]any, key string, attr pcommon.Value) error { + attrKey := "@slice@" + key + b, err := marshalValueForQuery(attr) + if err != nil { + return fmt.Errorf("failed to marshal slice attribute %q: %w", key, err) + } + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value)") + q.WriteString(" OR ") + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)") + *args = append(*args, attrKey, b, attrKey, b) + return nil +} + +func buildMapAttributeCondition(q *strings.Builder, args *[]any, key string, attr pcommon.Value) error { + attrKey := "@map@" + key + b, err := marshalValueForQuery(attr) + if err != nil { + return fmt.Errorf("failed to marshal map attribute %q: %w", key, err) + } + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value)") + q.WriteString(" OR ") + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)") + *args = append(*args, attrKey, b, attrKey, b) + return nil +} + +// buildStringAttributeCondition adds a condition for string attributes by looking up their +// actual stored type(s) and level(s) from the attribute_metadata table. +// +// String attributes require special handling because the query service passes all +// attributes as strings (via AsString()), regardless of their actual stored type. +// We must look up the attribute_metadata to determine the actual type(s) and +// level(s) where this attribute is stored, then convert the string back to the +// appropriate type for querying. +func buildStringAttributeCondition(q *strings.Builder, args *[]any, key string, attr pcommon.Value, metadata attributeMetadata) error { + levelTypes, ok := metadata[key] + + // if no metadata found, assume string type + if !ok { + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.str_attributes.key, s.str_attributes.value)") + q.WriteString(" OR ") + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s.resource_str_attributes.key, s.resource_str_attributes.value)") + *args = append(*args, key, attr.Str(), key, attr.Str()) + return nil + } + + first := true + for level, types := range levelTypes { + for _, t := range types { + if !first { + q.WriteString(" OR ") + } + first = false + + attrKey := key + var val any + + switch t { + case "bool": + b, err := strconv.ParseBool(attr.Str()) + if err != nil { + return fmt.Errorf("failed to parse bool attribute %q: %w", key, err) + } + val = b + case "double": + f, err := strconv.ParseFloat(attr.Str(), 64) + if err != nil { + return fmt.Errorf("failed to parse double attribute %q: %w", key, err) + } + val = f + case "int": + i, err := strconv.ParseInt(attr.Str(), 10, 64) + if err != nil { + return fmt.Errorf("failed to parse int attribute %q: %w", key, err) + } + val = i + case "str": + val = attr.Str() + case "bytes": + attrKey = "@bytes@" + key + decoded, err := base64.StdEncoding.DecodeString(attr.Str()) + if err != nil { + return fmt.Errorf("failed to decode bytes attribute %q: %w", key, err) + } + val = string(decoded) + // TODO: support map and slice + default: + return fmt.Errorf("unsupported attribute type %q for key %q", t, key) + } + + var colType string + if t == "bytes" || t == "map" || t == "slice" { + colType = "complex" + } else { + colType = t + } + + var prefix string + switch level { + case "resource": + prefix = "resource_" + case "scope": + prefix = "scope_" + default: + prefix = "" + } + + q.WriteString("arrayExists((key, value) -> key = ? AND value = ?, s." + prefix + colType + "_attributes.key, s." + prefix + colType + "_attributes.value)") + *args = append(*args, attrKey, val) + } + } + + return nil } diff --git a/internal/storage/v2/clickhouse/tracestore/reader_test.go b/internal/storage/v2/clickhouse/tracestore/reader_test.go index 55d37bf59d1..95c4f4b25e8 100644 --- a/internal/storage/v2/clickhouse/tracestore/reader_test.go +++ b/internal/storage/v2/clickhouse/tracestore/reader_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "reflect" + "strings" "testing" "time" @@ -39,8 +40,39 @@ var ( time.Time{}, }, } + testAttributeMetadata = []dbmodel.AttributeMetadata{ + {AttributeKey: "span.flag", Type: "bool", Level: "span"}, + {AttributeKey: "resource.latency", Type: "double", Level: "resource"}, + {AttributeKey: "scope.attempt", Type: "int", Level: "scope"}, + {AttributeKey: "http.method", Type: "str", Level: "span"}, + {AttributeKey: "resource.checksum", Type: "bytes", Level: "resource"}, + } ) +func buildTestAttributes() pcommon.Map { + attrs := pcommon.NewMap() + attrs.PutBool("login_successful", true) + attrs.PutDouble("response_time", 0.123) + attrs.PutInt("attempt_count", 1) + b := attrs.PutEmptyBytes("file.checksum") + s := attrs.PutEmptySlice("http.headers") + m := attrs.PutEmptyMap("http.cookies") + + b.FromRaw([]byte{0x12, 0x34, 0x56, 0x78}) + s.AppendEmpty().SetStr("header1: value1") + m.PutStr("session_id", "abc123") + + // these attributes will require type lookup from attribute_metadata + attrs.PutStr("no.metadata", "nonexistent") // no metadata entry + attrs.PutStr("http.method", "GET") + attrs.PutStr("span.flag", "true") + attrs.PutStr("resource.latency", "0.5") + attrs.PutStr("scope.attempt", "7") + attrs.PutStr("resource.checksum", "EjRWeA==") + + return attrs +} + func scanSpanRowFn() func(dest any, src *dbmodel.SpanRow) error { return func(dest any, src *dbmodel.SpanRow) error { ptrs, ok := dest.([]any) @@ -129,6 +161,17 @@ func scanSpanRowFn() func(dest any, src *dbmodel.SpanRow) error { } } +func scanAttributeMetadataFn() func(dest any, src dbmodel.AttributeMetadata) error { + return func(dest any, src dbmodel.AttributeMetadata) error { + ptr, ok := dest.(*dbmodel.AttributeMetadata) + if !ok { + return fmt.Errorf("expected *dbmodel.AttributeMetadata for dest, got %T", dest) + } + *ptr = src + return nil + } +} + func scanTraceIDFn() func(dest any, src []any) error { return func(dest any, src []any) error { ptrs, ok := dest.([]any) @@ -201,7 +244,7 @@ func TestGetTraces_Success(t *testing.T) { require.NoError(t, err) require.Len(t, conn.recordedQueries, 1) - verifyQuerySnapshot(t, conn.recordedQueries[0]) + verifyQuerySnapshot(t, conn.recordedQueries...) requireTracesEqual(t, tt.data, traces) }) } @@ -401,14 +444,6 @@ func TestGetServices(t *testing.T) { {Name: "serviceB"}, {Name: "serviceC"}, }, - scanFn: func(dest any, src dbmodel.Service) error { - svc, ok := dest.(*dbmodel.Service) - if !ok { - return errors.New("dest is not *dbmodel.Service") - } - *svc = src - return nil - }, scanErr: assert.AnError, }, err: nil, @@ -430,7 +465,7 @@ func TestGetServices(t *testing.T) { } else { require.NoError(t, err) require.Len(t, test.conn.recordedQueries, 1) - verifyQuerySnapshot(t, test.conn.recordedQueries[0]) + verifyQuerySnapshot(t, test.conn.recordedQueries...) require.Equal(t, test.expected, result) } }) @@ -554,14 +589,6 @@ func TestGetOperations(t *testing.T) { {Name: "operationB"}, {Name: "operationC"}, }, - scanFn: func(dest any, src dbmodel.Operation) error { - svc, ok := dest.(*dbmodel.Operation) - if !ok { - return errors.New("dest is not *dbmodel.Operation") - } - *svc = src - return nil - }, scanErr: assert.AnError, }, err: nil, @@ -583,7 +610,7 @@ func TestGetOperations(t *testing.T) { } else { require.NoError(t, err) require.Len(t, test.conn.recordedQueries, 1) - verifyQuerySnapshot(t, test.conn.recordedQueries[0]) + verifyQuerySnapshot(t, test.conn.recordedQueries...) require.Equal(t, test.expected, result) } }) @@ -628,7 +655,7 @@ func TestFindTraces_Success(t *testing.T) { require.NoError(t, err) require.Len(t, conn.recordedQueries, 1) - verifyQuerySnapshot(t, conn.recordedQueries[0]) + verifyQuerySnapshot(t, conn.recordedQueries...) requireTracesEqual(t, tt.data, traces) }) } @@ -638,6 +665,12 @@ func TestFindTraces_WithFilters(t *testing.T) { conn := &testDriver{ t: t, queryResponses: map[string]*testQueryResponse{ + sql.SelectAttributeMetadata: { + rows: &testRows[dbmodel.AttributeMetadata]{ + data: testAttributeMetadata, + scanFn: scanAttributeMetadataFn(), + }, + }, sql.SelectSpansQuery: { rows: &testRows[*dbmodel.SpanRow]{ data: multipleSpans, @@ -649,18 +682,7 @@ func TestFindTraces_WithFilters(t *testing.T) { } reader := NewReader(conn, testReaderConfig) - attributes := pcommon.NewMap() - attributes.PutBool("login_successful", true) - attributes.PutDouble("response_time", 0.123) - attributes.PutInt("attempt_count", 1) - attributes.PutStr("http.method", "GET") - b := attributes.PutEmptyBytes("file.checksum") - s := attributes.PutEmptySlice("http.headers") - m := attributes.PutEmptyMap("http.cookies") - - b.FromRaw([]byte{0x12, 0x34, 0x56, 0x78}) - s.AppendEmpty().SetStr("header1: value1") - m.PutStr("session_id", "abc123") + attributes := buildTestAttributes() iter := reader.FindTraces(context.Background(), tracestore.TraceQueryParams{ ServiceName: "serviceA", @@ -674,23 +696,14 @@ func TestFindTraces_WithFilters(t *testing.T) { }) traces, err := jiter.FlattenWithErrors(iter) require.NoError(t, err) - require.Len(t, conn.recordedQueries, 1) - verifyQuerySnapshot(t, conn.recordedQueries[0]) + require.Len(t, conn.recordedQueries, 2) + verifyQuerySnapshot(t, conn.recordedQueries...) requireTracesEqual(t, multipleSpans, traces) } func TestFindTraces_SearchDepthExceedsMax(t *testing.T) { driver := &testDriver{ t: t, - queryResponses: map[string]*testQueryResponse{ - sql.SelectSpansQuery: { - rows: &testRows[*dbmodel.SpanRow]{ - data: singleSpan, - scanFn: scanSpanRowFn(), - }, - err: nil, - }, - }, } reader := NewReader(driver, testReaderConfig) iter := reader.FindTraces(context.Background(), tracestore.TraceQueryParams{ @@ -843,6 +856,12 @@ func TestFindTraceIDs(t *testing.T) { driver := &testDriver{ t: t, queryResponses: map[string]*testQueryResponse{ + sql.SelectAttributeMetadata: { + rows: &testRows[dbmodel.AttributeMetadata]{ + data: testAttributeMetadata, + scanFn: scanAttributeMetadataFn(), + }, + }, sql.SearchTraceIDs: { rows: &testRows[[]any]{ data: testTraceIDsData, @@ -853,18 +872,7 @@ func TestFindTraceIDs(t *testing.T) { }, } reader := NewReader(driver, testReaderConfig) - attributes := pcommon.NewMap() - attributes.PutBool("login_successful", true) - attributes.PutDouble("response_time", 0.123) - attributes.PutInt("attempt_count", 1) - attributes.PutStr("http.method", "GET") - b := attributes.PutEmptyBytes("file.checksum") - s := attributes.PutEmptySlice("http.headers") - m := attributes.PutEmptyMap("http.cookies") - - b.FromRaw([]byte{0x12, 0x34, 0x56, 0x78}) - s.AppendEmpty().SetStr("header1: value1") - m.PutStr("session_id", "abc123") + attributes := buildTestAttributes() iter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{ ServiceName: "serviceA", @@ -878,8 +886,8 @@ func TestFindTraceIDs(t *testing.T) { }) ids, err := jiter.FlattenWithErrors(iter) require.NoError(t, err) - require.Len(t, driver.recordedQueries, 1) - verifyQuerySnapshot(t, driver.recordedQueries[0]) + require.Len(t, driver.recordedQueries, 2) + verifyQuerySnapshot(t, driver.recordedQueries...) require.Equal(t, []tracestore.FoundTraceID{ { TraceID: pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}), @@ -1154,7 +1162,7 @@ func TestFindTraceIDs_BuildQueryError(t *testing.T) { require.ErrorContains(t, err, "failed to build query") } -func TestBuildSearchTraceIDsQuery_MarshalErrors(t *testing.T) { +func TestBuildFindTraceIDsQuery_MarshalErrors(t *testing.T) { orig := marshalValueForQuery t.Cleanup(func() { marshalValueForQuery = orig }) marshalValueForQuery = func(pcommon.Value) (string, error) { @@ -1167,7 +1175,7 @@ func TestBuildSearchTraceIDsQuery_MarshalErrors(t *testing.T) { s.AppendEmpty() reader := NewReader(&testDriver{t: t}, testReaderConfig) - _, _, err := reader.buildFindTraceIDsQuery(tracestore.TraceQueryParams{Attributes: attrs}) + _, _, err := reader.buildFindTraceIDsQuery(t.Context(), tracestore.TraceQueryParams{Attributes: attrs}) require.Error(t, err) require.ErrorContains(t, err, "failed to marshal slice attribute") @@ -1179,9 +1187,87 @@ func TestBuildSearchTraceIDsQuery_MarshalErrors(t *testing.T) { m.PutEmpty("key") reader := NewReader(&testDriver{t: t}, testReaderConfig) - _, _, err := reader.buildFindTraceIDsQuery(tracestore.TraceQueryParams{Attributes: attrs}) + _, _, err := reader.buildFindTraceIDsQuery(t.Context(), tracestore.TraceQueryParams{Attributes: attrs}) require.Error(t, err) require.ErrorContains(t, err, "failed to marshal map attribute") }) } + +func TestBuildFindTraceIDsQuery_AttributeMetadataError(t *testing.T) { + td := &testDriver{ + t: t, + queryResponses: map[string]*testQueryResponse{ + sql.SelectAttributeMetadata: { + rows: nil, + err: assert.AnError, + }, + }, + } + + reader := NewReader(td, testReaderConfig) + _, _, err := reader.buildFindTraceIDsQuery(t.Context(), tracestore.TraceQueryParams{Attributes: buildTestAttributes()}) + require.ErrorContains(t, err, "failed to get attribute metadata") +} + +func TestBuildStringAttributeCondition_Errors(t *testing.T) { + cases := []struct { + name string + attrValue string + metadata attributeMetadata + expectedErr string + }{ + { + name: "parse bool fails", + attrValue: "not-bool", + metadata: attributeMetadata{ + "k": {"span": {"bool"}}, + }, + expectedErr: "failed to parse bool attribute", + }, + { + name: "parse double fails", + attrValue: "not-float", + metadata: attributeMetadata{ + "k": {"span": {"double"}}, + }, + expectedErr: "failed to parse double attribute", + }, + { + name: "parse int fails", + attrValue: "not-int", + metadata: attributeMetadata{ + "k": {"span": {"int"}}, + }, + expectedErr: "failed to parse int attribute", + }, + { + name: "decode bytes fails", + attrValue: "!not-base64!", + metadata: attributeMetadata{ + "k": {"span": {"bytes"}}, + }, + expectedErr: "failed to decode bytes attribute", + }, + { + name: "unsupported type", + attrValue: "whatever", + metadata: attributeMetadata{ + "k": {"span": {"unknown"}}, + }, + expectedErr: "unsupported attribute type", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + attr := pcommon.NewValueStr(tc.attrValue) + var q strings.Builder + var args []any + + err := buildStringAttributeCondition(&q, &args, "k", attr, tc.metadata) + require.Error(t, err) + assert.ErrorContains(t, err, tc.expectedErr) + }) + } +} diff --git a/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraceIDs_1.sql b/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraceIDs_1.sql index dcb4d75b624..3c19ba30abc 100644 --- a/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraceIDs_1.sql +++ b/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraceIDs_1.sql @@ -1,7 +1,6 @@ -SELECT DISTINCT - s.trace_id, - t.start, - t.end -FROM spans s -LEFT JOIN trace_id_timestamps t ON s.trace_id = t.trace_id -WHERE 1=1 AND s.service_name = ? AND s.name = ? AND s.duration >= ? AND s.duration <= ? AND s.start_time >= ? AND s.start_time <= ? AND (arrayExists((key, value) -> key = ? AND value = ?, s.bool_attributes.key, s.bool_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_bool_attributes.key, s.resource_bool_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.double_attributes.key, s.double_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_double_attributes.key, s.resource_double_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.int_attributes.key, s.int_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_int_attributes.key, s.resource_int_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.str_attributes.key, s.str_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_str_attributes.key, s.resource_str_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) LIMIT ? +SELECT + attribute_key, + type, + level +FROM + attribute_metadata WHERE attribute_key IN (?, ?, ?, ?, ?, ?) GROUP BY attribute_key, type, level diff --git a/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraceIDs_2.sql b/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraceIDs_2.sql new file mode 100644 index 00000000000..dd8548f49f6 --- /dev/null +++ b/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraceIDs_2.sql @@ -0,0 +1,7 @@ +SELECT DISTINCT + s.trace_id, + t.start, + t.end +FROM spans s +LEFT JOIN trace_id_timestamps t ON s.trace_id = t.trace_id +WHERE 1=1 AND s.service_name = ? AND s.name = ? AND s.duration >= ? AND s.duration <= ? AND s.start_time >= ? AND s.start_time <= ? AND (arrayExists((key, value) -> key = ? AND value = ?, s.bool_attributes.key, s.bool_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_bool_attributes.key, s.resource_bool_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.double_attributes.key, s.double_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_double_attributes.key, s.resource_double_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.int_attributes.key, s.int_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_int_attributes.key, s.resource_int_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.str_attributes.key, s.str_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_str_attributes.key, s.resource_str_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.str_attributes.key, s.str_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.bool_attributes.key, s.bool_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.resource_double_attributes.key, s.resource_double_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.scope_int_attributes.key, s.scope_int_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) LIMIT ? diff --git a/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraces_WithFilters_1.sql b/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraces_WithFilters_1.sql index 64ba2866e4f..3c19ba30abc 100644 --- a/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraces_WithFilters_1.sql +++ b/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraces_WithFilters_1.sql @@ -1,79 +1,6 @@ SELECT - id, - trace_id, - trace_state, - parent_span_id, - name, - kind, - start_time, - status_code, - status_message, - duration, - bool_attributes.key, - bool_attributes.value, - double_attributes.key, - double_attributes.value, - int_attributes.key, - int_attributes.value, - str_attributes.key, - str_attributes.value, - complex_attributes.key, - complex_attributes.value, - events.name, - events.timestamp, - events.bool_attributes.key, - events.bool_attributes.value, - events.double_attributes.key, - events.double_attributes.value, - events.int_attributes.key, - events.int_attributes.value, - events.str_attributes.key, - events.str_attributes.value, - events.complex_attributes.key, - events.complex_attributes.value, - links.trace_id, - links.span_id, - links.trace_state, - links.bool_attributes.key, - links.bool_attributes.value, - links.double_attributes.key, - links.double_attributes.value, - links.int_attributes.key, - links.int_attributes.value, - links.str_attributes.key, - links.str_attributes.value, - links.complex_attributes.key, - links.complex_attributes.value, - service_name, - resource_bool_attributes.key, - resource_bool_attributes.value, - resource_double_attributes.key, - resource_double_attributes.value, - resource_int_attributes.key, - resource_int_attributes.value, - resource_str_attributes.key, - resource_str_attributes.value, - resource_complex_attributes.key, - resource_complex_attributes.value, - scope_name, - scope_version, - scope_bool_attributes.key, - scope_bool_attributes.value, - scope_double_attributes.key, - scope_double_attributes.value, - scope_int_attributes.key, - scope_int_attributes.value, - scope_str_attributes.key, - scope_str_attributes.value, - scope_complex_attributes.key, - scope_complex_attributes.value + attribute_key, + type, + level FROM - spans s - WHERE s.trace_id IN (SELECT trace_id FROM ( -SELECT DISTINCT - s.trace_id, - t.start, - t.end -FROM spans s -LEFT JOIN trace_id_timestamps t ON s.trace_id = t.trace_id -WHERE 1=1 AND s.service_name = ? AND s.name = ? AND s.duration >= ? AND s.duration <= ? AND s.start_time >= ? AND s.start_time <= ? AND (arrayExists((key, value) -> key = ? AND value = ?, s.bool_attributes.key, s.bool_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_bool_attributes.key, s.resource_bool_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.double_attributes.key, s.double_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_double_attributes.key, s.resource_double_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.int_attributes.key, s.int_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_int_attributes.key, s.resource_int_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.str_attributes.key, s.str_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_str_attributes.key, s.resource_str_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) LIMIT ?)) ORDER BY s.trace_id + attribute_metadata WHERE attribute_key IN (?, ?, ?, ?, ?, ?) GROUP BY attribute_key, type, level diff --git a/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraces_WithFilters_2.sql b/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraces_WithFilters_2.sql new file mode 100644 index 00000000000..f5f3cf4db4f --- /dev/null +++ b/internal/storage/v2/clickhouse/tracestore/snapshots/TestFindTraces_WithFilters_2.sql @@ -0,0 +1,79 @@ +SELECT + id, + trace_id, + trace_state, + parent_span_id, + name, + kind, + start_time, + status_code, + status_message, + duration, + bool_attributes.key, + bool_attributes.value, + double_attributes.key, + double_attributes.value, + int_attributes.key, + int_attributes.value, + str_attributes.key, + str_attributes.value, + complex_attributes.key, + complex_attributes.value, + events.name, + events.timestamp, + events.bool_attributes.key, + events.bool_attributes.value, + events.double_attributes.key, + events.double_attributes.value, + events.int_attributes.key, + events.int_attributes.value, + events.str_attributes.key, + events.str_attributes.value, + events.complex_attributes.key, + events.complex_attributes.value, + links.trace_id, + links.span_id, + links.trace_state, + links.bool_attributes.key, + links.bool_attributes.value, + links.double_attributes.key, + links.double_attributes.value, + links.int_attributes.key, + links.int_attributes.value, + links.str_attributes.key, + links.str_attributes.value, + links.complex_attributes.key, + links.complex_attributes.value, + service_name, + resource_bool_attributes.key, + resource_bool_attributes.value, + resource_double_attributes.key, + resource_double_attributes.value, + resource_int_attributes.key, + resource_int_attributes.value, + resource_str_attributes.key, + resource_str_attributes.value, + resource_complex_attributes.key, + resource_complex_attributes.value, + scope_name, + scope_version, + scope_bool_attributes.key, + scope_bool_attributes.value, + scope_double_attributes.key, + scope_double_attributes.value, + scope_int_attributes.key, + scope_int_attributes.value, + scope_str_attributes.key, + scope_str_attributes.value, + scope_complex_attributes.key, + scope_complex_attributes.value +FROM + spans s + WHERE s.trace_id IN (SELECT trace_id FROM ( +SELECT DISTINCT + s.trace_id, + t.start, + t.end +FROM spans s +LEFT JOIN trace_id_timestamps t ON s.trace_id = t.trace_id +WHERE 1=1 AND s.service_name = ? AND s.name = ? AND s.duration >= ? AND s.duration <= ? AND s.start_time >= ? AND s.start_time <= ? AND (arrayExists((key, value) -> key = ? AND value = ?, s.bool_attributes.key, s.bool_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_bool_attributes.key, s.resource_bool_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.double_attributes.key, s.double_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_double_attributes.key, s.resource_double_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.int_attributes.key, s.int_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_int_attributes.key, s.resource_int_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.complex_attributes.key, s.complex_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.str_attributes.key, s.str_attributes.value) OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_str_attributes.key, s.resource_str_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.str_attributes.key, s.str_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.bool_attributes.key, s.bool_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.resource_double_attributes.key, s.resource_double_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.scope_int_attributes.key, s.scope_int_attributes.value)) AND (arrayExists((key, value) -> key = ? AND value = ?, s.resource_complex_attributes.key, s.resource_complex_attributes.value)) LIMIT ?)) ORDER BY s.trace_id