Add support for OTLP/HTTP Receiver #1765
Conversation
Codecov Report

```diff
@@            Coverage Diff             @@
##             main    #1765      +/-   ##
==========================================
- Coverage   84.40%   84.38%   -0.02%
==========================================
  Files         496      499       +3
  Lines      145393   147090    +1697
==========================================
+ Hits       122716   124124    +1408
- Misses      22143    22432     +289
  Partials      534      534
```
@lalitb It's really great to have built-in HTTP support at the OTLP receiver level. I'm waiting for a more detailed description of the PR and the approach taken before doing a deeper review. Thanks in advance.

Thanks @lquerel - This is still a draft while I finish benchmarks and a final review. I'll add a detailed PR description and update the README with a summary of the changes shortly.
```rust
state: settings
    .wait_for_result
    .then(|| AckSlot::new(settings.max_concurrent_requests)),
state,
```
The refactor just moved the `AckSlot` construction out of `ServerCommon::new` into the receiver so HTTP and gRPC can share the same slot pool when HTTP wait-for-result is enabled.
```rust
let std_stream: std::net::TcpStream = socket.into();
std_stream.set_nonblocking(true)?;
TcpStream::from_std(std_stream)
socket_options::apply_socket_options(
```
Socket tuning is now centralized in `socket_options.rs` because both the proxy and the OTLP/HTTP server need the same keepalive/nodelay configuration (the tokio -> std -> socket2 -> std -> tokio dance). This keeps the settings consistent across listeners and avoids duplicating the conversion code.
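For reference, a minimal sketch of what such a centralized helper could look like, assuming the socket2 crate; the function name matches the diff above, but the option values (keepalive time, nodelay) are illustrative assumptions, not the PR's actual defaults:

```rust
use std::time::Duration;

use socket2::{Socket, TcpKeepalive};
use tokio::net::TcpStream;

/// Sketch of a shared socket-tuning helper; values are illustrative.
pub fn apply_socket_options(stream: TcpStream) -> std::io::Result<TcpStream> {
    // tokio -> std: detach from the reactor to reach the raw socket.
    let std_stream = stream.into_std()?;
    // std -> socket2: `Socket` exposes the keepalive/nodelay setters.
    let socket = Socket::from(std_stream);
    socket.set_nodelay(true)?;
    socket.set_tcp_keepalive(&TcpKeepalive::new().with_time(Duration::from_secs(30)))?;
    // socket2 -> std -> tokio: re-register with the runtime.
    let std_stream: std::net::TcpStream = socket.into();
    std_stream.set_nonblocking(true)?;
    TcpStream::from_std(std_stream)
}
```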
```rust
{
    if let Some(acceptor) = maybe_tls_acceptor.clone() {
        let shutdown = shutdown.clone();
        let _ = tracker.spawn(async move {
```
This uses `TaskTracker`/`tokio::spawn`, which forces these per-connection handlers to be `Send`. If we want `!Send` HTTP handlers, `serve` (the HTTP server function) would need to run on a `LocalSet` and use `spawn_local` with a local tracker for draining. Note that `serve` is currently treated as a `Send` future and shares Arc-backed metrics/semaphore with the tonic gRPC path (`Send + Sync`), so a `!Send` HTTP path would also require decoupling that shared state. This is documented in the otlp_receiver.md too.
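For context, a minimal sketch of the alternative described here (this is not what the PR does), assuming a plain tokio runtime: a `LocalSet` allows `!Send` per-connection state such as `Rc`, at the cost of pinning handlers to one worker thread:

```rust
use std::rc::Rc;

use tokio::task::LocalSet;

/// Sketch only: running !Send handlers via spawn_local on a LocalSet.
async fn serve_local() {
    let local = LocalSet::new();
    local
        .run_until(async {
            // !Send state is fine here because spawn_local never moves
            // the task to another thread.
            let state = Rc::new(0u32);
            let handle = tokio::task::spawn_local({
                let state = Rc::clone(&state);
                async move {
                    let _ = state; // per-connection work would go here
                }
            });
            handle.await.expect("task panicked");
        })
        .await;
}
```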
```rust
}

let shutdown = shutdown.clone();
let _ = tracker.spawn(async move {
```
see comment.
@lquerel - Have updated the PR description, and also …
lquerel left a comment
I have not finished the review yet, but there is one point that is bothering me and that I would like to discuss before continuing. Maybe I do not yet have the full picture, so please take the following with a grain of salt.
If I understand correctly, when HTTP is enabled, gRPC switches from `GlobalConcurrencyLimitLayer` to `SharedConcurrencyLayer`. The difference lies in where the semaphore is enforced:

- `GlobalConcurrencyLimitLayer` gates in `poll_ready`. When at capacity, the service is not ready, so HTTP/2 stops accepting new streams and applies backpressure. This effectively bounds the number of in-flight requests to the configured limit.
- `SharedConcurrencyLayer` forwards `poll_ready` directly to the inner service and only waits on the semaphore inside `call`. This means the server can accept an unbounded number of new streams and spawn futures that then sit parked waiting for a permit. Those parked futures still own decoded request payloads and metadata, so memory usage grows with the number of queued requests.
Is my analysis accurate? If so, I think this is a real problem, because it means memory is effectively unbounded, which is something we want to avoid as much as possible. There is no fixed cap on the number of pending requests once the semaphore is saturated. With a default 4 MiB max message size, even a few thousand queued requests could turn into multiple gigabytes of memory.
I think we need to find a way to reintroduce backpressure at the `poll_ready` level while still using your shared semaphore.

To avoid any OOM risk, a possible fix would be to reintroduce backpressure in `poll_ready` while still sharing the semaphore. For example, `SharedConcurrencyLayer` could acquire a permit in `poll_ready` like `GlobalConcurrencyLimitLayer` does and stash it for `call`, so that a not-ready state propagates back to tonic and limits stream acceptance.
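A minimal sketch of that suggestion, assuming tower, tokio-util's `PollSemaphore`, and pin-project-lite (the struct and field names are illustrative, not the PR's actual code): acquire the permit in `poll_ready`, stash it, and hand it to the response future in `call` so it is released only on completion:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll};

use pin_project_lite::pin_project;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
use tower::Service;

/// Sketch: share one Arc'd semaphore across protocols while still
/// gating readiness, so tonic stops accepting streams at capacity.
pub struct SharedConcurrency<S> {
    inner: S,
    semaphore: PollSemaphore,
    permit: Option<OwnedSemaphorePermit>,
}

impl<S> SharedConcurrency<S> {
    pub fn new(inner: S, shared: Arc<Semaphore>) -> Self {
        Self { inner, semaphore: PollSemaphore::new(shared), permit: None }
    }
}

impl<S, Req> Service<Req> for SharedConcurrency<S>
where
    S: Service<Req>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = ResponseFuture<S::Future>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
        // Acquire (and stash) a permit before reporting readiness. At
        // capacity this returns Pending, which propagates backpressure
        // to HTTP/2 stream acceptance. `None` only occurs if the
        // semaphore is closed, which never happens here.
        if self.permit.is_none() {
            self.permit = ready!(self.semaphore.poll_acquire(cx));
        }
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        // Move the stashed permit into the response future so it is
        // released only once the request has fully completed.
        let permit = self.permit.take().expect("poll_ready not called first");
        ResponseFuture { inner: self.inner.call(req), permit }
    }
}

pin_project! {
    pub struct ResponseFuture<F> {
        #[pin]
        inner: F,
        permit: OwnedSemaphorePermit, // dropped (released) with the future
    }
}

impl<F: Future> Future for ResponseFuture<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        self.project().inner.poll(cx)
    }
}
```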
@lquerel - Thanks for flagging this. You're right: with the shared layer acquiring the permit inside `call`, gRPC can accept an unbounded number of HTTP/2 streams that park waiting for permits, each holding its request payload in memory. I'll fix this as you suggested. On the HTTP side, we acquire the permit before collecting the body, with a timeout (default 5s or …
lquerel left a comment
Thanks for fixing the unbounded memory issue.
fixes: #1144
Summary
Adds HTTP/1.1 support for the OTLP receiver alongside the existing gRPC server. HTTP is disabled by default—existing gRPC-only deployments are unaffected.
Key Design Decisions
Shared Concurrency Limit

- A single `Arc<Semaphore>` per pipeline instance (i.e., per core in thread-per-core deployments, not cross-core), so total in-flight requests respect downstream channel capacity regardless of protocol mix. A sketch of the sharing follows this list.

  Why: The downstream bounded channel is the true bottleneck. If gRPC and HTTP had separate limits, a burst on one protocol could still overwhelm the shared channel. Sharing ensures backpressure is applied uniformly. Preserving `GlobalConcurrencyLimitLayer` for gRPC-only deployments avoids introducing `Arc` overhead where HTTP isn't needed.

- HTTP reuses the gRPC path's metrics, ACK/NACK handling, and socket tuning rather than duplicating them.

  Why: Consistency and reduced maintenance. Operators see one set of metrics regardless of protocol; ACK/NACK behavior is identical; socket tuning is centralized in the engine.
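As a minimal illustration of this decision (the function name is hypothetical), both protocol servers receive clones of the same permit pool rather than separate limiters:

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;

/// Sketch: one semaphore per pipeline instance; gRPC and HTTP each get
/// a handle, so a burst on either protocol draws down the same budget.
fn shared_limit(max_concurrent_requests: usize) -> (Arc<Semaphore>, Arc<Semaphore>) {
    let permits = Arc::new(Semaphore::new(max_concurrent_requests));
    (Arc::clone(&permits), permits) // one for the gRPC layer, one for HTTP
}
```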
Lazy Decode (Zero-Copy Path)
- HTTP request bodies are forwarded downstream as `OtlpProtoBytes` without deserialization, matching gRPC's lazy-decode strategy.

Send Bounds Trade-off
- HTTP shares `Arc<Mutex<...>>` state with gRPC for metrics (`OtlpReceiverMetrics`) and ACK slots (`AckRegistry`); these are necessarily Arc-wrapped because tonic's service handlers require `Send + Sync`.
- HTTP uses `tokio::spawn` (not `spawn_local`) and `Arc` rather than `Rc`.

  Why: Duplicating metrics/ACK state for a `!Send` HTTP path would add complexity and divergence. Since tonic already forces `Send` on the gRPC side, sharing state via `Arc` is the pragmatic choice. The atomic overhead is acceptable given the I/O-bound workload.

HTTP Admission Control

- HTTP acquires `semaphore.acquire_owned()` with a timeout, unlike `GlobalConcurrencyLimitLayer`, which rejects immediately at `poll_ready` (see the sketch below).

  Why: HTTP doesn't have Tower middleware, so we use a raw semaphore. The timeout allows brief queuing during bursts (fairer than immediate rejection) while still bounding wait time. Immediate rejection would cause more client retries and load amplification.
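A minimal sketch of that admission step, assuming tokio (the function name and error mapping are illustrative; the 5s default is the one mentioned in the discussion above):

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Sketch: bounded wait for a permit before the body is collected.
/// `None` maps to an overload response (e.g. 429/503) in the handler.
async fn admit(
    semaphore: Arc<Semaphore>,
    timeout: Duration, // e.g. Duration::from_secs(5)
) -> Option<OwnedSemaphorePermit> {
    match tokio::time::timeout(timeout, semaphore.acquire_owned()).await {
        Ok(Ok(permit)) => Some(permit),           // admitted; held until the response is done
        Ok(Err(_closed)) | Err(_elapsed) => None, // semaphore closed or wait timed out
    }
}
```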
Body Size Limits

- Uses `http_body_util::Limited` to enforce `max_request_body_size` during body collection (sketch below).

  Why: HTTP/1.1 requires buffering the full body before processing (unlike gRPC streaming). Without limits, a malicious client could exhaust memory. Dual enforcement (wire + decompressed) defends against both large payloads and zip bombs where a small compressed payload expands to gigabytes.
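A sketch of the wire-size half of that enforcement, assuming hyper 1.x's `Incoming` body (the function name is illustrative; the decompressed-size check happens separately, after decompression):

```rust
use bytes::Bytes;
use http_body_util::{BodyExt, Limited};

/// Sketch: fail body collection as soon as more than
/// `max_request_body_size` bytes arrive on the wire, so an oversized
/// payload is never fully buffered.
async fn read_body(
    body: hyper::body::Incoming,
    max_request_body_size: usize,
) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>> {
    let collected = Limited::new(body, max_request_body_size).collect().await?;
    Ok(collected.to_bytes())
}
```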
Protobuf-Only Encoding

Why: JSON would require deserialization in the receiver, breaking the zero-copy strategy. Protobuf is the canonical OTLP format and what most SDKs use. JSON support can be added as an opt-in path later if there's demand.
Key Changes
Documentation
- `docs/otlp-receiver.md`

Configuration Example:
Limitations
- A `!Send` HTTP path would require separate state and isn't planned here.