Skip to content

Commit f0368d1

Browse files
feat: add OTLP canary query (#4488)
* Working gRPC probe * Fixed incorrect guessing of OTLP sample types * Fixed lints
1 parent e0e688b commit f0368d1

File tree

4 files changed

+275
-6
lines changed

4 files changed

+275
-6
lines changed

.vscode/launch.json

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,26 @@
1010
"request": "launch",
1111
"mode": "auto",
1212
"program": "${workspaceFolder}/cmd/pyroscope",
13+
},
14+
{
15+
"name": "canary-exporter",
16+
"type": "go",
17+
"request": "launch",
18+
"mode": "auto",
19+
"program": "${workspaceFolder}/cmd/profilecli",
20+
"args": [
21+
"canary-exporter"
22+
]
23+
}
24+
],
25+
"compounds": [
26+
{
27+
"name": "canary+pyroscope",
28+
"configurations": [
29+
"pyroscope",
30+
"canary-exporter"
31+
],
32+
"stopAll": true
1333
}
1434
]
1535
}

cmd/profilecli/canary_exporter.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ func newCanaryExporter(params *canaryExporterParams) *canaryExporter {
160160
}
161161

162162
ce.queryProbes = append(ce.queryProbes, &queryProbe{name: "query-select-merge-profile", f: ce.testSelectMergeProfile})
163+
ce.queryProbes = append(ce.queryProbes, &queryProbe{name: "query-select-merge-otlp-profile", f: ce.testSelectMergeOTLPProfile})
163164

