Skip to content

Commit

Permalink
[v2][storage] Implement reverse adapter to translate v2 storage api t…
Browse files Browse the repository at this point in the history
…o v1 (#6485)

## Which problem is this PR solving?
- Resolves #6480

## Description of the changes
- This PR implements a reverse adapter (`SpanReader`) that wraps a
native v2 storage interface (`tracestore.Reader`) and downgrades it to
implement the v1 storage interface (`spanstore.Reader`).
- The reverse adapter was integrated with the v1 query service. This
code path will only get executed once we start upgrading the existing
storage implementations to implement the new `tracestore.Reader`
interface as a part of
#6458

## How was this change tested?
- CI 
- Added new unit tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Jan 5, 2025
1 parent a44d8b1 commit 4024a24
Show file tree
Hide file tree
Showing 9 changed files with 623 additions and 14 deletions.
6 changes: 3 additions & 3 deletions cmd/query/app/apiv3/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func parseResponse(t *testing.T, body []byte, obj gogoproto.Message) {
require.NoError(t, gogojsonpb.Unmarshal(bytes.NewBuffer(body), obj))
}

func makeTestTraceV2() ptrace.Traces {
func makeTestTrace() ptrace.Traces {
trace := ptrace.NewTraces()
resources := trace.ResourceSpans().AppendEmpty()
scopes := resources.ScopeSpans().AppendEmpty()
Expand Down Expand Up @@ -147,7 +147,7 @@ func (gw *testGateway) runGatewayGetTrace(t *testing.T) {
gw.reader.
On("GetTraces", matchContext, query).
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{makeTestTraceV2()}, nil)
yield([]ptrace.Traces{makeTestTrace()}, nil)
})).Once()
gw.getTracesAndVerify(t, "/api/v3/traces/1", traceID)
}
Expand All @@ -156,7 +156,7 @@ func (gw *testGateway) runGatewayFindTraces(t *testing.T) {
q, qp := mockFindQueries()
gw.reader.On("FindTraces", matchContext, qp).
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{makeTestTraceV2()}, nil)
yield([]ptrace.Traces{makeTestTrace()}, nil)
})).Once()
gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), traceID)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestGetTrace(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTraces", matchContext, tc.expectedQuery).
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{makeTestTraceV2()}, nil)
yield([]ptrace.Traces{makeTestTrace()}, nil)
})).Once()

getTraceStream, err := tsc.client.GetTrace(context.Background(), &tc.request)
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestFindTraces(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("FindTraces", matchContext, mock.AnythingOfType("tracestore.TraceQueryParams")).
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{makeTestTraceV2()}, nil)
yield([]ptrace.Traces{makeTestTrace()}, nil)
})).Once()

responseStream, err := tsc.client.FindTraces(context.Background(), &api_v3.FindTracesRequest{
Expand All @@ -188,7 +188,7 @@ func TestFindTracesSendError(t *testing.T) {
reader := new(tracestoremocks.Reader)
reader.On("FindTraces", mock.Anything, mock.AnythingOfType("tracestore.TraceQueryParams")).
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{makeTestTraceV2()}, nil)
yield([]ptrace.Traces{makeTestTrace()}, nil)
})).Once()
h := &Handler{
QueryService: querysvc.NewQueryService(
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestHTTPGatewayGetTrace(t *testing.T) {
gw.reader.
On("GetTraces", matchContext, tc.expectedQuery).
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{makeTestTraceV2()}, nil)
yield([]ptrace.Traces{makeTestTrace()}, nil)
})).Once()

