Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0ceaea7
initial commit
lalitb Jan 7, 2026
496718c
add shared concurrency, and nit fixes
lalitb Jan 12, 2026
d3e0f50
fix
lalitb Jan 12, 2026
f3f98de
lint/clippy warnings
lalitb Jan 12, 2026
10b0373
Merge branch 'main' into grpc-http-receiver
lalitb Jan 12, 2026
6557831
doc lint
lalitb Jan 12, 2026
9a52222
Merge branch 'grpc-http-receiver' of github.com:lalitb/otel-arrow int…
lalitb Jan 12, 2026
260e6ae
add cancellation of in-flight during shutdown
lalitb Jan 14, 2026
f50e392
markdown lint
lalitb Jan 14, 2026
1e8519f
update doc to consistent with code
lalitb Jan 14, 2026
71e70ce
Merge branch 'main' into grpc-http-receiver
lalitb Jan 14, 2026
88562fa
Merge branch 'main' into grpc-http-receiver
lalitb Jan 14, 2026
f35c574
reorganise doc
lalitb Jan 14, 2026
ba15f59
Merge branch 'grpc-http-receiver' of github.com:lalitb/otel-arrow int…
lalitb Jan 14, 2026
854245f
Merge branch 'main' into grpc-http-receiver
lalitb Jan 14, 2026
6d7bc30
Merge branch 'main' into grpc-http-receiver
lalitb Jan 15, 2026
5a1c10a
Merge branch 'main' into grpc-http-receiver
lalitb Jan 15, 2026
54a592b
restore gRPC backpressure
lalitb Jan 15, 2026
3fc39fe
fix doc for backpressure
lalitb Jan 15, 2026
c08ee6e
Merge branch 'main' into grpc-http-receiver
lalitb Jan 16, 2026
3ef6cc9
Replaced the stale MetricsRegistryHandle usages with TelemetryRegistr…
lalitb Jan 16, 2026
d9c1fdf
Merge branch 'main' into grpc-http-receiver
lalitb Jan 16, 2026
704a7b7
Merge branch 'main' into grpc-http-receiver
lalitb Jan 20, 2026
a74680b
Use explicit drop() for spawned task handles to clarify intent
lalitb Jan 20, 2026
e7f02e2
Merge branch 'grpc-http-receiver' of github.com:lalitb/otel-arrow int…
lalitb Jan 20, 2026
cc09b34
Merge branch 'main' into grpc-http-receiver
lalitb Jan 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pretty_assertions = "1.4.1"
proc-macro2 = "1.0"
prometheus = "0.14.0"
prost = "0.14"
prost-types = "0.14"
quote = "1.0"
rand = "0.9.2"
rcgen = "0.14"
Expand All @@ -143,6 +144,7 @@ smallvec = { version = "1.15" , features = ["union"] }
socket2 = { version = "0.6.1", features = ["all"] }
ipnet = "2.11.0"
syn = { version = "2.0", features = ["full", "extra-traits"] }
zstd = "0.13"
sysinfo = "0.37"
tempfile = "3"
thiserror = "2.0.17"
Expand All @@ -166,6 +168,8 @@ tonic-prost = "0.14"
tower = "0.5.2"
tower-service = "0.3"
hyper-util = "0.1.17"
hyper = { version = "1", features = ["http1", "server", "client"] }
http-body-util = "0.1"
trybuild = "1.0"
unsync = "0.1.2"
url = "2.5.7"
Expand Down
8 changes: 8 additions & 0 deletions rust/otap-dataflow/configs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ A basic OTLP pipeline configuration:
- Receives OTLP traffic on `127.0.0.1:4317`
- Exports OTLP traffic to `http://127.0.0.1:1235`

### `otlp-http-otlp.yaml`

OTLP receiver over both protocols:

- Receives OTLP/gRPC on `127.0.0.1:4317`
- Receives OTLP/HTTP on `127.0.0.1:4318`
- Exports OTLP/gRPC traffic to `http://127.0.0.1:4319`

### `otlp-perf.yaml`

OTLP receiver with performance metrics:
Expand Down
45 changes: 45 additions & 0 deletions rust/otap-dataflow/configs/otlp-http-otlp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
settings:
default_pipeline_ctrl_msg_channel_size: 100
default_node_ctrl_msg_channel_size: 100
default_pdata_channel_size: 100

