From 2f41db5bd3b20a329793fba1934fed103ff346e8 Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Sat, 22 Jul 2023 13:44:14 +0100 Subject: [PATCH 1/7] Add compression support for otlp tonic --- opentelemetry-otlp/Cargo.toml | 1 + opentelemetry-otlp/src/exporter/grpcio.rs | 9 +----- opentelemetry-otlp/src/exporter/mod.rs | 27 ++++++++++++++++- opentelemetry-otlp/src/exporter/tonic.rs | 36 +++++++++++++++++++++++ opentelemetry-otlp/src/lib.rs | 36 +++++++++++++++++++---- opentelemetry-otlp/src/logs.rs | 13 ++++++-- opentelemetry-otlp/src/metric.rs | 21 +++++++++---- opentelemetry-otlp/src/span.rs | 15 +++++++++- opentelemetry-otlp/tests/smoke.rs | 13 ++++++++ 9 files changed, 148 insertions(+), 23 deletions(-) diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml index 0d580ae2bf..6bd18d9eef 100644 --- a/opentelemetry-otlp/Cargo.toml +++ b/opentelemetry-otlp/Cargo.toml @@ -70,6 +70,7 @@ default = ["grpc-tonic", "trace"] # grpc using tonic grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"] +gzip-tonic = ["tonic/gzip"] tls = ["tonic/tls"] tls-roots = ["tls", "tonic/tls-roots"] diff --git a/opentelemetry-otlp/src/exporter/grpcio.rs b/opentelemetry-otlp/src/exporter/grpcio.rs index fd3e6a24f4..70a389b71d 100644 --- a/opentelemetry-otlp/src/exporter/grpcio.rs +++ b/opentelemetry-otlp/src/exporter/grpcio.rs @@ -1,3 +1,4 @@ +use crate::exporter::Compression; use crate::ExportConfig; #[cfg(feature = "serialize")] use serde::{Deserialize, Serialize}; @@ -47,14 +48,6 @@ pub struct Credentials { pub key: String, } -/// The compression algorithm to use when sending data. -#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))] -#[derive(Clone, Copy, Debug)] -pub enum Compression { - /// Compresses data using gzip. - Gzip, -} - impl From for grpcio::CompressionAlgorithms { fn from(compression: Compression) -> Self { match compression { diff --git a/opentelemetry-otlp/src/exporter/mod.rs b/opentelemetry-otlp/src/exporter/mod.rs index 1d03ebc6d0..8baca5986a 100644 --- a/opentelemetry-otlp/src/exporter/mod.rs +++ b/opentelemetry-otlp/src/exporter/mod.rs @@ -8,7 +8,9 @@ use crate::exporter::grpcio::GrpcioExporterBuilder; use crate::exporter::http::HttpExporterBuilder; #[cfg(feature = "grpc-tonic")] use crate::exporter::tonic::TonicExporterBuilder; -use crate::Protocol; +use crate::{Error, Protocol}; +#[cfg(feature = "serialize")] +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; @@ -21,6 +23,8 @@ pub const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT"; pub const OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT: &str = OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT; /// Protocol the exporter will use. Either `http/protobuf` or `grpc`. pub const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL"; +/// Compression algorithm to use, defaults to none. +pub const OTEL_EXPORTER_OTLP_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_COMPRESSION"; #[cfg(feature = "http-proto")] /// Default protocol, using http-proto. @@ -79,6 +83,27 @@ impl Default for ExportConfig { } } +/// The compression algorithm to use when sending data. +#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum Compression { + /// Compresses data using gzip. + #[cfg(any(feature = "gzip-tonic", feature = "grpc-sys"))] + Gzip, +} + +impl FromStr for Compression { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + #[cfg(any(feature = "gzip-tonic", feature = "grpc-sys"))] + "gzip" => Ok(Compression::Gzip), + _ => Err(Error::UnsupportedCompressionAlgorithm(s.to_string())), + } + } +} + /// default protocol based on enabled features fn default_protocol() -> Protocol { match OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT { diff --git a/opentelemetry-otlp/src/exporter/tonic.rs b/opentelemetry-otlp/src/exporter/tonic.rs index 1cf50a4a5c..f93a4b190b 100644 --- a/opentelemetry-otlp/src/exporter/tonic.rs +++ b/opentelemetry-otlp/src/exporter/tonic.rs @@ -1,3 +1,4 @@ +use crate::exporter::Compression; use crate::ExportConfig; use std::fmt::{Debug, Formatter}; use tonic::metadata::MetadataMap; @@ -18,6 +19,22 @@ pub struct TonicConfig { /// TLS settings for the collector endpoint. #[cfg(feature = "tls")] pub tls_config: Option, + + /// The compression algorithm to use when communicating with the collector. + pub compression: Option, +} + +impl From for tonic::codec::CompressionEncoding { + fn from(compression: Compression) -> Self { + match compression { + #[cfg(feature = "gzip-tonic")] + Compression::Gzip => tonic::codec::CompressionEncoding::Gzip, + #[cfg(not(feature = "gzip-tonic"))] + Compression::Gzip => panic!( + "gzip compression is not enabled, add the tonic feature 'gzip' to your project" + ), + } + } } /// Build a trace exporter that uses [tonic] as grpc layer and opentelemetry protocol. @@ -60,6 +77,7 @@ impl Default for TonicExporterBuilder { )), #[cfg(feature = "tls")] tls_config: None, + compression: None, }; TonicExporterBuilder { @@ -94,6 +112,12 @@ impl TonicExporterBuilder { self } + /// Set the compression algorithm to use when communicating with the collector. + pub fn with_compression(mut self, compression: Compression) -> Self { + self.tonic_config.compression = Some(compression); + self + } + /// Use `channel` as tonic's transport channel. /// this will override tls config and should only be used /// when working with non-HTTP transports. @@ -119,6 +143,8 @@ impl TonicExporterBuilder { #[cfg(test)] mod tests { + #[cfg(feature = "gzip-tonic")] + use crate::exporter::Compression; use crate::TonicExporterBuilder; use tonic::metadata::{MetadataMap, MetadataValue}; @@ -151,4 +177,14 @@ mod tests { .len() ); } + + #[test] + #[cfg(feature = "gzip-tonic")] + fn test_with_compression() { + // metadata should merge with the current one with priority instead of just replacing it + let mut metadata = MetadataMap::new(); + metadata.insert("foo", "bar".parse().unwrap()); + let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip); + assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip); + } } diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index a70d565239..3a13853324 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -189,24 +189,28 @@ mod metric; mod span; mod transform; +pub use crate::exporter::Compression; pub use crate::exporter::ExportConfig; #[cfg(feature = "trace")] pub use crate::span::{ - OtlpTracePipeline, SpanExporter, SpanExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, - OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, + OtlpTracePipeline, SpanExporter, SpanExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, }; #[cfg(feature = "metrics")] pub use crate::metric::{ MetricsExporter, MetricsExporterBuilder, OtlpMetricPipeline, - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, + OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, + OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, }; #[cfg(feature = "logs")] -pub use crate::logs::*; +pub use crate::logs::{ + LogExporter, LogExporterBuilder, OtlpLogPipeline, OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, +}; pub use crate::exporter::{ - HasExportConfig, WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT, + HasExportConfig, WithExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_PROTOCOL, OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT, OTEL_EXPORTER_OTLP_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, @@ -217,7 +221,7 @@ use opentelemetry_sdk::export::ExportError; use std::time::{Duration, SystemTime, UNIX_EPOCH}; #[cfg(feature = "grpc-sys")] -pub use crate::exporter::grpcio::{Compression, Credentials, GrpcioConfig, GrpcioExporterBuilder}; +pub use crate::exporter::grpcio::{Credentials, GrpcioConfig, GrpcioExporterBuilder}; #[cfg(feature = "http-proto")] pub use crate::exporter::http::HttpExporterBuilder; #[cfg(feature = "grpc-tonic")] @@ -347,6 +351,10 @@ pub enum Error { /// The pipeline will need a exporter to complete setup. Throw this error if none is provided. #[error("no exporter builder is provided, please provide one using with_exporter() method")] NoExporterBuilder, + + /// Unsupported compression algorithm. + #[error("unsupported compression algorithm '{0}'")] + UnsupportedCompressionAlgorithm(String), } #[cfg(feature = "grpc-tonic")] @@ -389,3 +397,19 @@ pub(crate) fn to_nanos(time: SystemTime) -> u64 { .unwrap_or_else(|_| Duration::from_secs(0)) .as_nanos() as u64 } + +pub(crate) fn resolve_compression( + tonic_config: &TonicConfig, + env_override: &'static str, +) -> Result, Error> { + tonic_config + .compression + .map(Ok) + .or_else(|| { + std::env::var(OTEL_EXPORTER_OTLP_COMPRESSION) + .or_else(|_| std::env::var(env_override)) + .ok() + .map(|v| v.parse()) + }) + .transpose() +} diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index d73223a0b5..4bc5bbaeb5 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -46,7 +46,7 @@ use { use std::{collections::HashMap, sync::Arc}; use crate::exporter::ExportConfig; -use crate::OtlpPipeline; +use crate::{resolve_compression, OtlpPipeline}; use async_trait::async_trait; use std::{ borrow::Cow, @@ -57,6 +57,9 @@ use std::{ use opentelemetry_api::logs::{LogError, LoggerProvider}; use opentelemetry_sdk::{self, export::logs::LogData, logs::BatchMessage, runtime::RuntimeChannel}; +/// Compression algorithm to use, defaults to none. +pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; + impl OtlpPipeline { /// Create a OTLP logging pipeline. pub fn logging(self) -> OtlpLogPipeline { @@ -232,10 +235,16 @@ impl LogExporter { tonic_config: TonicConfig, channel: tonic::transport::Channel, ) -> Result { + let mut log_exporter = TonicLogsServiceClient::new(channel); + if let Some(compression) = + resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_LOGS_COMPRESSION)? + { + log_exporter = log_exporter.send_compressed(compression.into()); + } Ok(LogExporter::Tonic { timeout: config.timeout, metadata: tonic_config.metadata, - log_exporter: TonicLogsServiceClient::new(channel), + log_exporter, }) } diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index 76b457d769..41732cda1c 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -6,13 +6,14 @@ use crate::exporter::tonic::TonicExporterBuilder; use crate::transform::sink; -use crate::{Error, OtlpPipeline}; +use crate::{resolve_compression, Error, OtlpPipeline}; use async_trait::async_trait; use core::fmt; use opentelemetry_api::{ global, metrics::{MetricsError, Result}, }; +use opentelemetry_http::Bytes; #[cfg(feature = "grpc-tonic")] use opentelemetry_proto::tonic::collector::metrics::v1::{ metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, @@ -36,6 +37,7 @@ use std::str::FromStr; use std::sync::Mutex; use std::time; use std::time::Duration; +use tonic::codegen::{Body, StdError}; use tonic::metadata::KeyAndValueRef; #[cfg(feature = "grpc-tonic")] use tonic::transport::Channel; @@ -61,7 +63,8 @@ use { pub const OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"; /// Max waiting time for the backend to process each metrics batch, defaults to 10s. pub const OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT"; - +/// Compression algorithm to use, defaults to none. +pub const OTEL_EXPORTER_OTLP_METRICS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION"; impl OtlpPipeline { /// Create a OTLP metrics pipeline. pub fn metrics(self, rt: RT) -> OtlpMetricPipeline @@ -356,6 +359,8 @@ impl MetricsExporter { }, Err(_) => config.timeout, }; + let compression = + resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_METRICS_COMPRESSION)?; let endpoint = Channel::from_shared(endpoint).map_err::(Into::into)?; @@ -376,11 +381,18 @@ impl MetricsExporter { tokio::spawn(async move { match export_builder.interceptor { Some(interceptor) => { - let client = MetricsServiceClient::with_interceptor(channel, interceptor); + let mut client = MetricsServiceClient::with_interceptor(channel, interceptor); + if let Some(compression) = compression { + client = client.send_compressed(compression.into()); + } + export_sink(client, receiver).await } None => { - let client = MetricsServiceClient::new(channel); + let mut client = MetricsServiceClient::new(channel); + if let Some(compression) = compression { + client = client.send_compressed(compression.into()) + } export_sink(client, receiver).await } } @@ -465,7 +477,6 @@ async fn http_send_request( Ok(()) } -use tonic::codegen::{Body, Bytes, StdError}; async fn export_sink( mut client: MetricsServiceClient, mut receiver: tokio::sync::mpsc::Receiver, diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 8b6870f3b0..f09bd706e1 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -51,6 +51,8 @@ use { use {std::collections::HashMap, std::sync::Arc}; use crate::exporter::ExportConfig; +#[cfg(feature = "gzip-tonic")] +use crate::resolve_compression; use crate::OtlpPipeline; use opentelemetry_api::{ @@ -74,6 +76,8 @@ use sdk::runtime::RuntimeChannel; pub const OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"; /// Max waiting time for the backend to process each spans batch, defaults to 10s. pub const OTEL_EXPORTER_OTLP_TRACES_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT"; +/// Compression algorithm to use, defaults to none. +pub const OTEL_EXPORTER_OTLP_TRACES_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION"; impl OtlpPipeline { /// Create a OTLP tracing pipeline. @@ -386,10 +390,19 @@ impl SpanExporter { tonic_config: TonicConfig, channel: tonic::transport::Channel, ) -> Result { + #[allow(unused_mut)] + let mut trace_exporter = TonicTraceServiceClient::new(channel); + #[cfg(feature = "gzip-tonic")] + if let Some(compression) = + resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)? + { + trace_exporter = trace_exporter.send_compressed(compression.into()) + } + Ok(SpanExporter::Tonic { timeout: config.timeout, metadata: tonic_config.metadata, - trace_exporter: TonicTraceServiceClient::new(channel), + trace_exporter, }) } diff --git a/opentelemetry-otlp/tests/smoke.rs b/opentelemetry-otlp/tests/smoke.rs index d7e6c5a32d..278d61df0b 100644 --- a/opentelemetry-otlp/tests/smoke.rs +++ b/opentelemetry-otlp/tests/smoke.rs @@ -9,6 +9,8 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ use std::{net::SocketAddr, sync::Mutex}; use tokio::sync::mpsc; use tokio_stream::wrappers::TcpListenerStream; +#[cfg(feature = "gzip-tonic")] +use tonic::codec::CompressionEncoding; struct MockServer { tx: Mutex>, @@ -57,6 +59,10 @@ async fn setup() -> (SocketAddr, mpsc::Receiver) { }); let (req_tx, req_rx) = mpsc::channel(10); + #[cfg(feature = "gzip-tonic")] + let service = TraceServiceServer::new(MockServer::new(req_tx)) + .accept_compressed(CompressionEncoding::Gzip); + #[cfg(not(feature = "gzip-tonic"))] let service = TraceServiceServer::new(MockServer::new(req_tx)); tokio::task::spawn(async move { tonic::transport::Server::builder() @@ -80,6 +86,13 @@ async fn smoke_tracer() { let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter( + #[cfg(feature = "gzip-tonic")] + opentelemetry_otlp::new_exporter() + .tonic() + .with_compression(opentelemetry_otlp::Compression::Gzip) + .with_endpoint(format!("http://{}", addr)) + .with_metadata(metadata), + #[cfg(not(feature = "gzip-tonic"))] opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(format!("http://{}", addr)) From b9b2282496e4f39e1598b81d6a6bb277db82f5b4 Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Sat, 22 Jul 2023 13:46:04 +0100 Subject: [PATCH 2/7] Changelog --- opentelemetry-otlp/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index e1e1be6955..bba1ab6190 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -4,6 +4,7 @@ ### Added - Add OTLP HTTP Metrics Exporter [#1020](https://github.com/open-telemetry/opentelemetry-rust/pull/1020). +- Add tonic compression support [#1165](https://github.com/open-telemetry/opentelemetry-rust/pull/1165). ## v0.12.0 From e55e2ecdd1d25612279ff9af5997fe42d6de22db Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Sat, 22 Jul 2023 14:00:01 +0100 Subject: [PATCH 3/7] Fix compile error when tonic is not used. --- opentelemetry-otlp/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 3a13853324..8e04cee600 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -398,6 +398,7 @@ pub(crate) fn to_nanos(time: SystemTime) -> u64 { .as_nanos() as u64 } +#[cfg(feature = "grpc-tonic")] pub(crate) fn resolve_compression( tonic_config: &TonicConfig, env_override: &'static str, From 7aacdae9531c1dbd838b9b668ad301c37ac13f91 Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Sat, 22 Jul 2023 21:11:22 +0100 Subject: [PATCH 4/7] Lint --- opentelemetry-otlp/src/exporter/mod.rs | 11 ++++++-- opentelemetry-otlp/src/exporter/tonic.rs | 34 ++++++++++++++++++------ opentelemetry-otlp/src/lib.rs | 17 ------------ opentelemetry-otlp/src/logs.rs | 6 ++--- opentelemetry-otlp/src/metric.rs | 30 +++++++++++---------- opentelemetry-otlp/src/span.rs | 7 ++--- 6 files changed, 56 insertions(+), 49 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/mod.rs b/opentelemetry-otlp/src/exporter/mod.rs index 8baca5986a..b4cd4682c5 100644 --- a/opentelemetry-otlp/src/exporter/mod.rs +++ b/opentelemetry-otlp/src/exporter/mod.rs @@ -12,6 +12,7 @@ use crate::{Error, Protocol}; #[cfg(feature = "serialize")] use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::fmt::{Display, Formatter}; use std::str::FromStr; use std::time::Duration; @@ -88,16 +89,22 @@ impl Default for ExportConfig { #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum Compression { /// Compresses data using gzip. - #[cfg(any(feature = "gzip-tonic", feature = "grpc-sys"))] Gzip, } +impl Display for Compression { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Compression::Gzip => write!(f, "gzip"), + } + } +} + impl FromStr for Compression { type Err = Error; fn from_str(s: &str) -> Result { match s { - #[cfg(any(feature = "gzip-tonic", feature = "grpc-sys"))] "gzip" => Ok(Compression::Gzip), _ => Err(Error::UnsupportedCompressionAlgorithm(s.to_string())), } diff --git a/opentelemetry-otlp/src/exporter/tonic.rs b/opentelemetry-otlp/src/exporter/tonic.rs index f93a4b190b..6c776dd10a 100644 --- a/opentelemetry-otlp/src/exporter/tonic.rs +++ b/opentelemetry-otlp/src/exporter/tonic.rs @@ -1,6 +1,7 @@ use crate::exporter::Compression; -use crate::ExportConfig; +use crate::{ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION}; use std::fmt::{Debug, Formatter}; +use tonic::codec::CompressionEncoding; use tonic::metadata::MetadataMap; #[cfg(feature = "tls")] use tonic::transport::ClientTlsConfig; @@ -24,19 +25,36 @@ pub struct TonicConfig { pub compression: Option, } -impl From for tonic::codec::CompressionEncoding { - fn from(compression: Compression) -> Self { - match compression { +impl TryFrom for tonic::codec::CompressionEncoding { + type Error = crate::Error; + + fn try_from(value: Compression) -> Result { + match value { #[cfg(feature = "gzip-tonic")] - Compression::Gzip => tonic::codec::CompressionEncoding::Gzip, + Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip), #[cfg(not(feature = "gzip-tonic"))] - Compression::Gzip => panic!( - "gzip compression is not enabled, add the tonic feature 'gzip' to your project" - ), + Compression::Gzip => Err(crate::Error::UnsupportedCompressionAlgorithm( + value.to_string(), + )), } } } +pub(crate) fn resolve_compression( + tonic_config: &TonicConfig, + env_override: &'static str, +) -> Result, crate::Error> { + if let Some(compression) = tonic_config.compression { + return Ok(Some(compression.try_into()?)); + } + if let Ok(compression) = std::env::var(env_override) { + return Ok(Some(compression.parse::()?.try_into()?)); + } else if let Ok(compression) = std::env::var(OTEL_EXPORTER_OTLP_COMPRESSION) { + return Ok(Some(compression.parse::()?.try_into()?)); + }; + Ok(None) +} + /// Build a trace exporter that uses [tonic] as grpc layer and opentelemetry protocol. /// /// It allows users to diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 8e04cee600..9429b99ab5 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -397,20 +397,3 @@ pub(crate) fn to_nanos(time: SystemTime) -> u64 { .unwrap_or_else(|_| Duration::from_secs(0)) .as_nanos() as u64 } - -#[cfg(feature = "grpc-tonic")] -pub(crate) fn resolve_compression( - tonic_config: &TonicConfig, - env_override: &'static str, -) -> Result, Error> { - tonic_config - .compression - .map(Ok) - .or_else(|| { - std::env::var(OTEL_EXPORTER_OTLP_COMPRESSION) - .or_else(|_| std::env::var(env_override)) - .ok() - .map(|v| v.parse()) - }) - .transpose() -} diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 4bc5bbaeb5..52430acc5f 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -4,7 +4,7 @@ #[cfg(feature = "grpc-tonic")] use { - crate::exporter::tonic::{TonicConfig, TonicExporterBuilder}, + crate::exporter::tonic::{resolve_compression, TonicConfig, TonicExporterBuilder}, opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient as TonicLogsServiceClient, ExportLogsServiceRequest as TonicRequest, @@ -46,7 +46,7 @@ use { use std::{collections::HashMap, sync::Arc}; use crate::exporter::ExportConfig; -use crate::{resolve_compression, OtlpPipeline}; +use crate::OtlpPipeline; use async_trait::async_trait; use std::{ borrow::Cow, @@ -239,7 +239,7 @@ impl LogExporter { if let Some(compression) = resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_LOGS_COMPRESSION)? { - log_exporter = log_exporter.send_compressed(compression.into()); + log_exporter = log_exporter.send_compressed(compression); } Ok(LogExporter::Tonic { timeout: config.timeout, diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index 41732cda1c..e6c15f75b5 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -4,20 +4,15 @@ //! //! Currently, OTEL metrics exporter only support GRPC connection via tonic on tokio runtime. -use crate::exporter::tonic::TonicExporterBuilder; use crate::transform::sink; -use crate::{resolve_compression, Error, OtlpPipeline}; +use crate::{Error, OtlpPipeline}; use async_trait::async_trait; use core::fmt; use opentelemetry_api::{ global, metrics::{MetricsError, Result}, }; -use opentelemetry_http::Bytes; -#[cfg(feature = "grpc-tonic")] -use opentelemetry_proto::tonic::collector::metrics::v1::{ - metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, -}; + use opentelemetry_sdk::{ metrics::{ data::{ResourceMetrics, Temporality}, @@ -32,17 +27,23 @@ use opentelemetry_sdk::{ Resource, }; use std::fmt::{Debug, Formatter}; -#[cfg(feature = "grpc-tonic")] -use std::str::FromStr; use std::sync::Mutex; use std::time; use std::time::Duration; use tonic::codegen::{Body, StdError}; use tonic::metadata::KeyAndValueRef; #[cfg(feature = "grpc-tonic")] -use tonic::transport::Channel; -#[cfg(feature = "grpc-tonic")] -use tonic::Request; +use { + crate::exporter::tonic::{resolve_compression, TonicExporterBuilder}, + opentelemetry_proto::tonic::collector::metrics::v1::{ + metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, + }, + std::str::FromStr, + tonic::codegen::Bytes, + tonic::transport::Channel, + tonic::Request, +}; + #[cfg(feature = "http-proto")] use { crate::exporter::http::HttpExporterBuilder, @@ -383,7 +384,7 @@ impl MetricsExporter { Some(interceptor) => { let mut client = MetricsServiceClient::with_interceptor(channel, interceptor); if let Some(compression) = compression { - client = client.send_compressed(compression.into()); + client = client.send_compressed(compression); } export_sink(client, receiver).await @@ -391,7 +392,7 @@ impl MetricsExporter { None => { let mut client = MetricsServiceClient::new(channel); if let Some(compression) = compression { - client = client.send_compressed(compression.into()) + client = client.send_compressed(compression) } export_sink(client, receiver).await } @@ -477,6 +478,7 @@ async fn http_send_request( Ok(()) } +#[cfg(feature = "grpc-tonic")] async fn export_sink( mut client: MetricsServiceClient, mut receiver: tokio::sync::mpsc::Receiver, diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index f09bd706e1..698078e7f4 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -9,7 +9,7 @@ use std::time::Duration; use std::str::FromStr; #[cfg(feature = "grpc-tonic")] use { - crate::exporter::tonic::{TonicConfig, TonicExporterBuilder}, + crate::exporter::tonic::{resolve_compression, TonicConfig, TonicExporterBuilder}, opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient as TonicTraceServiceClient, ExportTraceServiceRequest as TonicRequest, @@ -51,8 +51,6 @@ use { use {std::collections::HashMap, std::sync::Arc}; use crate::exporter::ExportConfig; -#[cfg(feature = "gzip-tonic")] -use crate::resolve_compression; use crate::OtlpPipeline; use opentelemetry_api::{ @@ -392,11 +390,10 @@ impl SpanExporter { ) -> Result { #[allow(unused_mut)] let mut trace_exporter = TonicTraceServiceClient::new(channel); - #[cfg(feature = "gzip-tonic")] if let Some(compression) = resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)? { - trace_exporter = trace_exporter.send_compressed(compression.into()) + trace_exporter = trace_exporter.send_compressed(compression) } Ok(SpanExporter::Tonic { From 25608543c5e86de18a8c12b8dc9732190cd71100 Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Sat, 22 Jul 2023 22:38:02 +0100 Subject: [PATCH 5/7] Add test --- opentelemetry-otlp/src/exporter/mod.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/opentelemetry-otlp/src/exporter/mod.rs b/opentelemetry-otlp/src/exporter/mod.rs index b4cd4682c5..88e74088b7 100644 --- a/opentelemetry-otlp/src/exporter/mod.rs +++ b/opentelemetry-otlp/src/exporter/mod.rs @@ -249,13 +249,15 @@ impl WithExportConfig for B { mod tests { // If an env test fails then the mutex will be poisoned and the following error will be displayed. const LOCK_POISONED_MESSAGE: &str = "one of the other pipeline builder from env tests failed"; + use crate::exporter::{ default_endpoint, default_protocol, WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_PROTOCOL_GRPC, OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF, OTEL_EXPORTER_OTLP_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, }; - use crate::{new_exporter, Protocol, OTEL_EXPORTER_OTLP_PROTOCOL}; + use crate::{new_exporter, Compression, Protocol, OTEL_EXPORTER_OTLP_PROTOCOL}; + use std::str::FromStr; use std::sync::Mutex; // Make sure env tests are not running concurrently @@ -377,4 +379,15 @@ mod tests { std::env::remove_var(OTEL_EXPORTER_OTLP_TIMEOUT); assert!(std::env::var(OTEL_EXPORTER_OTLP_TIMEOUT).is_err()); } + + #[test] + fn test_compression_parse() { + assert_eq!(Compression::from_str("gzip").unwrap(), Compression::Gzip); + Compression::from_str("bad_compression").expect_err("bad compression"); + } + + #[test] + fn test_compression_to_str() { + assert_eq!(Compression::Gzip.to_string(), "gzip"); + } } From 7a3c247db74920f4e76cf717c5c8e8d8adae8ede Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Mon, 24 Jul 2023 11:22:42 +0100 Subject: [PATCH 6/7] Take in feedback --- opentelemetry-otlp/src/exporter/tonic.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/tonic.rs b/opentelemetry-otlp/src/exporter/tonic.rs index 6c776dd10a..ab1d3bfb2f 100644 --- a/opentelemetry-otlp/src/exporter/tonic.rs +++ b/opentelemetry-otlp/src/exporter/tonic.rs @@ -45,14 +45,14 @@ pub(crate) fn resolve_compression( env_override: &'static str, ) -> Result, crate::Error> { if let Some(compression) = tonic_config.compression { - return Ok(Some(compression.try_into()?)); - } - if let Ok(compression) = std::env::var(env_override) { - return Ok(Some(compression.parse::()?.try_into()?)); + Ok(Some(compression.try_into()?)) + } else if let Ok(compression) = std::env::var(env_override) { + Ok(Some(compression.parse::()?.try_into()?)) } else if let Ok(compression) = std::env::var(OTEL_EXPORTER_OTLP_COMPRESSION) { - return Ok(Some(compression.parse::()?.try_into()?)); - }; - Ok(None) + Ok(Some(compression.parse::()?.try_into()?)) + } else { + Ok(None) + } } /// Build a trace exporter that uses [tonic] as grpc layer and opentelemetry protocol. From 0dca7b4f942beeaecc09660480dd29db42164994 Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Mon, 24 Jul 2023 11:27:10 +0100 Subject: [PATCH 7/7] Remove unneeded clippy annotation. --- opentelemetry-otlp/src/span.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 698078e7f4..d217ef3025 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -388,7 +388,6 @@ impl SpanExporter { tonic_config: TonicConfig, channel: tonic::transport::Channel, ) -> Result { - #[allow(unused_mut)] let mut trace_exporter = TonicTraceServiceClient::new(channel); if let Some(compression) = resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)?