q := url.Values{}
Expand Down
5 changes: 3 additions & 2 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ type TraceQueryParameters struct {
func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService {
spanReader, err := v1adapter.GetV1Reader(traceReader)
if err != nil {
// TODO: implement a reverse adapter to convert v2 reader to v1 reader
panic(err)
// if the spanstore.Reader is not available, downgrade the native tracestore.Reader to
// a spanstore.Reader
spanReader = v1adapter.NewSpanReader(traceReader)
}
qsvc := &QueryService{
spanReader: spanReader,
Expand Down
34 changes: 29 additions & 5 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)
Expand Down Expand Up @@ -513,9 +514,32 @@ func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}

func TestNewQueryService_PanicsForNonV1AdapterReader(t *testing.T) {
reader := &tracestoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
options := QueryServiceOptions{}
require.PanicsWithError(t, v1adapter.ErrV1ReaderNotAvailable.Error(), func() { NewQueryService(reader, dependencyReader, options) })
func TestNewQueryService_UsesCorrectTypeForSpanReader(t *testing.T) {
tests := []struct {
name string
reader tracestore.Reader
expectedType spanstore.Reader
}{
{
name: "wrapped spanstore.Reader gets extracted",
reader: func() tracestore.Reader {
reader := &spanstoremocks.Reader{}
return v1adapter.NewTraceReader(reader)
}(),
expectedType: &spanstoremocks.Reader{},
},
{
name: "tracestore.Reader gets downgraded to v1 spanstore.Reader",
reader: &tracestoremocks.Reader{},
expectedType: &v1adapter.SpanReader{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dependencyReader := &depsmocks.Reader{}
options := QueryServiceOptions{}
qs := NewQueryService(test.reader, dependencyReader, options)
assert.IsType(t, test.expectedType, qs.spanReader)
})
}
}
106 changes: 106 additions & 0 deletions storage_v2/v1adapter/spanreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package v1adapter

import (
"context"
"errors"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

var _ spanstore.Reader = (*SpanReader)(nil)

var errTooManyTracesFound = errors.New("too many traces found")

// SpanReader wraps a tracestore.Reader so that it can be downgraded to implement
// the v1 spanstore.Reader interface.
type SpanReader struct {
traceReader tracestore.Reader
}

func NewSpanReader(traceReader tracestore.Reader) *SpanReader {
return &SpanReader{
traceReader: traceReader,
}
}

func (sr *SpanReader) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) {
getTracesIter := sr.traceReader.GetTraces(ctx, tracestore.GetTraceParams{
TraceID: query.TraceID.ToOTELTraceID(),
Start: query.StartTime,
End: query.EndTime,
})
traces, err := V1TracesFromSeq2(getTracesIter)
if err != nil {
return nil, err
}
if len(traces) == 0 {
return nil, spanstore.ErrTraceNotFound
} else if len(traces) > 1 {
return nil, errTooManyTracesFound
}
return traces[0], nil
}

func (sr *SpanReader) GetServices(ctx context.Context) ([]string, error) {
return sr.traceReader.GetServices(ctx)
}

func (sr *SpanReader) GetOperations(
ctx context.Context,
query spanstore.OperationQueryParameters,
) ([]spanstore.Operation, error) {
o, err := sr.traceReader.GetOperations(ctx, tracestore.OperationQueryParams{
ServiceName: query.ServiceName,
SpanKind: query.SpanKind,
})
if err != nil || o == nil {
return nil, err
}
operations := []spanstore.Operation{}
for _, operation := range o {
operations = append(operations, spanstore.Operation{
Name: operation.Name,
SpanKind: operation.SpanKind,
})
}
return operations, nil
}

func (sr *SpanReader) FindTraces(
ctx context.Context,
query *spanstore.TraceQueryParameters,
) ([]*model.Trace, error) {
getTracesIter := sr.traceReader.FindTraces(ctx, tracestore.TraceQueryParams{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: query.NumTraces,
})
return V1TracesFromSeq2(getTracesIter)
}

func (sr *SpanReader) FindTraceIDs(
ctx context.Context,
query *spanstore.TraceQueryParameters,
) ([]model.TraceID, error) {
traceIDsIter := sr.traceReader.FindTraceIDs(ctx, tracestore.TraceQueryParams{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: query.NumTraces,
})
return V1TraceIDsFromSeq2(traceIDsIter)
}
Loading

0 comments on commit 4024a24

Please sign in to comment.