diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index f272d8d4c5..b37eec4090 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -55,13 +55,24 @@ pub type HttpError = Box; /// users to bring their choice of HTTP client. #[async_trait] pub trait HttpClient: Debug + Send + Sync { - /// Send the specified HTTP request + /// Send the specified HTTP request with `Vec` payload /// /// Returns the HTTP response including the status code and body. /// /// Returns an error if it can't connect to the server or the request could not be completed, /// e.g. because of a timeout, infinite redirects, or a loss of connection. - async fn send(&self, request: Request>) -> Result, HttpError>; + #[deprecated(note = "Use `send_bytes` with `Bytes` payload instead.")] + async fn send(&self, request: Request>) -> Result, HttpError> { + self.send_bytes(request.map(Into::into)).await + } + + /// Send the specified HTTP request with `Bytes` payload. + /// + /// Returns the HTTP response including the status code and body. + /// + /// Returns an error if it can't connect to the server or the request could not be completed, + /// e.g. because of a timeout, infinite redirects, or a loss of connection. + async fn send_bytes(&self, request: Request) -> Result, HttpError>; } #[cfg(feature = "reqwest")] @@ -72,7 +83,7 @@ mod reqwest { #[async_trait] impl HttpClient for reqwest::Client { - async fn send(&self, request: Request>) -> Result, HttpError> { + async fn send_bytes(&self, request: Request) -> Result, HttpError> { otel_debug!(name: "ReqwestClient.Send"); let request = request.try_into()?; let mut response = self.execute(request).await?.error_for_status()?; @@ -89,7 +100,7 @@ mod reqwest { #[cfg(not(target_arch = "wasm32"))] #[async_trait] impl HttpClient for reqwest::blocking::Client { - async fn send(&self, request: Request>) -> Result, HttpError> { + async fn send_bytes(&self, request: Request) -> Result, HttpError> { otel_debug!(name: "ReqwestBlockingClient.Send"); let request = request.try_into()?; let mut response = self.execute(request)?.error_for_status()?; @@ -159,7 +170,7 @@ pub mod hyper { #[async_trait] impl HttpClient for HyperClient { - async fn send(&self, request: Request>) -> Result, HttpError> { + async fn send_bytes(&self, request: Request) -> Result, HttpError> { otel_debug!(name: "HyperClient.Send"); let (parts, body) = request.into_parts(); let mut request = Request::from_parts(parts, Body(Full::from(body))); @@ -251,4 +262,4 @@ mod tests { assert!(got.contains(&"headername1")); assert!(got.contains(&"headername2")); } -} +} \ No newline at end of file diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 07a70bf57d..e87e2e20ca 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -44,15 +44,7 @@ mod trace; use opentelemetry_http::hyper::HyperClient; /// Configuration of the http transport -#[derive(Debug)] -#[cfg_attr( - all( - not(feature = "reqwest-client"), - not(feature = "reqwest-blocking-client"), - not(feature = "hyper-client") - ), - derive(Default) -)] +#[derive(Debug, Default)] pub struct HttpConfig { /// Select the HTTP client client: Option>, @@ -61,44 +53,6 @@ pub struct HttpConfig { headers: Option>, } -#[cfg(any( - feature = "reqwest-blocking-client", - feature = "reqwest-client", - feature = "hyper-client" -))] -impl Default for HttpConfig { - fn default() -> Self { - #[cfg(feature = "reqwest-blocking-client")] - let default_client = std::thread::spawn(|| { - Some(Arc::new(reqwest::blocking::Client::new()) as Arc) - }) - .join() - .expect("creating reqwest::blocking::Client on a new thread not to fail"); - #[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))] - let default_client = Some(Arc::new(reqwest::Client::new()) as Arc); - #[cfg(all( - not(feature = "reqwest-client"), - not(feature = "reqwest-blocking-client"), - feature = "hyper-client" - ))] - // TODO - support configuring custom connector and executor - let default_client = Some(Arc::new(HyperClient::with_default_connector( - Duration::from_secs(10), - None, - )) as Arc); - #[cfg(all( - not(feature = "reqwest-client"), - not(feature = "reqwest-blocking-client"), - not(feature = "hyper-client") - ))] - let default_client = None; - HttpConfig { - client: default_client, - headers: None, - } - } -} - /// Configuration for the OTLP HTTP exporter. /// /// ## Examples @@ -171,11 +125,44 @@ impl HttpExporterBuilder { }, None => self.exporter_config.timeout, }; - let http_client = self - .http_config - .client - .take() - .ok_or(crate::Error::NoHttpClient)?; + let http_client = if let Some(client) = self.http_config.client.take() { + client + } else { + #[cfg(feature = "hyper-client")] + { + Arc::new(HyperClient::with_default_connector(timeout, None)) as Arc + } + #[cfg(all(not(feature = "hyper-client"), feature = "reqwest-client"))] + { + Arc::new( + reqwest::Client::builder() + .timeout(timeout) + .build() + .map_err(|e| crate::Error::Other(e.to_string()))?, + ) as Arc + } + #[cfg(all( + not(feature = "hyper-client"), + not(feature = "reqwest-client"), + feature = "reqwest-blocking-client" + ))] + { + Arc::new( + reqwest::blocking::Client::builder() + .timeout(timeout) + .build() + .map_err(|e| crate::Error::Other(e.to_string()))?, + ) as Arc + } + #[cfg(all( + not(feature = "hyper-client"), + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client") + ))] + { + return Err(crate::Error::NoHttpClient); + } + }; #[allow(clippy::mutable_key_type)] // http headers are not mutated let mut headers: HashMap = self .http_config @@ -203,7 +190,7 @@ impl HttpExporterBuilder { endpoint, headers, self.exporter_config.protocol, - timeout, + //timeout, )) } @@ -273,7 +260,7 @@ pub(crate) struct OtlpHttpClient { collector_endpoint: Uri, headers: HashMap, protocol: Protocol, - _timeout: Duration, + //_timeout: Duration, #[allow(dead_code)] // would be removed once we support set_resource for metrics and traces. resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, @@ -286,14 +273,14 @@ impl OtlpHttpClient { collector_endpoint: Uri, headers: HashMap, protocol: Protocol, - timeout: Duration, + //timeout: Duration, ) -> Self { OtlpHttpClient { client: Mutex::new(Some(client)), collector_endpoint, headers, protocol, - _timeout: timeout, + //_timeout: timeout, resource: ResourceAttributesWithSchema::default(), } } diff --git a/opentelemetry-zipkin/src/exporter/uploader.rs b/opentelemetry-zipkin/src/exporter/uploader.rs index 7e8fe6ec7c..fe4bfe0915 100644 --- a/opentelemetry-zipkin/src/exporter/uploader.rs +++ b/opentelemetry-zipkin/src/exporter/uploader.rs @@ -41,9 +41,9 @@ impl JsonV2Client { .method(Method::POST) .uri(self.collector_endpoint.clone()) .header(CONTENT_TYPE, "application/json") - .body(serde_json::to_vec(&spans).unwrap_or_default()) + .body(serde_json::to_vec(&spans).unwrap_or_default().into()) .map_err::(Into::into)?; - let _ = self.client.send(req).await?.error_for_status()?; + let _ = self.client.send_bytes(req).await?.error_for_status()?; Ok(()) } }