164165
if params.QueryProbeSet == "all" {
165166
ce.queryProbes = append(ce.queryProbes, &queryProbe{"query-profile-types", ce.testProfileTypes})
@@ -256,7 +257,10 @@ func (ce *canaryExporter) doTrace(ctx context.Context, probeName string) (rCtx c
256257

257258
return rCtx, func(result bool) {
258259
// At this point body is fully read and we can write end time.
259-
tt.current.end = time.Now()
260+
// Note: tt.current may be nil for non-HTTP probes (e.g., gRPC)
261+
if tt.current != nil {
262+
tt.current.end = time.Now()
263+
}
260264

261265
// record body size
262266
ce.metrics.bodyUncompressedLength.WithLabelValues(probeName).Set(float64(tt.bodySize.Load()))
@@ -310,11 +314,46 @@ func (ce *canaryExporter) doTrace(ctx context.Context, probeName string) (rCtx c
310314
func (ce *canaryExporter) testPyroscopeCell(ctx context.Context) error {
311315
now := time.Now()
312316

313-
// ingest a fake profile
317+
// Run all ingest probes
318+
var ingestErrors multierror.MultiError
319+
320+
// ingest a fake profile using the original method
314321
if err := ce.runProbe(ctx, "ingest", func(rCtx context.Context) error {
315322
return ce.testIngestProfile(rCtx, now)
316323
}); err != nil {
317-
return fmt.Errorf("error during ingestion: %w", err)
324+
ingestErrors.Add(fmt.Errorf("error during standard ingestion: %w", err))
325+
}
326+
327+
// ingest via OTLP gRPC
328+
if err := ce.runProbe(ctx, "ingest-otlp-grpc", func(rCtx context.Context) error {
329+
return ce.testIngestOTLPGrpc(rCtx, now)
330+
}); err != nil {
331+
ingestErrors.Add(fmt.Errorf("error during OTLP gRPC ingestion: %w", err))
332+
}
333+
334+
// ingest via OTLP HTTP/JSON
335+
// Note: HTTP endpoints are not yet implemented in Pyroscope (see pkg/api/api.go:204-205)
336+
// Uncomment when /v1/profiles endpoint is available
337+
// if err := ce.runProbe(ctx, "ingest-otlp-http-json", func(rCtx context.Context) error {
338+
// return ce.testIngestOTLPHttpJson(rCtx, now)
339+
// }); err != nil {
340+
// ingestErrors.Add(fmt.Errorf("error during OTLP HTTP/JSON ingestion: %w", err))
341+
// }
342+
343+
// ingest via OTLP HTTP/Protobuf
344+
// Note: HTTP endpoints are not yet implemented in Pyroscope (see pkg/api/api.go:204-205)
345+
// Uncomment when /v1/profiles endpoint is available
346+
// if err := ce.runProbe(ctx, "ingest-otlp-http-protobuf", func(rCtx context.Context) error {
347+
// return ce.testIngestOTLPHttpProtobuf(rCtx, now)
348+
// }); err != nil {
349+
// ingestErrors.Add(fmt.Errorf("error during OTLP HTTP/Protobuf ingestion: %w", err))
350+
// }
351+
352+
// Report ingestion errors if any
353+
if ingestErrors.Err() != nil {
354+
for _, line := range strings.Split(ingestErrors.Err().Error(), "\n") {
355+
level.Error(logger).Log("msg", "ingestion error", "err", line)
356+
}
318357
}
319358

320359
if ce.params.TestDelay > 0 {

cmd/profilecli/canary_exporter_probes.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ import (
1919
gprofile "github.com/google/pprof/profile"
2020
"github.com/google/uuid"
2121
"github.com/pkg/errors"
22+
profilesv1 "go.opentelemetry.io/proto/otlp/collector/profiles/v1development"
23+
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
24+
otlpprofiles "go.opentelemetry.io/proto/otlp/profiles/v1development"
25+
resourcev1 "go.opentelemetry.io/proto/otlp/resource/v1"
26+
"google.golang.org/grpc"
27+
"google.golang.org/grpc/credentials/insecure"
2228

2329
googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
2430
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
@@ -31,6 +37,7 @@ import (
3137

3238
const (
3339
profileTypeID = "deadmans_switch:made_up:profilos:made_up:profilos"
40+
otlpProfileTypeID = "otlp_test:otlp_test:count:otlp_test:samples"
3441
canaryExporterServiceName = "pyroscope-canary-exporter"
3542
)
3643

@@ -74,6 +81,140 @@ func (ce *canaryExporter) testIngestProfile(ctx context.Context, now time.Time)
7481
return nil
7582
}
7683

84+
// generateOTLPProfile creates an OTLP profile with the specified ingestion method label
85+
func (ce *canaryExporter) generateOTLPProfile(now time.Time, ingestionMethod string) *profilesv1.ExportProfilesServiceRequest {
86+
// Sanitize the ingestion method label value by replacing "/" with "_"
87+
sanitizedMethod := strings.ReplaceAll(ingestionMethod, "/", "_")
88+
89+
// Create the profile dictionary with custom profile type similar to pprof probe
90+
dictionary := &otlpprofiles.ProfilesDictionary{
91+
StringTable: []string{
92+
"", // 0: empty string
93+
"otlp_test", // 1
94+
"samples", // 2
95+
"count", // 3
96+
"func1", // 4
97+
"func2", // 5
98+
"ingestion_method", // 6
99+
sanitizedMethod, // 7
100+
},
101+
MappingTable: []*otlpprofiles.Mapping{
102+
{}, // 0: empty mapping (required null entry)
103+
},
104+
FunctionTable: []*otlpprofiles.Function{
105+
{NameStrindex: 0}, // 0: empty
106+
{NameStrindex: 4}, // 1: func1
107+
{NameStrindex: 5}, // 2: func2
108+
},
109+
LocationTable: []*otlpprofiles.Location{
110+
{Line: []*otlpprofiles.Line{{FunctionIndex: 1}}}, // 0: func1
111+
{Line: []*otlpprofiles.Line{{FunctionIndex: 2}}}, // 1: func2
112+
},
113+
StackTable: []*otlpprofiles.Stack{
114+
{LocationIndices: []int32{}}, // 0: empty (required null entry)
115+
{LocationIndices: []int32{1, 0}}, // 1: func2, func1 stack
116+
{LocationIndices: []int32{0}}, // 2: func1 stack
117+
},
118+
}
119+
120+
// Create profile with two samples matching the original pprof profile
121+
profile := &otlpprofiles.Profile{
122+
TimeUnixNano: uint64(now.UnixNano()),
123+
DurationNano: 0,
124+
Period: 1,
125+
SampleType: &otlpprofiles.ValueType{
126+
TypeStrindex: 1, // "otlp_test"
127+
UnitStrindex: 3, // "count"
128+
},
129+
PeriodType: &otlpprofiles.ValueType{
130+
TypeStrindex: 1, // "otlp_test"
131+
UnitStrindex: 2, // "samples"
132+
},
133+
Sample: []*otlpprofiles.Sample{
134+
{
135+
// func1>func2 with value 10
136+
StackIndex: 1, // stack_table[1]
137+
Values: []int64{10},
138+
},
139+
{
140+
// func1 with value 20
141+
StackIndex: 2, // stack_table[2]
142+
Values: []int64{20},
143+
},
144+
},
145+
}
146+
147+
// Create the resource attributes
148+
resourceAttrs := []*commonv1.KeyValue{
149+
{
150+
Key: "service.name",
151+
Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: canaryExporterServiceName}},
152+
},
153+
{
154+
Key: "job",
155+
Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "canary-exporter"}},
156+
},
157+
{
158+
Key: "instance",
159+
Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: ce.hostname}},
160+
},
161+
{
162+
Key: "ingestion_method",
163+
Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: sanitizedMethod}},
164+
},
165+
}
166+
167+
// Create the OTLP request
168+
req := &profilesv1.ExportProfilesServiceRequest{
169+
Dictionary: dictionary,
170+
ResourceProfiles: []*otlpprofiles.ResourceProfiles{
171+
{
172+
Resource: &resourcev1.Resource{
173+
Attributes: resourceAttrs,
174+
},
175+
ScopeProfiles: []*otlpprofiles.ScopeProfiles{
176+
{
177+
Scope: &commonv1.InstrumentationScope{
178+
Name: "pyroscope-canary-exporter",
179+
},
180+
Profiles: []*otlpprofiles.Profile{profile},
181+
},
182+
},
183+
},
184+
},
185+
}
186+
187+
return req
188+
}
189+
190+
func (ce *canaryExporter) testIngestOTLPGrpc(ctx context.Context, now time.Time) error {
191+
// Generate the OTLP profile with the appropriate ingestion method label
192+
req := ce.generateOTLPProfile(now, "otlp/grpc")
193+
194+
// Create gRPC connection
195+
grpcAddr := strings.TrimPrefix(ce.params.URL, "http://")
196+
grpcAddr = strings.TrimPrefix(grpcAddr, "https://")
197+
198+
conn, err := grpc.NewClient(grpcAddr,
199+
grpc.WithTransportCredentials(insecure.NewCredentials()))
200+
if err != nil {
201+
return fmt.Errorf("failed to connect to gRPC server: %w", err)
202+
}
203+
defer conn.Close()
204+
205+
// Create OTLP profiles service client
206+
client := profilesv1.NewProfilesServiceClient(conn)
207+
208+
// Send the profile
209+
_, err = client.Export(ctx, req)
210+
if err != nil {
211+
return fmt.Errorf("failed to export OTLP profile via gRPC: %w", err)
212+
}
213+
214+
level.Info(logger).Log("msg", "successfully ingested OTLP profile via gRPC")
215+
return nil
216+
}
217+
77218
func (ce *canaryExporter) testSelectMergeProfile(ctx context.Context, now time.Time) error {
78219
respQuery, err := ce.params.queryClient().SelectMergeProfile(ctx, connect.NewRequest(&querierv1.SelectMergeProfileRequest{
79220
Start: now.UnixMilli(),
@@ -128,6 +269,65 @@ func (ce *canaryExporter) testSelectMergeProfile(ctx context.Context, now time.T
128269
return nil
129270
}
130271

272+
func (ce *canaryExporter) testSelectMergeOTLPProfile(ctx context.Context, now time.Time) error {
273+
// Query specifically for OTLP gRPC ingested profiles using the custom profile type
274+
//labelSelector := fmt.Sprintf(`{service_name="%s", job="canary-exporter", instance="%s"}`, canaryExporterServiceName, ce.hostname)
275+
276+
respQuery, err := ce.params.queryClient().SelectMergeProfile(ctx, connect.NewRequest(&querierv1.SelectMergeProfileRequest{
277+
Start: now.UnixMilli(),
278+
End: now.Add(5 * time.Second).UnixMilli(),
279+
LabelSelector: ce.createLabelSelector(),
280+
ProfileTypeID: otlpProfileTypeID,
281+
}))
282+
if err != nil {
283+
return fmt.Errorf("failed to query OTLP profile: %w", err)
284+
}
285+
286+
buf, err := respQuery.Msg.MarshalVT()
287+
if err != nil {
288+
return errors.Wrap(err, "failed to marshal protobuf")
289+
}
290+
291+
gp, err := gprofile.Parse(bytes.NewReader(buf))
292+
if err != nil {
293+
return errors.Wrap(err, "failed to parse profile")
294+
}
295+
296+
// Verify the expected stacktraces from the OTLP profile
297+
expected := map[string]int64{
298+
"func2>func1": 10,
299+
"func1": 20,
300+
}
301+
actual := make(map[string]int64)
302+
303+
var sb strings.Builder
304+
for _, s := range gp.Sample {
305+
sb.Reset()
306+
for _, loc := range s.Location {
307+
if sb.Len() != 0 {
308+
_, err := sb.WriteRune('>')
309+
if err != nil {
310+
return err
311+
}
312+
}
313+
for _, line := range loc.Line {
314+
_, err := sb.WriteString(line.Function.Name)
315+
if err != nil {
316+
return err
317+
}
318+
}
319+
}
320+
actual[sb.String()] = actual[sb.String()] + s.Value[0]
321+
}
322+
323+
if diff := cmp.Diff(expected, actual); diff != "" {
324+
return fmt.Errorf("OTLP profile query mismatch (-expected, +actual):\n%s", diff)
325+
}
326+
327+
level.Info(logger).Log("msg", "successfully queried OTLP profile via gRPC")
328+
return nil
329+
}
330+
131331
func (ce *canaryExporter) testProfileTypes(ctx context.Context, now time.Time) error {
132332
respQuery, err := ce.params.queryClient().ProfileTypes(ctx, connect.NewRequest(&querierv1.ProfileTypesRequest{
133333
Start: now.UnixMilli(),

pkg/ingester/otlp/convert.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package otlp
33
import (
44
"encoding/hex"
55
"fmt"
6+
"strings"
67
"time"
78

89
otelProfile "go.opentelemetry.io/proto/otlp/profiles/v1development"
@@ -144,14 +145,23 @@ func newProfileBuilder(src *otelProfile.Profile, dictionary *otelProfile.Profile
144145
}
145146
}
146147
res.sampleProcessingTypes[i] = sampleConversionTypeSamplesToNanos
147-
}
148-
// Identify off cpu profiles
149-
if profileType == "events:nanoseconds::" && len(res.dst.SampleType) == 1 {
148+
} else if profileType == "events:nanoseconds::" && len(res.dst.SampleType) == 1 { // Identify off-CPU profiles
149+
150150
res.sampleProcessingTypes[i] = sampleConversionTypeSumEvents
151151
res.name = &typesv1.LabelPair{
152152
Name: pyromodel.LabelNameProfileName,
153153
Value: pyromodel.ProfileNameOffCpu,
154154
}
155+
} else { // Custom profile type
156+
// Try to extract profile name from the type, e.g. "wall:time:cpu:milliseconds" -> "wall"
157+
parts := strings.Split(profileType, `:`)
158+
if len(parts) >= 3 {
159+
res.name = &typesv1.LabelPair{
160+
Name: pyromodel.LabelNameProfileName,
161+
Value: parts[2],
162+
}
163+
res.sampleProcessingTypes[i] = sampleConversionTypeNone
164+
}
155165
}
156166
}
157167
if res.name == nil {

0 commit comments

Comments
 (0)