nodes:
receiver:
kind: receiver
plugin_urn: "urn:otel:otlp:receiver"
out_ports:
out_port:
destinations:
- exporter
dispatch_strategy: round_robin
config:
# OTLP/gRPC receiver (default port: 4317)
listening_addr: "127.0.0.1:4317"

# OTLP/HTTP receiver (default port: 4318)
http:
listening_addr: "127.0.0.1:4318"
# wait_for_result: false
# max_concurrent_requests: 0
# max_request_body_size: "4MiB"
# accept_compressed_requests: true

# timeout: "30s" # Optional: timeout for gRPC requests

# TLS / mTLS (receiver-side; gRPC only today)
# tls:
# cert_file: "/path/to/server.crt" # or `cert_pem: | ...`
# key_file: "/path/to/server.key" # or `key_pem: | ...`
# client_ca_file: "/path/to/client-ca.crt" # or `client_ca_pem: | ...`
# include_system_ca_certs_pool: false
# handshake_timeout: "10s"
# reload_interval: "5m"

exporter:
kind: exporter
plugin_urn: "urn:otel:otlp:exporter"
config:
# Downstream OTLP/gRPC collector endpoint.
# Uses a different port than this receiver to avoid accidental loops.
grpc_endpoint: "http://127.0.0.1:4319"
# timeout: "15s"
9 changes: 7 additions & 2 deletions rust/otap-dataflow/crates/otap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ tonic = { workspace = true }
tonic-middleware = { workspace = true }
tonic-prost = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
smallvec = { workspace = true }
bitflags = { workspace = true }
bytes = { workspace = true }
Expand All @@ -58,6 +59,7 @@ tower-service.workspace = true
fluke-hpack.workspace = true
serde.workspace = true
tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["rt"] }
async-stream.workspace = true
humantime.workspace = true
weaver_semconv.workspace = true
Expand All @@ -67,8 +69,12 @@ rand.workspace = true
zip.workspace = true
tower = { workspace = true }
hyper-util = { workspace = true }
hyper = { workspace = true }
http-body-util = { workspace = true }
socket2 = { workspace = true }
ipnet = { workspace = true }
flate2 = { workspace = true }
zstd = { workspace = true }

# Geneva exporter dependencies
geneva-uploader = { version = "0.4.0", optional = true }
Expand All @@ -79,7 +85,6 @@ data_engine_recordset = { workspace = true, optional = true }

# Azure Monitor Exporter dependencies
azure_identity = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true, features = ["rustls"] }
azure_core = { workspace = true, optional = true, features = ["reqwest"] }
ahash = { workspace = true, optional = true }
Expand All @@ -103,7 +108,6 @@ azure-monitor-exporter = [
"experimental-exporters",
"dep:opentelemetry-proto",
"dep:azure_identity",
"dep:flate2",
"dep:reqwest",
"dep:azure_core",
"dep:ahash",
Expand Down Expand Up @@ -131,3 +135,4 @@ wiremock.workspace = true
rustls.workspace = true
tokio-rustls.workspace = true
rustls-pki-types.workspace = true
tokio-stream.workspace = true
8 changes: 8 additions & 0 deletions rust/otap-dataflow/crates/otap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,17 @@ pub mod compression;

pub mod metrics;

pub(crate) mod socket_options;

/// Shared concurrency limiting across protocol servers
pub(crate) mod shared_concurrency;

/// gRPC service implementation
pub mod otlp_grpc;

/// OTLP/HTTP receiver support.
pub mod otlp_http;

/// Cloud specific auth utilities
pub mod cloud_auth;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,11 @@ impl ServerCommon {
effect_handler: EffectHandler<OtapPdata>,
settings: &OtlpServerSettings,
metrics: Arc<Mutex<MetricSet<OtlpReceiverMetrics>>>,
state: Option<AckSlot>,
) -> Self {
Self {
effect_handler,
state: settings
.wait_for_result
.then(|| AckSlot::new(settings.max_concurrent_requests)),
state,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The refactor just moved the AckSlot construction out of ServerCommon::new into the receiver so HTTP and gRPC can share the same slot pool when HTTP wait-for-result is enabled.

settings: settings.clone(),
metrics,
}
Expand All @@ -450,9 +449,10 @@ impl LogsServiceServer {
effect_handler: EffectHandler<OtapPdata>,
settings: &OtlpServerSettings,
metrics: Arc<Mutex<MetricSet<OtlpReceiverMetrics>>>,
state: Option<AckSlot>,
) -> Self {
Self {
common: ServerCommon::new(effect_handler, settings, metrics),
common: ServerCommon::new(effect_handler, settings, metrics, state),
}
}
}
Expand Down Expand Up @@ -505,9 +505,10 @@ impl MetricsServiceServer {
effect_handler: EffectHandler<OtapPdata>,
settings: &OtlpServerSettings,
metrics: Arc<Mutex<MetricSet<OtlpReceiverMetrics>>>,
state: Option<AckSlot>,
) -> Self {
Self {
common: ServerCommon::new(effect_handler, settings, metrics),
common: ServerCommon::new(effect_handler, settings, metrics, state),
}
}
}
Expand Down Expand Up @@ -560,9 +561,10 @@ impl TraceServiceServer {
effect_handler: EffectHandler<OtapPdata>,
settings: &OtlpServerSettings,
metrics: Arc<Mutex<MetricSet<OtlpReceiverMetrics>>>,
state: Option<AckSlot>,
) -> Self {
Self {
common: ServerCommon::new(effect_handler, settings, metrics),
common: ServerCommon::new(effect_handler, settings, metrics, state),
}
}
}
Expand Down
48 changes: 8 additions & 40 deletions rust/otap-dataflow/crates/otap/src/otap_grpc/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
//!
//! The implementation uses HTTP CONNECT method to establish tunnels through proxies.

