Skip to content

Otel reqwest blocking timeout support #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,24 @@ pub type HttpError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// users to bring their choice of HTTP client.
#[async_trait]
pub trait HttpClient: Debug + Send + Sync {
/// Send the specified HTTP request
/// Send the specified HTTP request with `Vec<u8>` payload
///
/// Returns the HTTP response including the status code and body.
///
/// Returns an error if it can't connect to the server or the request could not be completed,
/// e.g. because of a timeout, infinite redirects, or a loss of connection.
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError>;
#[deprecated(note = "Use `send_bytes` with `Bytes` payload instead.")]
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
self.send_bytes(request.map(Into::into)).await
}

/// Send the specified HTTP request with `Bytes` payload.
///
/// Returns the HTTP response including the status code and body.
///
/// Returns an error if it can't connect to the server or the request could not be completed,
/// e.g. because of a timeout, infinite redirects, or a loss of connection.
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError>;
}

#[cfg(feature = "reqwest")]
Expand All @@ -72,7 +83,7 @@ mod reqwest {

#[async_trait]
impl HttpClient for reqwest::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {
otel_debug!(name: "ReqwestClient.Send");
let request = request.try_into()?;
let mut response = self.execute(request).await?.error_for_status()?;
Expand All @@ -89,7 +100,7 @@ mod reqwest {
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl HttpClient for reqwest::blocking::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {
otel_debug!(name: "ReqwestBlockingClient.Send");
let request = request.try_into()?;
let mut response = self.execute(request)?.error_for_status()?;
Expand Down Expand Up @@ -159,7 +170,7 @@ pub mod hyper {

#[async_trait]
impl HttpClient for HyperClient {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {
otel_debug!(name: "HyperClient.Send");
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, Body(Full::from(body)));
Expand Down Expand Up @@ -251,4 +262,4 @@ mod tests {
assert!(got.contains(&"headername1"));
assert!(got.contains(&"headername2"));
}
}
}
99 changes: 43 additions & 56 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,7 @@ mod trace;
use opentelemetry_http::hyper::HyperClient;

/// Configuration of the http transport
#[derive(Debug)]
#[cfg_attr(
all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
not(feature = "hyper-client")
),
derive(Default)
)]
#[derive(Debug, Default)]
pub struct HttpConfig {
/// Select the HTTP client
client: Option<Arc<dyn HttpClient>>,
Expand All @@ -61,44 +53,6 @@ pub struct HttpConfig {
headers: Option<HashMap<String, String>>,
}

#[cfg(any(
feature = "reqwest-blocking-client",
feature = "reqwest-client",
feature = "hyper-client"
))]
impl Default for HttpConfig {
fn default() -> Self {
#[cfg(feature = "reqwest-blocking-client")]
let default_client = std::thread::spawn(|| {
Some(Arc::new(reqwest::blocking::Client::new()) as Arc<dyn HttpClient>)
})
.join()
.expect("creating reqwest::blocking::Client on a new thread not to fail");
#[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))]
let default_client = Some(Arc::new(reqwest::Client::new()) as Arc<dyn HttpClient>);
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "hyper-client"
))]
// TODO - support configuring custom connector and executor
let default_client = Some(Arc::new(HyperClient::with_default_connector(
Duration::from_secs(10),
None,
)) as Arc<dyn HttpClient>);
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
not(feature = "hyper-client")
))]
let default_client = None;
HttpConfig {
client: default_client,
headers: None,
}
}
}

/// Configuration for the OTLP HTTP exporter.
///
/// ## Examples
Expand Down Expand Up @@ -171,11 +125,44 @@ impl HttpExporterBuilder {
},
None => self.exporter_config.timeout,
};
let http_client = self
.http_config
.client
.take()
.ok_or(crate::Error::NoHttpClient)?;
let http_client = if let Some(client) = self.http_config.client.take() {
client
} else {
#[cfg(feature = "hyper-client")]
{
Arc::new(HyperClient::with_default_connector(timeout, None)) as Arc<dyn HttpClient>
}
#[cfg(all(not(feature = "hyper-client"), feature = "reqwest-client"))]
{
Arc::new(
reqwest::Client::builder()
.timeout(timeout)
.build()
.map_err(|e| crate::Error::Other(e.to_string()))?,
) as Arc<dyn HttpClient>
}
#[cfg(all(
not(feature = "hyper-client"),
not(feature = "reqwest-client"),
feature = "reqwest-blocking-client"
))]
{
Arc::new(
reqwest::blocking::Client::builder()
.timeout(timeout)
.build()
.map_err(|e| crate::Error::Other(e.to_string()))?,
) as Arc<dyn HttpClient>
}
#[cfg(all(
not(feature = "hyper-client"),
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client")
))]
{
return Err(crate::Error::NoHttpClient);
}
};
#[allow(clippy::mutable_key_type)] // http headers are not mutated
let mut headers: HashMap<HeaderName, HeaderValue> = self
.http_config
Expand Down Expand Up @@ -203,7 +190,7 @@ impl HttpExporterBuilder {
endpoint,
headers,
self.exporter_config.protocol,
timeout,
//timeout,
))
}

Expand Down Expand Up @@ -273,7 +260,7 @@ pub(crate) struct OtlpHttpClient {
collector_endpoint: Uri,
headers: HashMap<HeaderName, HeaderValue>,
protocol: Protocol,
_timeout: Duration,
//_timeout: Duration,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
Expand All @@ -286,14 +273,14 @@ impl OtlpHttpClient {
collector_endpoint: Uri,
headers: HashMap<HeaderName, HeaderValue>,
protocol: Protocol,
timeout: Duration,
//timeout: Duration,
) -> Self {
OtlpHttpClient {
client: Mutex::new(Some(client)),
collector_endpoint,
headers,
protocol,
_timeout: timeout,
//_timeout: timeout,
resource: ResourceAttributesWithSchema::default(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-zipkin/src/exporter/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ impl JsonV2Client {
.method(Method::POST)
.uri(self.collector_endpoint.clone())
.header(CONTENT_TYPE, "application/json")
.body(serde_json::to_vec(&spans).unwrap_or_default())
.body(serde_json::to_vec(&spans).unwrap_or_default().into())
.map_err::<Error, _>(Into::into)?;
let _ = self.client.send(req).await?.error_for_status()?;
let _ = self.client.send_bytes(req).await?.error_for_status()?;
Ok(())
}
}
Loading