Skip to content

Commit 690a4f6

Browse files
Add OTLP HTTP/json and HTTP/protobuf ingestion (#4495)
* Added HTTP/json and HTTP/protobuf OTLP ingestion * Add http/json+http/protobuf ingestion with canary * Updated endpoint path to match OTLP
1 parent f0368d1 commit 690a4f6

File tree

4 files changed

+174
-19
lines changed

4 files changed

+174
-19
lines changed

cmd/profilecli/canary_exporter.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -333,21 +333,19 @@ func (ce *canaryExporter) testPyroscopeCell(ctx context.Context) error {
333333

334334
// ingest via OTLP HTTP/JSON
335335
// Note: HTTP endpoints are not yet implemented in Pyroscope (see pkg/api/api.go:204-205)
336-
// Uncomment when /v1/profiles endpoint is available
337-
// if err := ce.runProbe(ctx, "ingest-otlp-http-json", func(rCtx context.Context) error {
338-
// return ce.testIngestOTLPHttpJson(rCtx, now)
339-
// }); err != nil {
340-
// ingestErrors.Add(fmt.Errorf("error during OTLP HTTP/JSON ingestion: %w", err))
341-
// }
336+
if err := ce.runProbe(ctx, "ingest-otlp-http-json", func(rCtx context.Context) error {
337+
return ce.testIngestOTLPHttpJson(rCtx, now)
338+
}); err != nil {
339+
ingestErrors.Add(fmt.Errorf("error during OTLP HTTP/JSON ingestion: %w", err))
340+
}
342341

343342
// ingest via OTLP HTTP/Protobuf
344343
// Note: HTTP endpoints are not yet implemented in Pyroscope (see pkg/api/api.go:204-205)
345-
// Uncomment when /v1/profiles endpoint is available
346-
// if err := ce.runProbe(ctx, "ingest-otlp-http-protobuf", func(rCtx context.Context) error {
347-
// return ce.testIngestOTLPHttpProtobuf(rCtx, now)
348-
// }); err != nil {
349-
// ingestErrors.Add(fmt.Errorf("error during OTLP HTTP/Protobuf ingestion: %w", err))
350-
// }
344+
if err := ce.runProbe(ctx, "ingest-otlp-http-protobuf", func(rCtx context.Context) error {
345+
return ce.testIngestOTLPHttpProtobuf(rCtx, now)
346+
}); err != nil {
347+
ingestErrors.Add(fmt.Errorf("error during OTLP HTTP/Protobuf ingestion: %w", err))
348+
}
351349

352350
// Report ingestion errors if any
353351
if ingestErrors.Err() != nil {

cmd/profilecli/canary_exporter_probes.go

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
resourcev1 "go.opentelemetry.io/proto/otlp/resource/v1"
2626
"google.golang.org/grpc"
2727
"google.golang.org/grpc/credentials/insecure"
28+
"google.golang.org/protobuf/encoding/protojson"
29+
"google.golang.org/protobuf/proto"
2830

2931
googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
3032
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
@@ -215,6 +217,86 @@ func (ce *canaryExporter) testIngestOTLPGrpc(ctx context.Context, now time.Time)
215217
return nil
216218
}
217219

220+
func (ce *canaryExporter) testIngestOTLPHttpJson(ctx context.Context, now time.Time) error {
221+
// Generate the OTLP profile with the appropriate ingestion method label
222+
req := ce.generateOTLPProfile(now, "otlp/http/json")
223+
224+
// Marshal to JSON
225+
jsonData, err := protojson.Marshal(req)
226+
if err != nil {
227+
return fmt.Errorf("failed to marshal OTLP profile to JSON: %w", err)
228+
}
229+
230+
// Create HTTP request using the instrumented client
231+
url := ce.params.URL + "/v1development/profiles"
232+
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonData))
233+
if err != nil {
234+
return fmt.Errorf("failed to create HTTP request: %w", err)
235+
}
236+
httpReq.Header.Set("Content-Type", "application/json")
237+
238+
// Send the request using the instrumented client (ce.params.client is set by doTrace)
239+
resp, err := ce.params.client.Do(httpReq)
240+
if err != nil {
241+
return fmt.Errorf("failed to send HTTP request: %w", err)
242+
}
243+
defer resp.Body.Close()
244+
245+
// Read the body to ensure the transport is fully traced
246+
body, err := io.ReadAll(resp.Body)
247+
if err != nil {
248+
return fmt.Errorf("failed to read response body: %w", err)
249+
}
250+
251+
// Check response status
252+
if resp.StatusCode != http.StatusOK {
253+
return fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
254+
}
255+
256+
level.Info(logger).Log("msg", "successfully ingested OTLP profile via HTTP/JSON")
257+
return nil
258+
}
259+
260+
func (ce *canaryExporter) testIngestOTLPHttpProtobuf(ctx context.Context, now time.Time) error {
261+
// Generate the OTLP profile with the appropriate ingestion method label
262+
req := ce.generateOTLPProfile(now, "otlp/http/protobuf")
263+
264+
// Marshal to protobuf
265+
protoData, err := proto.Marshal(req)
266+
if err != nil {
267+
return fmt.Errorf("failed to marshal OTLP profile to protobuf: %w", err)
268+
}
269+
270+
// Create HTTP request using the instrumented client
271+
url := ce.params.URL + "/v1development/profiles"
272+
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(protoData))
273+
if err != nil {
274+
return fmt.Errorf("failed to create HTTP request: %w", err)
275+
}
276+
httpReq.Header.Set("Content-Type", "application/x-protobuf")
277+
278+
// Send the request using the instrumented client (ce.params.client is set by doTrace)
279+
resp, err := ce.params.client.Do(httpReq)
280+
if err != nil {
281+
return fmt.Errorf("failed to send HTTP request: %w", err)
282+
}
283+
defer resp.Body.Close()
284+
285+
// Read the body to ensure the transport is fully traced
286+
body, err := io.ReadAll(resp.Body)
287+
if err != nil {
288+
return fmt.Errorf("failed to read response body: %w", err)
289+
}
290+
291+
// Check response status
292+
if resp.StatusCode != http.StatusOK {
293+
return fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
294+
}
295+
296+
level.Info(logger).Log("msg", "successfully ingested OTLP profile via HTTP/Protobuf")
297+
return nil
298+
}
299+
218300
func (ce *canaryExporter) testSelectMergeProfile(ctx context.Context, now time.Time) error {
219301
respQuery, err := ce.params.queryClient().SelectMergeProfile(ctx, connect.NewRequest(&querierv1.SelectMergeProfileRequest{
220302
Start: now.UnixMilli(),
@@ -295,8 +377,8 @@ func (ce *canaryExporter) testSelectMergeOTLPProfile(ctx context.Context, now ti
295377

296378
// Verify the expected stacktraces from the OTLP profile
297379
expected := map[string]int64{
298-
"func2>func1": 10,
299-
"func1": 20,
380+
"func2>func1": 30, // 10 samples from each of the 3 ingestion methods
381+
"func1": 60, // 20 samples * 3
300382
}
301383
actual := make(map[string]int64)
302384

pkg/api/api.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, limits *validation
201201
})
202202

203203
a.RegisterRoute("/opentelemetry.proto.collector.profiles.v1development.ProfilesService/Export", otlpHandler, writePathOpts...)
204-
// TODO(@petethepig): implement http/protobuf and http/json support
205-
// a.RegisterRoute("/v1/profiles", otlpHandler, true, true, "POST")
204+
a.RegisterRoute("/v1development/profiles", otlpHandler, writePathOpts...)
206205
}
207206

208207
// RegisterMemberlistKV registers the endpoints associated with the memberlist KV store.

pkg/ingester/otlp/ingest_handler.go

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package otlp
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"net/http"
78
"strings"
89

910
"google.golang.org/grpc"
1011
"google.golang.org/grpc/codes"
1112
"google.golang.org/grpc/keepalive"
13+
"google.golang.org/protobuf/encoding/protojson"
1214
"google.golang.org/protobuf/proto"
1315

1416
"github.com/go-kit/log"
@@ -61,10 +63,14 @@ func NewOTLPIngestHandler(cfg server.Config, svc PushService, l log.Logger, me b
6163
return
6264
}
6365

64-
// Handle HTTP requests (if we want to support HTTP/Protobuf in future)
65-
//if r.URL.Path == "/v1/profiles" {}
66+
// Handle HTTP/Protobuf and HTTP/Binary requests
67+
contentType := r.Header.Get("Content-Type")
68+
if contentType == "application/json" || contentType == "application/x-protobuf" || contentType == "application/protobuf" {
69+
h.handleHTTPProtobuf(w, r)
70+
return
71+
}
6672

67-
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
73+
http.Error(w, fmt.Sprintf("Unsupported Content-Type: %s", contentType), http.StatusUnsupportedMediaType)
6874
})
6975