use crate::socket_options;
use base64::Engine;
use base64::prelude::*;
use http::Uri;
use ipnet::IpNet;
use serde::Deserialize;
use socket2::{Socket, TcpKeepalive};
use std::borrow::Cow;
use std::env;
use std::io;
Expand Down Expand Up @@ -575,52 +575,20 @@ async fn http_connect_tunnel_on_stream(
}

/// Applies TCP socket options (nodelay, keepalive) to a stream.
///
/// This function performs a series of conversions (tokio -> std -> socket2 -> std -> tokio)
/// to apply socket options that are not directly exposed by tokio's TcpStream.
/// Specifically, `socket2` is required to set detailed keepalive parameters (interval, retries).
fn apply_socket_options(
stream: TcpStream,
tcp_nodelay: bool,
tcp_keepalive: Option<Duration>,
tcp_keepalive_interval: Option<Duration>,
tcp_keepalive_retries: Option<u32>,
) -> io::Result<TcpStream> {
// Convert tokio TcpStream to std TcpStream, then to Socket
stream.set_nodelay(tcp_nodelay)?;

let std_stream = stream.into_std()?;
let socket: Socket = std_stream.into();

// Apply TCP keepalive settings
if let Some(keepalive_time) = tcp_keepalive {
let mut keepalive = TcpKeepalive::new().with_time(keepalive_time);

if let Some(interval) = tcp_keepalive_interval {
keepalive = keepalive.with_interval(interval);
}

#[cfg(not(target_os = "windows"))]
if let Some(retries) = tcp_keepalive_retries {
keepalive = keepalive.with_retries(retries);
}

#[cfg(target_os = "windows")]
if tcp_keepalive_retries.is_some() {
otap_df_telemetry::otel_warn!(
"Proxy.KeepaliveRetriesIgnored",
platform = "windows",
message = "tcp_keepalive_retries is configured but ignored on Windows: TcpKeepalive::with_retries is not available on this platform"
);
}

socket.set_tcp_keepalive(&keepalive)?;
}

// Convert back to std TcpStream, then to tokio TcpStream
let std_stream: std::net::TcpStream = socket.into();
std_stream.set_nonblocking(true)?;
TcpStream::from_std(std_stream)
socket_options::apply_socket_options(
Copy link
Member Author

@lalitb lalitb Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

socket tuning is now centralized in socket_options.rs because both the proxy and the OTLP/HTTP server need the same keepalive/nodelay configuration (tokio -> std -> socket2 -> std -> tokio dance). This keeps the settings consistent across listeners and avoids duplicating the conversion code.

stream,
tcp_nodelay,
tcp_keepalive,
tcp_keepalive_interval,
tcp_keepalive_retries,
)
}

/// Establishes a TCP connection to a target, optionally through an HTTP CONNECT proxy.
Expand Down
Loading
Loading