diff --git a/.mockery.yaml b/.mockery.yaml index 5016d2b0947..849e59ec0d7 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -65,3 +65,6 @@ packages: github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore: config: all: true + github.com/jaegertracing/jaeger/internal/storage/v1/cassandra/spanstore: + interfaces: + CoreSpanWriter: {} diff --git a/internal/storage/v1/cassandra/factory.go b/internal/storage/v1/cassandra/factory.go index ff45620376b..84f9ca6b6bc 100644 --- a/internal/storage/v1/cassandra/factory.go +++ b/internal/storage/v1/cassandra/factory.go @@ -143,7 +143,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { if err != nil { return nil, err } - return cspanstore.NewSpanWriter(f.session, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger, options...) + return cspanstore.NewSpanWriterV1(f.session, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger, options...) } // CreateDependencyReader implements storage.Factory diff --git a/internal/storage/v1/cassandra/savetracetest/main.go b/internal/storage/v1/cassandra/savetracetest/main.go index 6051ae5eabb..98a9c603221 100644 --- a/internal/storage/v1/cassandra/savetracetest/main.go +++ b/internal/storage/v1/cassandra/savetracetest/main.go @@ -46,7 +46,7 @@ func main() { logger.Fatal("Failed to initialize tracer", zap.Error(err)) } defer tracerCloser(context.Background()) - spanStore, err := cspanstore.NewSpanWriter(cqlSession, time.Hour*12, noScope, logger) + spanStore, err := cspanstore.NewSpanWriterV1(cqlSession, time.Hour*12, noScope, logger) if err != nil { logger.Fatal("Failed to create span writer", zap.Error(err)) } diff --git a/internal/storage/v1/cassandra/spanstore/dbmodel/converter.go b/internal/storage/v1/cassandra/spanstore/dbmodel/converter.go index af790d1130c..d6f828a800b 100644 --- a/internal/storage/v1/cassandra/spanstore/dbmodel/converter.go +++ b/internal/storage/v1/cassandra/spanstore/dbmodel/converter.go @@ -28,11 +28,11 @@ var ( } domainToDBValueTypeMap = map[model.ValueType]string{ - model.StringType: stringType, - model.BoolType: boolType, - model.Int64Type: int64Type, - model.Float64Type: float64Type, - model.BinaryType: binaryType, + model.StringType: StringType, + model.BoolType: BoolType, + model.Int64Type: Int64Type, + model.Float64Type: Float64Type, + model.BinaryType: BinaryType, } ) @@ -152,15 +152,15 @@ func (c converter) fromDBWarnings(tags []KeyValue) ([]string, error) { func (converter) fromDBTag(tag *KeyValue) (model.KeyValue, error) { switch tag.ValueType { - case stringType: + case StringType: return model.String(tag.Key, tag.ValueString), nil - case boolType: + case BoolType: return model.Bool(tag.Key, tag.ValueBool), nil - case int64Type: + case Int64Type: return model.Int64(tag.Key, tag.ValueInt64), nil - case float64Type: + case Float64Type: return model.Float64(tag.Key, tag.ValueFloat64), nil - case binaryType: + case BinaryType: return model.Binary(tag.Key, tag.ValueBinary), nil default: return model.KeyValue{}, fmt.Errorf("invalid ValueType in %+v", tag) diff --git a/internal/storage/v1/cassandra/spanstore/dbmodel/converter_test.go b/internal/storage/v1/cassandra/spanstore/dbmodel/converter_test.go index b80a8eddc80..0ff218d1f5d 100644 --- a/internal/storage/v1/cassandra/spanstore/dbmodel/converter_test.go +++ b/internal/storage/v1/cassandra/spanstore/dbmodel/converter_test.go @@ -46,27 +46,27 @@ var ( someDBTags = []KeyValue{ { Key: someStringTagKey, - ValueType: stringType, + ValueType: StringType, ValueString: someStringTagValue, }, { Key: someBoolTagKey, - ValueType: boolType, + ValueType: BoolType, ValueBool: someBoolTagValue, }, { Key: someLongTagKey, - ValueType: int64Type, + ValueType: Int64Type, ValueInt64: someLongTagValue, }, { Key: someDoubleTagKey, - ValueType: float64Type, + ValueType: Float64Type, ValueFloat64: someDoubleTagValue, }, { Key: someBinaryTagKey, - ValueType: binaryType, + ValueType: BinaryType, ValueBinary: someBinaryTagValue, }, } diff --git a/internal/storage/v1/cassandra/spanstore/dbmodel/model.go b/internal/storage/v1/cassandra/spanstore/dbmodel/model.go index 00df21668eb..05a5199cfe6 100644 --- a/internal/storage/v1/cassandra/spanstore/dbmodel/model.go +++ b/internal/storage/v1/cassandra/spanstore/dbmodel/model.go @@ -19,11 +19,12 @@ const ( childOf = "child-of" followsFrom = "follows-from" - stringType = "string" - boolType = "bool" - int64Type = "int64" - float64Type = "float64" - binaryType = "binary" + StringType = "string" + BoolType = "bool" + Int64Type = "int64" + Float64Type = "float64" + BinaryType = "binary" + spanKindKey = "span.kind" ) // TraceID is a serializable form of model.TraceID @@ -59,25 +60,25 @@ type KeyValue struct { func (t *KeyValue) compareValues(that *KeyValue) int { switch t.ValueType { - case stringType: + case StringType: return strings.Compare(t.ValueString, that.ValueString) - case boolType: + case BoolType: if t.ValueBool != that.ValueBool { if !t.ValueBool { return -1 } return 1 } - case int64Type: + case Int64Type: return int(t.ValueInt64 - that.ValueInt64) - case float64Type: + case Float64Type: if t.ValueFloat64 != that.ValueFloat64 { if t.ValueFloat64 < that.ValueFloat64 { return -1 } return 1 } - case binaryType: + case BinaryType: return bytes.Compare(t.ValueBinary, that.ValueBinary) default: return -1 // theoretical case, not stating them equal but placing the base pointer before other @@ -123,18 +124,18 @@ func (t *KeyValue) Equal(that any) bool { func (t *KeyValue) AsString() string { switch t.ValueType { - case stringType: + case StringType: return t.ValueString - case boolType: + case BoolType: if t.ValueBool { return "true" } return "false" - case int64Type: + case Int64Type: return strconv.FormatInt(t.ValueInt64, 10) - case float64Type: + case Float64Type: return strconv.FormatFloat(t.ValueFloat64, 'g', 10, 64) - case binaryType: + case BinaryType: return hex.EncodeToString(t.ValueBinary) default: return "unknown type " + t.ValueType @@ -203,3 +204,12 @@ func (t TraceID) ToDomain() model.TraceID { func (t TraceID) String() string { return t.ToDomain().String() } + +func GetSpanKind(ds *Span) string { + for _, tag := range ds.Tags { + if tag.Key == spanKindKey { + return tag.ValueString + } + } + return "" +} diff --git a/internal/storage/v1/cassandra/spanstore/dbmodel/model_test.go b/internal/storage/v1/cassandra/spanstore/dbmodel/model_test.go index 4bd68e242c4..a01bd91ebac 100644 --- a/internal/storage/v1/cassandra/spanstore/dbmodel/model_test.go +++ b/internal/storage/v1/cassandra/spanstore/dbmodel/model_test.go @@ -289,7 +289,7 @@ func TestKeyValueAsString(t *testing.T) { name: "StringType", kv: KeyValue{ Key: "k", - ValueType: stringType, + ValueType: StringType, ValueString: "hello", }, expect: "hello", @@ -298,7 +298,7 @@ func TestKeyValueAsString(t *testing.T) { name: "BoolTrue", kv: KeyValue{ Key: "k", - ValueType: boolType, + ValueType: BoolType, ValueBool: true, }, expect: "true", @@ -307,7 +307,7 @@ func TestKeyValueAsString(t *testing.T) { name: "BoolFalse", kv: KeyValue{ Key: "k", - ValueType: boolType, + ValueType: BoolType, ValueBool: false, }, expect: "false", @@ -316,7 +316,7 @@ func TestKeyValueAsString(t *testing.T) { name: "Int64Type", kv: KeyValue{ Key: "k", - ValueType: int64Type, + ValueType: Int64Type, ValueInt64: 12345, }, expect: "12345", @@ -325,7 +325,7 @@ func TestKeyValueAsString(t *testing.T) { name: "Float64Type", kv: KeyValue{ Key: "k", - ValueType: float64Type, + ValueType: Float64Type, ValueFloat64: 12.34, }, expect: "12.34", @@ -334,7 +334,7 @@ func TestKeyValueAsString(t *testing.T) { name: "BinaryType", kv: KeyValue{ Key: "k", - ValueType: binaryType, + ValueType: BinaryType, ValueBinary: []byte{0xAB, 0xCD, 0xEF}, }, expect: "abcdef", diff --git a/internal/storage/v1/cassandra/spanstore/dbmodel/tag_filter_exact_match_test.go b/internal/storage/v1/cassandra/spanstore/dbmodel/tag_filter_exact_match_test.go index 5dfef02c5b8..b4117093213 100644 --- a/internal/storage/v1/cassandra/spanstore/dbmodel/tag_filter_exact_match_test.go +++ b/internal/storage/v1/cassandra/spanstore/dbmodel/tag_filter_exact_match_test.go @@ -30,11 +30,11 @@ func TestBlacklistFilter(t *testing.T) { for _, test := range tt { var inputKVs []KeyValue for _, i := range test.input { - inputKVs = append(inputKVs, KeyValue{Key: i, ValueType: stringType, ValueString: ""}) + inputKVs = append(inputKVs, KeyValue{Key: i, ValueType: StringType, ValueString: ""}) } var expectedKVs []KeyValue for _, e := range test.expected { - expectedKVs = append(expectedKVs, KeyValue{Key: e, ValueType: stringType, ValueString: ""}) + expectedKVs = append(expectedKVs, KeyValue{Key: e, ValueType: StringType, ValueString: ""}) } SortKVs(expectedKVs) @@ -78,11 +78,11 @@ func TestWhitelistFilter(t *testing.T) { for _, test := range tt { var inputKVs []KeyValue for _, i := range test.input { - inputKVs = append(inputKVs, KeyValue{Key: i, ValueType: stringType, ValueString: ""}) + inputKVs = append(inputKVs, KeyValue{Key: i, ValueType: StringType, ValueString: ""}) } var expectedKVs []KeyValue for _, e := range test.expected { - expectedKVs = append(expectedKVs, KeyValue{Key: e, ValueType: stringType, ValueString: ""}) + expectedKVs = append(expectedKVs, KeyValue{Key: e, ValueType: StringType, ValueString: ""}) } SortKVs(expectedKVs) diff --git a/internal/storage/v1/cassandra/spanstore/dbmodel/tag_filter_test.go b/internal/storage/v1/cassandra/spanstore/dbmodel/tag_filter_test.go index 76db3370312..f2fcd344fe1 100644 --- a/internal/storage/v1/cassandra/spanstore/dbmodel/tag_filter_test.go +++ b/internal/storage/v1/cassandra/spanstore/dbmodel/tag_filter_test.go @@ -27,7 +27,7 @@ type onlyStringsFilter struct{} func (onlyStringsFilter) filterStringTags(tags []KeyValue) []KeyValue { var ret []KeyValue for _, tag := range tags { - if tag.ValueType == stringType { + if tag.ValueType == StringType { ret = append(ret, tag) } } @@ -47,7 +47,7 @@ func (f onlyStringsFilter) FilterLogFields(_ *Span, logFields []KeyValue) []KeyV } func TestChainedTagFilter(t *testing.T) { - expectedTags := []KeyValue{{Key: someStringTagKey, ValueType: stringType, ValueString: someStringTagValue}} + expectedTags := []KeyValue{{Key: someStringTagKey, ValueType: StringType, ValueString: someStringTagValue}} filter := NewChainedTagFilter(DefaultTagFilter, onlyStringsFilter{}) filteredTags := filter.FilterProcessTags(nil, someDBTags) compareTags(t, expectedTags, filteredTags) diff --git a/internal/storage/v1/cassandra/spanstore/dbmodel/unique_tags.go b/internal/storage/v1/cassandra/spanstore/dbmodel/unique_tags.go index 2ab3ddaa7db..c2fe186f3bf 100644 --- a/internal/storage/v1/cassandra/spanstore/dbmodel/unique_tags.go +++ b/internal/storage/v1/cassandra/spanstore/dbmodel/unique_tags.go @@ -14,7 +14,7 @@ func GetAllUniqueTags(span *Span, tagFilter TagFilter) []TagInsertion { SortKVs(allTags) uniqueTags := make([]TagInsertion, 0, len(allTags)) for i := range allTags { - if allTags[i].ValueType == binaryType { + if allTags[i].ValueType == BinaryType { continue // do not index binary tags } if i > 0 && allTags[i-1].Equal(&allTags[i]) { diff --git a/internal/storage/v1/cassandra/spanstore/mocks/mocks.go b/internal/storage/v1/cassandra/spanstore/mocks/mocks.go new file mode 100644 index 00000000000..84477734d57 --- /dev/null +++ b/internal/storage/v1/cassandra/spanstore/mocks/mocks.go @@ -0,0 +1,137 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package mocks + +import ( + "github.com/jaegertracing/jaeger/internal/storage/v1/cassandra/spanstore/dbmodel" + mock "github.com/stretchr/testify/mock" +) + +// NewCoreSpanWriter creates a new instance of CoreSpanWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewCoreSpanWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *CoreSpanWriter { + mock := &CoreSpanWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// CoreSpanWriter is an autogenerated mock type for the CoreSpanWriter type +type CoreSpanWriter struct { + mock.Mock +} + +type CoreSpanWriter_Expecter struct { + mock *mock.Mock +} + +func (_m *CoreSpanWriter) EXPECT() *CoreSpanWriter_Expecter { + return &CoreSpanWriter_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function for the type CoreSpanWriter +func (_mock *CoreSpanWriter) Close() error { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func() error); ok { + r0 = returnFunc() + } else { + r0 = ret.Error(0) + } + return r0 +} + +// CoreSpanWriter_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type CoreSpanWriter_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *CoreSpanWriter_Expecter) Close() *CoreSpanWriter_Close_Call { + return &CoreSpanWriter_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *CoreSpanWriter_Close_Call) Run(run func()) *CoreSpanWriter_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *CoreSpanWriter_Close_Call) Return(err error) *CoreSpanWriter_Close_Call { + _c.Call.Return(err) + return _c +} + +func (_c *CoreSpanWriter_Close_Call) RunAndReturn(run func() error) *CoreSpanWriter_Close_Call { + _c.Call.Return(run) + return _c +} + +// WriteSpan provides a mock function for the type CoreSpanWriter +func (_mock *CoreSpanWriter) WriteSpan(span *dbmodel.Span) error { + ret := _mock.Called(span) + + if len(ret) == 0 { + panic("no return value specified for WriteSpan") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(*dbmodel.Span) error); ok { + r0 = returnFunc(span) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// CoreSpanWriter_WriteSpan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteSpan' +type CoreSpanWriter_WriteSpan_Call struct { + *mock.Call +} + +// WriteSpan is a helper method to define mock.On call +// - span *dbmodel.Span +func (_e *CoreSpanWriter_Expecter) WriteSpan(span interface{}) *CoreSpanWriter_WriteSpan_Call { + return &CoreSpanWriter_WriteSpan_Call{Call: _e.mock.On("WriteSpan", span)} +} + +func (_c *CoreSpanWriter_WriteSpan_Call) Run(run func(span *dbmodel.Span)) *CoreSpanWriter_WriteSpan_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 *dbmodel.Span + if args[0] != nil { + arg0 = args[0].(*dbmodel.Span) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *CoreSpanWriter_WriteSpan_Call) Return(err error) *CoreSpanWriter_WriteSpan_Call { + _c.Call.Return(err) + return _c +} + +func (_c *CoreSpanWriter_WriteSpan_Call) RunAndReturn(run func(span *dbmodel.Span) error) *CoreSpanWriter_WriteSpan_Call { + _c.Call.Return(run) + return _c +} diff --git a/internal/storage/v1/cassandra/spanstore/writer.go b/internal/storage/v1/cassandra/spanstore/writer.go index fb2c520889e..190bc26c11c 100644 --- a/internal/storage/v1/cassandra/spanstore/writer.go +++ b/internal/storage/v1/cassandra/spanstore/writer.go @@ -5,7 +5,6 @@ package spanstore import ( - "context" "encoding/json" "fmt" "strings" @@ -76,6 +75,11 @@ type spanWriterMetrics struct { durationIndex *casmetrics.Table } +type CoreSpanWriter interface { + WriteSpan(span *dbmodel.Span) error + Close() error +} + // SpanWriter handles all writes to Cassandra for the Jaeger data model type SpanWriter struct { session cassandra.Session @@ -130,22 +134,21 @@ func (s *SpanWriter) Close() error { } // WriteSpan saves the span into Cassandra -func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error { - ds := dbmodel.FromDomain(span) +func (s *SpanWriter) WriteSpan(span *dbmodel.Span) error { if s.storageMode&storeFlag == storeFlag { - if err := s.writeSpan(span, ds); err != nil { + if err := s.writeSpan(span); err != nil { return err } } if s.storageMode&indexFlag == indexFlag { - if err := s.writeIndexes(span, ds); err != nil { + if err := s.writeIndexes(span); err != nil { return err } } return nil } -func (s *SpanWriter) writeSpan(_ *model.Span, ds *dbmodel.Span) error { +func (s *SpanWriter) writeSpan(ds *dbmodel.Span) error { mainQuery := s.session.Query( insertSpan, ds.TraceID, @@ -167,11 +170,11 @@ func (s *SpanWriter) writeSpan(_ *model.Span, ds *dbmodel.Span) error { return nil } -func (s *SpanWriter) writeIndexes(span *model.Span, ds *dbmodel.Span) error { - spanKind, _ := span.GetSpanKind() // if not found it returns "" +func (s *SpanWriter) writeIndexes(ds *dbmodel.Span) error { + spanKind := dbmodel.GetSpanKind(ds) // if not found it returns "" if err := s.saveServiceNameAndOperationName(dbmodel.Operation{ ServiceName: ds.ServiceName, - SpanKind: string(spanKind), + SpanKind: spanKind, OperationName: ds.OperationName, }); err != nil { // should this be a soft failure? @@ -190,16 +193,13 @@ func (s *SpanWriter) writeIndexes(span *model.Span, ds *dbmodel.Span) error { } } - if span.Flags.IsFirehoseEnabled() { - return nil // skipping expensive indexing - } - if err := s.indexByTags(ds); err != nil { return s.logError(ds, err, "Failed to index tags", s.logger) } if s.indexFilter(ds, dbmodel.DurationIndex) { - if err := s.indexByDuration(ds, span.StartTime); err != nil { + //nolint:gosec // G115 + if err := s.indexByDuration(ds, model.EpochMicrosecondsAsTime(uint64(ds.StartTime))); err != nil { return s.logError(ds, err, "Failed to index duration", s.logger) } } diff --git a/internal/storage/v1/cassandra/spanstore/writer_test.go b/internal/storage/v1/cassandra/spanstore/writer_test.go index 42a1126d9b2..6a574ca1dc2 100644 --- a/internal/storage/v1/cassandra/spanstore/writer_test.go +++ b/internal/storage/v1/cassandra/spanstore/writer_test.go @@ -5,11 +5,9 @@ package spanstore import ( - "context" "errors" "fmt" "strings" - "sync/atomic" "testing" "time" @@ -18,14 +16,14 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - "github.com/jaegertracing/jaeger-idl/model/v1" "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/internal/storage/cassandra/mocks" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" "github.com/jaegertracing/jaeger/internal/storage/v1/cassandra/spanstore/dbmodel" "github.com/jaegertracing/jaeger/internal/testutils" ) +var traceId = [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} + type spanWriterTest struct { session *mocks.Session logger *zap.Logger @@ -54,8 +52,6 @@ func withSpanWriter(t *testing.T, writeCacheTTL time.Duration, fn func(w *spanWr fn(w) } -var _ spanstore.Writer = &SpanWriter{} // check API conformance - func TestNewSpanWriter(t *testing.T) { t.Run("test span writer creation", func(t *testing.T) { withSpanWriter(t, 0, func(w *spanWriterTest) { @@ -93,7 +89,6 @@ func TestClientClose(t *testing.T) { func TestSpanWriter(t *testing.T) { testCases := []struct { caption string - firehose bool mainQueryError error tagsQueryError error serviceNameQueryError error @@ -106,10 +101,6 @@ func TestSpanWriter(t *testing.T) { { caption: "main query", }, - { - caption: "main firehose query", - firehose: true, - }, { caption: "main query error", mainQueryError: errors.New("main query error"), @@ -179,20 +170,25 @@ func TestSpanWriter(t *testing.T) { testCase := tc // capture loop var t.Run(testCase.caption, func(t *testing.T) { withSpanWriter(t, 0, func(w *spanWriterTest) { - span := &model.Span{ - TraceID: model.NewTraceID(0, 1), + span := &dbmodel.Span{ + TraceID: traceId, OperationName: "operation-a", - Tags: model.KeyValues{ - model.String("x", "y"), - model.String("json", `{"x":"y"}`), // string tag with json value will not be inserted + Tags: []dbmodel.KeyValue{ + { + Key: "x", + ValueType: dbmodel.StringType, + ValueString: "y", + }, + { + Key: "json", + ValueType: dbmodel.StringType, + ValueString: `{"x":"y"}`, + }, // string tag with json value will not be inserted }, - Process: &model.Process{ + Process: dbmodel.Process{ ServiceName: "service-a", }, } - if testCase.firehose { - span.Flags = model.FirehoseFlag - } spanQuery := &mocks.Query{} spanQuery.On("Bind", mock.Anything).Return(spanQuery) @@ -228,7 +224,7 @@ func TestSpanWriter(t *testing.T) { w.writer.serviceNamesWriter = func(_ /* serviceName */ string) error { return testCase.serviceNameError } w.writer.operationNamesWriter = func(_ dbmodel.Operation) error { return testCase.serviceNameError } - err := w.writer.WriteSpan(context.Background(), span) + err := w.writer.WriteSpan(span) if testCase.expectedError == "" { require.NoError(t, err) @@ -318,9 +314,9 @@ func TestStorageMode_IndexOnly(t *testing.T) { withSpanWriter(t, 0, func(w *spanWriterTest) { w.writer.serviceNamesWriter = func(_ /* serviceName */ string) error { return nil } w.writer.operationNamesWriter = func(_ dbmodel.Operation) error { return nil } - span := &model.Span{ - TraceID: model.NewTraceID(0, 1), - Process: &model.Process{ + span := &dbmodel.Span{ + TraceID: traceId, + Process: dbmodel.Process{ ServiceName: "service-a", }, } @@ -341,7 +337,7 @@ func TestStorageMode_IndexOnly(t *testing.T) { w.session.On("Query", serviceOperationIndex).Return(serviceOperationNameQuery) w.session.On("Query", durationIndex).Return(durationNoOperationQuery).Once() - err := w.writer.WriteSpan(context.Background(), span) + err := w.writer.WriteSpan(span) require.NoError(t, err) serviceNameQuery.AssertExpectations(t) @@ -361,13 +357,13 @@ func TestStorageMode_IndexOnly_WithFilter(t *testing.T) { w.writer.indexFilter = filterEverything w.writer.serviceNamesWriter = func(_ /* serviceName */ string) error { return nil } w.writer.operationNamesWriter = func(_ dbmodel.Operation) error { return nil } - span := &model.Span{ - TraceID: model.NewTraceID(0, 1), - Process: &model.Process{ + span := &dbmodel.Span{ + TraceID: traceId, + Process: dbmodel.Process{ ServiceName: "service-a", }, } - err := w.writer.WriteSpan(context.Background(), span) + err := w.writer.WriteSpan(span) require.NoError(t, err) w.session.AssertExpectations(t) w.session.AssertNotCalled(t, "Query", serviceOperationIndex, mock.Anything) @@ -376,67 +372,15 @@ func TestStorageMode_IndexOnly_WithFilter(t *testing.T) { }, StoreIndexesOnly()) } -func TestStorageMode_IndexOnly_FirehoseSpan(t *testing.T) { - withSpanWriter(t, 0, func(w *spanWriterTest) { - var serviceWritten atomic.Pointer[string] - var operationWritten atomic.Pointer[dbmodel.Operation] - empty := "" - serviceWritten.Store(&empty) - operationWritten.Store(&dbmodel.Operation{}) - w.writer.serviceNamesWriter = func(serviceName string) error { - serviceWritten.Store(&serviceName) - return nil - } - w.writer.operationNamesWriter = func(operation dbmodel.Operation) error { - operationWritten.Store(&operation) - return nil - } - span := &model.Span{ - TraceID: model.NewTraceID(0, 1), - OperationName: "package-delivery", - Process: &model.Process{ - ServiceName: "planet-express", - }, - Flags: model.Flags(8), - } - - serviceNameQuery := &mocks.Query{} - serviceNameQuery.On("Bind", mock.Anything).Return(serviceNameQuery) - serviceNameQuery.On("Exec").Return(nil) - serviceNameQuery.On("String").Return("select from service_name_index") - - serviceOperationNameQuery := &mocks.Query{} - serviceOperationNameQuery.On("Bind", mock.Anything).Return(serviceOperationNameQuery) - serviceOperationNameQuery.On("Exec").Return(nil) - serviceOperationNameQuery.On("String").Return("select from service_operation_index") - - // Define expected queries - w.session.On("Query", serviceNameIndex).Return(serviceNameQuery) - w.session.On("Query", serviceOperationIndex).Return(serviceOperationNameQuery) - - err := w.writer.WriteSpan(context.Background(), span) - require.NoError(t, err) - w.session.AssertExpectations(t) - w.session.AssertNotCalled(t, "Query", tagIndex, mock.Anything) - w.session.AssertNotCalled(t, "Query", durationIndex, mock.Anything) - assert.Equal(t, "planet-express", *serviceWritten.Load()) - assert.Equal(t, dbmodel.Operation{ - ServiceName: "planet-express", - SpanKind: "", - OperationName: "package-delivery", - }, *operationWritten.Load()) - }, StoreIndexesOnly()) -} - func TestStorageMode_StoreWithoutIndexing(t *testing.T) { withSpanWriter(t, 0, func(w *spanWriterTest) { w.writer.serviceNamesWriter = func(_ /* serviceName */ string) error { assert.Fail(t, "Non indexing store shouldn't index") return nil } - span := &model.Span{ - TraceID: model.NewTraceID(0, 1), - Process: &model.Process{ + span := &dbmodel.Span{ + TraceID: traceId, + Process: dbmodel.Process{ ServiceName: "service-a", }, } @@ -444,7 +388,7 @@ func TestStorageMode_StoreWithoutIndexing(t *testing.T) { spanQuery.On("Exec").Return(nil) w.session.On("Query", insertSpan, mock.Anything).Return(spanQuery) - err := w.writer.WriteSpan(context.Background(), span) + err := w.writer.WriteSpan(span) require.NoError(t, err) spanQuery.AssertExpectations(t) diff --git a/internal/storage/v1/cassandra/spanstore/writerv1.go b/internal/storage/v1/cassandra/spanstore/writerv1.go new file mode 100644 index 00000000000..b662ed62dbe --- /dev/null +++ b/internal/storage/v1/cassandra/spanstore/writerv1.go @@ -0,0 +1,46 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package spanstore + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger-idl/model/v1" + "github.com/jaegertracing/jaeger/internal/metrics" + "github.com/jaegertracing/jaeger/internal/storage/cassandra" + "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" + "github.com/jaegertracing/jaeger/internal/storage/v1/cassandra/spanstore/dbmodel" +) + +var _ spanstore.Writer = &SpanWriterV1{} // check API conformance + +type SpanWriterV1 struct { + writer CoreSpanWriter +} + +func NewSpanWriterV1( + session cassandra.Session, + writeCacheTTL time.Duration, + metricsFactory metrics.Factory, + logger *zap.Logger, + options ...Option, +) (*SpanWriterV1, error) { + writer, err := NewSpanWriter(session, writeCacheTTL, metricsFactory, logger, options...) + if err != nil { + return nil, err + } + return &SpanWriterV1{writer: writer}, nil +} + +func (s *SpanWriterV1) WriteSpan(_ context.Context, span *model.Span) error { + ds := dbmodel.FromDomain(span) + return s.writer.WriteSpan(ds) +} + +func (s *SpanWriterV1) Close() error { + return s.writer.Close() +} diff --git a/internal/storage/v1/cassandra/spanstore/writerv1_test.go b/internal/storage/v1/cassandra/spanstore/writerv1_test.go new file mode 100644 index 00000000000..ecd6246e92c --- /dev/null +++ b/internal/storage/v1/cassandra/spanstore/writerv1_test.go @@ -0,0 +1,65 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package spanstore + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger-idl/model/v1" + "github.com/jaegertracing/jaeger/internal/metricstest" + "github.com/jaegertracing/jaeger/internal/storage/cassandra/mocks" + "github.com/jaegertracing/jaeger/internal/storage/v1/cassandra/spanstore/dbmodel" + storemocks "github.com/jaegertracing/jaeger/internal/storage/v1/cassandra/spanstore/mocks" + "github.com/jaegertracing/jaeger/internal/testutils" +) + +func TestErrNewSpanWriterV1(t *testing.T) { + session := &mocks.Session{} + query := &mocks.Query{} + query.On("Exec").Return(errors.New("some error")) + session.On("Query", + fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), + mock.Anything).Return(query) + session.On("Query", + fmt.Sprintf(tableCheckStmt, schemas[previousVersion].tableName), + mock.Anything).Return(query) + metricsFactory := metricstest.NewFactory(0) + logger, _ := testutils.NewLogger() + _, err := NewSpanWriterV1(session, 0, metricsFactory, logger) + require.ErrorContains(t, err, "neither table operation_names_v2 nor operation_names exist") +} + +func TestWriteSpan(t *testing.T) { + data, err := json.Marshal(map[string]any{"x": "y"}) + require.NoError(t, err) + span := &model.Span{ + TraceID: model.NewTraceID(0, 1), + OperationName: "operation-a", + Tags: model.KeyValues{ + model.String("x", "y"), + model.Binary("json", data), // string tag with json value will not be inserted + }, + Process: &model.Process{ + ServiceName: "service-a", + }, + } + mockWriter := &storemocks.CoreSpanWriter{} + mockWriter.On("WriteSpan", dbmodel.FromDomain(span)).Return(nil) + writer := SpanWriterV1{writer: mockWriter} + require.NoError(t, writer.WriteSpan(context.Background(), span)) +} + +func TestSpanWriterV1Close(t *testing.T) { + mockWriter := &storemocks.CoreSpanWriter{} + mockWriter.On("Close").Return(nil) + writer := SpanWriterV1{writer: mockWriter} + require.NoError(t, writer.Close()) +}