7076
return h
@@ -102,6 +108,76 @@ func (h *ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
102108
h.handler.ServeHTTP(w, r)
103109
}
104110

111+
func (h *ingestHandler) handleHTTPProtobuf(w http.ResponseWriter, r *http.Request) {
112+
defer r.Body.Close()
113+
114+
// Read the request body - we need to read it all for protobuf unmarshaling
115+
// Note: Protobuf wire format requires reading the entire message to determine field boundaries
116+
body, err := io.ReadAll(r.Body)
117+
if err != nil {
118+
level.Error(h.log).Log("msg", "failed to read request body", "err", err)
119+
http.Error(w, "Failed to read request body", http.StatusBadRequest)
120+
return
121+
}
122+
123+
// Unmarshal the protobuf request
124+
req := &pprofileotlp.ExportProfilesServiceRequest{}
125+
126+
if r.Header.Get("Content-Type") == "application/json" {
127+
if err := protojson.Unmarshal(body, req); err != nil {
128+
level.Error(h.log).Log("msg", "failed to unmarshal JSON request", "err", err)
129+
http.Error(w, "Failed to parse JSON request", http.StatusBadRequest)
130+
return
131+
}
132+
} else {
133+
if err := proto.Unmarshal(body, req); err != nil {
134+
level.Error(h.log).Log("msg", "failed to unmarshal protobuf request", "err", err)
135+
http.Error(w, "Failed to parse protobuf request", http.StatusBadRequest)
136+
return
137+
}
138+
}
139+
140+
ctx := r.Context()
141+
// Process the request using the existing Export method
142+
// Injects multitenancy info into context if needed
143+
resp, err := h.Export(ctx, req)
144+
if err != nil {
145+
level.Error(h.log).Log("msg", "failed to process profiles", "err", err)
146+
// Convert gRPC status to HTTP status
147+
st, ok := status.FromError(err)
148+
if ok {
149+
switch st.Code() {
150+
case codes.InvalidArgument:
151+
http.Error(w, st.Message(), http.StatusBadRequest)
152+
case codes.Unauthenticated:
153+
http.Error(w, st.Message(), http.StatusUnauthorized)
154+
case codes.PermissionDenied:
155+
http.Error(w, st.Message(), http.StatusForbidden)
156+
default:
157+
http.Error(w, st.Message(), http.StatusInternalServerError)
158+
}
159+
} else {
160+
http.Error(w, err.Error(), http.StatusInternalServerError)
161+
}
162+
return
163+
}
164+
165+
// Marshal the response
166+
respBytes, err := proto.Marshal(resp)
167+
if err != nil {
168+
level.Error(h.log).Log("msg", "failed to marshal response", "err", err)
169+
http.Error(w, "Failed to marshal response", http.StatusInternalServerError)
170+
return
171+
}
172+
173+
// Write the response
174+
w.Header().Set("Content-Type", "application/x-protobuf")
175+
w.WriteHeader(http.StatusOK)
176+
if _, err := w.Write(respBytes); err != nil {
177+
level.Error(h.log).Log("msg", "failed to write response", "err", err)
178+
}
179+
}
180+
105181
func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfilesServiceRequest) (*pprofileotlp.ExportProfilesServiceResponse, error) {
106182
// TODO: @petethepig This logic is copied from util.AuthenticateUser and should be refactored into a common function
107183
// Extracts user ID from the request metadata and returns and injects the user ID in the context

0 commit comments

Comments
 (0)