From 9e4a5f47cd03031c4625bfb3be4276b6d91ed631 Mon Sep 17 00:00:00 2001 From: Aaron Marten <437496+AaronRM@users.noreply.github.com> Date: Wed, 15 Jan 2025 15:24:32 -0800 Subject: [PATCH 1/7] Fix comment about how to only run unit tests --- opentelemetry-otlp/tests/integration_test/src/test_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs index 5dd8ac5dd4..cecab22974 100644 --- a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs +++ b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs @@ -13,7 +13,7 @@ //! Only a single test suite can run at once, as each container has statically mapped ports, but //! this works nicely with the way cargo executes the suite. //! -//! To skip integration tests with cargo, you can run `cargo test --mod`, which will run unit tests +//! To skip integration tests with cargo, you can run `cargo test --lib`, which will run unit tests //! only. //! #![cfg(unix)] From e1c406a4c92d346f549431055e972adeda54576c Mon Sep 17 00:00:00 2001 From: Aaron Marten <437496+AaronRM@users.noreply.github.com> Date: Wed, 15 Jan 2025 15:43:25 -0800 Subject: [PATCH 2/7] Remove extraneous lines from test_with_gzip_compression unit test --- opentelemetry-otlp/src/exporter/tonic/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index 140c17d534..f92355215a 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -498,9 +498,6 @@ mod tests { #[test] #[cfg(feature = "gzip-tonic")] fn test_with_gzip_compression() { - // metadata should merge with the current one with priority instead of just replacing it - let mut metadata = MetadataMap::new(); - metadata.insert("foo", "bar".parse().unwrap()); let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip); assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip); } From 12f6f6d1d11b0ea6009cefa1b41a48e93756de8a Mon Sep 17 00:00:00 2001 From: Aaron Marten <437496+AaronRM@users.noreply.github.com> Date: Thu, 27 Feb 2025 10:38:16 -0800 Subject: [PATCH 3/7] Add retry_with_exponential_backoff method; Use in tonic logs client --- opentelemetry-otlp/src/exporter/http/mod.rs | 2 +- opentelemetry-otlp/src/exporter/tonic/logs.rs | 66 ++++++++++++------- opentelemetry-otlp/src/lib.rs | 1 + opentelemetry-otlp/src/retry.rs | 47 +++++++++++++ opentelemetry-proto/src/transform/logs.rs | 8 +-- 5 files changed, 96 insertions(+), 28 deletions(-) create mode 100644 opentelemetry-otlp/src/retry.rs diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 537d41d108..02b78f06ea 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -322,7 +322,7 @@ impl OtlpHttpClient { logs: LogBatch<'_>, ) -> opentelemetry_sdk::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; - let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(&logs, &self.resource); let req = ExportLogsServiceRequest { resource_logs }; match self.protocol { diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index cdbed23be2..85cd04e003 100644 --- 
a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use core::fmt; use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::logs::v1::{ @@ -12,6 +13,8 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop use super::BoxInterceptor; use tokio::sync::Mutex; +use crate::retry::{retry_with_exponential_backoff, RetryPolicy}; + pub(crate) struct TonicLogsClient { inner: Option, #[allow(dead_code)] @@ -57,33 +60,50 @@ impl TonicLogsClient { impl LogExporter for TonicLogsClient { async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { - let (mut client, metadata, extensions) = match &self.inner { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .lock() - .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here - .call(Request::new(())) - .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))? - .into_parts(); - (inner.client.clone(), m, e) - } - None => return Err(OTelSdkError::AlreadyShutdown), + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, }; - let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); + let batch = Arc::new(batch); // Wrap batch in Arc> + + retry_with_exponential_backoff(policy, "TonicLogsClient.Export", { + let batch = Arc::clone(&batch); + move || { + let batch = Arc::clone(&batch); // Clone the Arc inside the closure + Box::pin(async move { + let (mut client, metadata, extensions) = match &self.inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .lock() + .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here + .call(Request::new(())) + .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))? 
+ .into_parts(); + (inner.client.clone(), m, e) + } + None => return Err(OTelSdkError::AlreadyShutdown), + }; - otel_debug!(name: "TonicsLogsClient.CallingExport"); + let resource_logs = group_logs_by_resource_and_scope(&*batch, &self.resource); - client - .export(Request::from_parts( - metadata, - extensions, - ExportLogsServiceRequest { resource_logs }, - )) - .await - .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?; - Ok(()) + otel_debug!(name: "TonicsLogsClient.CallingExport"); + + client + .export(Request::from_parts( + metadata, + extensions, + ExportLogsServiceRequest { resource_logs }, + )) + .await + .map(|_| ()) // Map the successful result to Ok(()) + .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e))) + }) + } + }).await } fn shutdown(&mut self) -> OTelSdkResult { diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 4d80a017f1..f5cf848b50 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -220,6 +220,7 @@ mod metric; #[cfg(feature = "trace")] #[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] mod span; +mod retry; pub use crate::exporter::Compression; pub use crate::exporter::ExportConfig; diff --git a/opentelemetry-otlp/src/retry.rs b/opentelemetry-otlp/src/retry.rs new file mode 100644 index 0000000000..841ade870d --- /dev/null +++ b/opentelemetry-otlp/src/retry.rs @@ -0,0 +1,47 @@ +use std::future::Future; +use std::time::{Duration, SystemTime}; +use opentelemetry::otel_warn; +use tokio::time::sleep; + +pub(crate) struct RetryPolicy { + pub max_retries: usize, + pub initial_delay_ms: u64, + pub max_delay_ms: u64, + pub jitter_ms: u64, +} + +fn generate_jitter(max_jitter: u64) -> u64 { + let now = SystemTime::now(); + let nanos = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().subsec_nanos(); + nanos as u64 % (max_jitter + 1) +} + +pub(crate) async fn retry_with_exponential_backoff( + policy: RetryPolicy, + operation_name: &str, + mut operation: F, +) -> Result +where + F: FnMut() -> Fut, + E: std::fmt::Debug, + Fut: Future>, +{ + let mut attempt = 0; + let mut delay = policy.initial_delay_ms; + + loop { + match operation().await { + Ok(result) => return Ok(result), + Err(err) if attempt < policy.max_retries => { + attempt += 1; + // Log the error and retry after a delay with jitter + otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err)); + let jitter = generate_jitter(policy.jitter_ms); + let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms); + sleep(Duration::from_millis(delay_with_jitter)).await; + delay = std::cmp::min(delay * 2, policy.max_delay_ms); + } + Err(err) => return Err(err), + } + } +} \ No newline at end of file diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index f1a992fc9a..458979cd86 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -164,8 +164,8 @@ pub mod tonic { } } - pub fn group_logs_by_resource_and_scope( - logs: LogBatch<'_>, + pub fn group_logs_by_resource_and_scope<'a>( + logs: &'a LogBatch<'a>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -273,7 +273,7 @@ mod tests { let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - 
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; @@ -293,7 +293,7 @@ mod tests { let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; From 98469f3b3d0eeb429363f11cd460db25272ea770 Mon Sep 17 00:00:00 2001 From: Aaron Marten <437496+AaronRM@users.noreply.github.com> Date: Thu, 27 Feb 2025 11:07:20 -0800 Subject: [PATCH 4/7] Add tests and comments to retry.rs --- opentelemetry-otlp/src/retry.rs | 106 +++++++++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 3 deletions(-) diff --git a/opentelemetry-otlp/src/retry.rs b/opentelemetry-otlp/src/retry.rs index 841ade870d..de75f51411 100644 --- a/opentelemetry-otlp/src/retry.rs +++ b/opentelemetry-otlp/src/retry.rs @@ -10,12 +10,14 @@ pub(crate) struct RetryPolicy { pub jitter_ms: u64, } +// Generates a random jitter value up to max_jitter fn generate_jitter(max_jitter: u64) -> u64 { let now = SystemTime::now(); let nanos = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().subsec_nanos(); nanos as u64 % (max_jitter + 1) } +// Retries the given operation with exponential backoff and jitter pub(crate) async fn retry_with_exponential_backoff( policy: RetryPolicy, operation_name: &str, @@ -31,7 +33,7 @@ where loop { match operation().await { - Ok(result) => return Ok(result), + Ok(result) => return Ok(result), // Return the result if the operation succeeds Err(err) if attempt < policy.max_retries => { attempt += 1; // Log the error and retry after a delay with jitter @@ -39,9 +41,107 @@ where let jitter = generate_jitter(policy.jitter_ms); let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms); sleep(Duration::from_millis(delay_with_jitter)).await; - delay = std::cmp::min(delay * 2, policy.max_delay_ms); + delay = std::cmp::min(delay * 2, policy.max_delay_ms); // Exponential backoff } - Err(err) => return Err(err), + Err(err) => return Err(err), // Return the error if max retries are reached } } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::timeout; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + // Test to ensure generate_jitter returns a value within the expected range + #[tokio::test] + async fn test_generate_jitter() { + let max_jitter = 100; + let jitter = generate_jitter(max_jitter); + assert!(jitter <= max_jitter); + } + + // Test to ensure retry_with_exponential_backoff succeeds on the first attempt + #[tokio::test] + async fn test_retry_with_exponential_backoff_success() { + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let result = retry_with_exponential_backoff(policy, "test_operation", || { + Box::pin(async { Ok::<_, ()>("success") }) + }).await; + + assert_eq!(result, Ok("success")); + } + + // Test to ensure retry_with_exponential_backoff retries the operation and eventually succeeds + #[tokio::test] + async fn test_retry_with_exponential_backoff_retries() { + let policy = RetryPolicy { + max_retries: 3, + 
initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let attempts = AtomicUsize::new(0); + + let result = retry_with_exponential_backoff(policy, "test_operation", || { + let attempt = attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async move { + if attempt < 2 { + Err::<&str, &str>("error") // Fail the first two attempts + } else { + Ok::<&str, &str>("success") // Succeed on the third attempt + } + }) + }).await; + + assert_eq!(result, Ok("success")); + assert_eq!(attempts.load(Ordering::SeqCst), 3); // Ensure there were 3 attempts + } + + // Test to ensure retry_with_exponential_backoff fails after max retries + #[tokio::test] + async fn test_retry_with_exponential_backoff_failure() { + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let attempts = AtomicUsize::new(0); + + let result = retry_with_exponential_backoff(policy, "test_operation", || { + attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async { Err::<(), _>("error") }) // Always fail + }).await; + + assert_eq!(result, Err("error")); + assert_eq!(attempts.load(Ordering::SeqCst), 4); // Ensure there were 4 attempts (initial + 3 retries) + } + + // Test to ensure retry_with_exponential_backoff respects the timeout + #[tokio::test] + async fn test_retry_with_exponential_backoff_timeout() { + let policy = RetryPolicy { + max_retries: 12, // Increase the number of retries + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let result = timeout(Duration::from_secs(1), retry_with_exponential_backoff(policy, "test_operation", || { + Box::pin(async { Err::<(), _>("error") }) // Always fail + })).await; + + assert!(result.is_err()); // Ensure the operation times out + } } \ No newline at end of file From b2dd5e7508b855c9b39de4c85706fe0ec6524180 Mon Sep 17 00:00:00 2001 From: Aaron Marten <437496+AaronRM@users.noreply.github.com> Date: Thu, 13 Mar 2025 06:25:46 -0700 Subject: [PATCH 5/7] Move retry to opentelemetry-sdk (WIP) --- opentelemetry-otlp/src/exporter/tonic/logs.rs | 4 +- opentelemetry-otlp/src/lib.rs | 1 - opentelemetry-sdk/src/lib.rs | 3 + .../src/retry.rs | 98 +++++++++++++++++-- 4 files changed, 94 insertions(+), 12 deletions(-) rename {opentelemetry-otlp => opentelemetry-sdk}/src/retry.rs (56%) diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 85cd04e003..3426360c59 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -13,7 +13,7 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop use super::BoxInterceptor; use tokio::sync::Mutex; -use crate::retry::{retry_with_exponential_backoff, RetryPolicy}; +use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy}; pub(crate) struct TonicLogsClient { inner: Option, @@ -69,7 +69,7 @@ impl LogExporter for TonicLogsClient { let batch = Arc::new(batch); // Wrap batch in Arc> - retry_with_exponential_backoff(policy, "TonicLogsClient.Export", { + retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "TonicLogsClient.Export", { let batch = Arc::clone(&batch); move || { let batch = Arc::clone(&batch); // Clone the Arc inside the closure diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index f5cf848b50..4d80a017f1 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -220,7 +220,6 @@ mod metric; #[cfg(feature = "trace")] 
#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] mod span; -mod retry; pub use crate::exporter::Compression; pub use crate::exporter::ExportConfig; diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index b51e8f8ea4..58680e6dec 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -149,3 +149,6 @@ pub use resource::Resource; pub mod error; pub use error::ExportError; + +/// Retry logic for exporting telemetry data. +pub mod retry; \ No newline at end of file diff --git a/opentelemetry-otlp/src/retry.rs b/opentelemetry-sdk/src/retry.rs similarity index 56% rename from opentelemetry-otlp/src/retry.rs rename to opentelemetry-sdk/src/retry.rs index de75f51411..c65247c6ec 100644 --- a/opentelemetry-otlp/src/retry.rs +++ b/opentelemetry-sdk/src/retry.rs @@ -1,12 +1,31 @@ +//! This module provides functionality for retrying operations with exponential backoff and jitter. +//! +//! The `RetryPolicy` struct defines the configuration for the retry behavior, including the maximum +//! number of retries, initial delay, maximum delay, and jitter. +//! +//! The `Sleep` trait abstracts the sleep functionality, allowing different implementations for +//! various async runtimes such as Tokio and async-std, as well as a synchronous implementation. +//! +//! The `retry_with_exponential_backoff` function retries the given operation according to the +//! specified retry policy, using exponential backoff and jitter to determine the delay between +//! retries. The function logs errors and retries the operation until it succeeds or the maximum +//! number of retries is reached. + use std::future::Future; +use std::pin::Pin; use std::time::{Duration, SystemTime}; use opentelemetry::otel_warn; -use tokio::time::sleep; -pub(crate) struct RetryPolicy { +/// Configuration for retry policy. +#[derive(Debug)] +pub struct RetryPolicy { + /// Maximum number of retry attempts. pub max_retries: usize, + /// Initial delay in milliseconds before the first retry. pub initial_delay_ms: u64, + /// Maximum delay in milliseconds between retries. pub max_delay_ms: u64, + /// Maximum jitter in milliseconds to add to the delay. pub jitter_ms: u64, } @@ -17,8 +36,68 @@ fn generate_jitter(max_jitter: u64) -> u64 { nanos as u64 % (max_jitter + 1) } -// Retries the given operation with exponential backoff and jitter -pub(crate) async fn retry_with_exponential_backoff( +/// Trait to abstract the sleep functionality. +pub trait Sleep { + /// The future returned by the sleep function. + type SleepFuture: Future; + + /// Sleeps for the specified duration. + fn sleep(duration: Duration) -> Self::SleepFuture; +} + +/// Implementation of the Sleep trait for tokio::time::Sleep +#[cfg(feature = "rt-tokio")] +impl Sleep for tokio::time::Sleep { + type SleepFuture = tokio::time::Sleep; + + fn sleep(duration: Duration) -> Self::SleepFuture { + tokio::time::sleep(duration) + } +} + +#[cfg(feature = "rt-async-std")] +/// There is no direct equivalent to `tokio::time::Sleep` in `async-std`. +/// Instead, we create a new struct `AsyncStdSleep` and implement the `Sleep` +/// trait for it, boxing the future returned by `async_std::task::sleep` to fit +/// the trait's associated type requirements. 
+#[derive(Debug)] +pub struct AsyncStdSleep; + +/// Implementation of the Sleep trait for async-std +#[cfg(feature = "rt-async-std")] +impl Sleep for AsyncStdSleep { + type SleepFuture = Pin + Send>>; + + fn sleep(duration: Duration) -> Self::SleepFuture { + Box::pin(async_std::task::sleep(duration)) + } +} + +/// Implement the Sleep trait for synchronous sleep +#[derive(Debug)] +pub struct StdSleep; + +impl Sleep for StdSleep { + type SleepFuture = std::future::Ready<()>; + + fn sleep(duration: Duration) -> Self::SleepFuture { + std::thread::sleep(duration); + std::future::ready(()) + } +} + +/// Retries the given operation with exponential backoff and jitter. +/// +/// # Arguments +/// +/// * `policy` - The retry policy configuration. +/// * `operation_name` - The name of the operation being retried. +/// * `operation` - The operation to be retried. +/// +/// # Returns +/// +/// A `Result` containing the operation's result or an error if the maximum retries are reached. +pub async fn retry_with_exponential_backoff( policy: RetryPolicy, operation_name: &str, mut operation: F, @@ -27,6 +106,7 @@ where F: FnMut() -> Fut, E: std::fmt::Debug, Fut: Future>, + S: Sleep, { let mut attempt = 0; let mut delay = policy.initial_delay_ms; @@ -40,7 +120,7 @@ where otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err)); let jitter = generate_jitter(policy.jitter_ms); let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms); - sleep(Duration::from_millis(delay_with_jitter)).await; + S::sleep(Duration::from_millis(delay_with_jitter)).await; delay = std::cmp::min(delay * 2, policy.max_delay_ms); // Exponential backoff } Err(err) => return Err(err), // Return the error if max retries are reached @@ -73,7 +153,7 @@ mod tests { jitter_ms: 100, }; - let result = retry_with_exponential_backoff(policy, "test_operation", || { + let result = retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "test_operation", || { Box::pin(async { Ok::<_, ()>("success") }) }).await; @@ -92,7 +172,7 @@ mod tests { let attempts = AtomicUsize::new(0); - let result = retry_with_exponential_backoff(policy, "test_operation", || { + let result = retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "test_operation", || { let attempt = attempts.fetch_add(1, Ordering::SeqCst); Box::pin(async move { if attempt < 2 { @@ -119,7 +199,7 @@ mod tests { let attempts = AtomicUsize::new(0); - let result = retry_with_exponential_backoff(policy, "test_operation", || { + let result = retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "test_operation", || { attempts.fetch_add(1, Ordering::SeqCst); Box::pin(async { Err::<(), _>("error") }) // Always fail }).await; @@ -138,7 +218,7 @@ mod tests { jitter_ms: 100, }; - let result = timeout(Duration::from_secs(1), retry_with_exponential_backoff(policy, "test_operation", || { + let result = timeout(Duration::from_secs(1), retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "test_operation", || { Box::pin(async { Err::<(), _>("error") }) // Always fail })).await; From 26bb2c2235f3f1c583ff59318b885b02f76237fa Mon Sep 17 00:00:00 2001 From: Aaron Marten <437496+AaronRM@users.noreply.github.com> Date: Thu, 27 Mar 2025 16:16:09 -0700 Subject: [PATCH 6/7] Fix build warning --- .../tests/integration_test/src/logs_asserter.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git 
a/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs b/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs index 2c9339c6b4..9daccbea66 100644 --- a/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs +++ b/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs @@ -1,8 +1,5 @@ use anyhow::Result; -use opentelemetry_proto::tonic::{ - common::v1::KeyValue, - logs::v1::{LogRecord, LogsData, ResourceLogs}, -}; +use opentelemetry_proto::tonic::logs::v1::{LogRecord, LogsData, ResourceLogs}; use std::fs::File; // Given two ResourceLogs, assert that they are equal except for the timestamps From 16dd9a5861ee1dcc4292267b183063edfc8b5ea9 Mon Sep 17 00:00:00 2001 From: Aaron Marten <437496+AaronRM@users.noreply.github.com> Date: Thu, 27 Mar 2025 16:16:32 -0700 Subject: [PATCH 7/7] Scope retry to just logs+tonic --- opentelemetry-otlp/src/exporter/tonic/logs.rs | 111 +++++++++------- opentelemetry-otlp/src/exporter/tonic/mod.rs | 5 + .../src/exporter/tonic}/retry.rs | 118 +++++++++--------- opentelemetry-sdk/src/lib.rs | 3 - 4 files changed, 131 insertions(+), 106 deletions(-) rename {opentelemetry-sdk/src => opentelemetry-otlp/src/exporter/tonic}/retry.rs (72%) diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 954ec4ee71..21042f1070 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use core::fmt; use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::logs::v1::{ @@ -13,7 +12,7 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop use super::BoxInterceptor; -use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy}; +use super::retry::{retry_with_exponential_backoff, RetryPolicy}; pub(crate) struct TonicLogsClient { inner: Mutex>, @@ -60,57 +59,79 @@ impl TonicLogsClient { impl LogExporter for TonicLogsClient { async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .call(Request::new(())) + .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))? 
+ .into_parts(); + (inner.client.clone(), m, e) + } + None => return Err(OTelSdkError::AlreadyShutdown), + }; + + let resource_logs = group_logs_by_resource_and_scope(&batch, &self.resource); + + otel_debug!(name: "TonicsLogsClient.CallingExport"); + + // First attempt without retry + let result = client + .export(Request::from_parts( + metadata.clone(), + extensions.clone(), + ExportLogsServiceRequest { + resource_logs: resource_logs.clone() + }, + )) + .await; + + // If the first attempt succeeds, return success + if result.is_ok() { + return Ok(()); + } + + // If the first attempt fails, try with retry + otel_debug!(name: "TonicsLogsClient.FirstAttemptFailed.Retrying"); + let policy = RetryPolicy { - max_retries: 3, + max_retries: 10, initial_delay_ms: 100, max_delay_ms: 1600, jitter_ms: 100, }; - - let batch = Arc::new(batch); // Wrap batch in Arc> - - retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "TonicLogsClient.Export", { - let batch = Arc::clone(&batch); - move || { - let batch = Arc::clone(&batch); // Clone the Arc inside the closure - Box::pin(async move { - let (mut client, metadata, extensions) = match &self.inner { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .lock() - .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here - .call(Request::new(())) - .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))? - .into_parts(); - (inner.client.clone(), m, e) - } - None => return Err(OTelSdkError::AlreadyShutdown), - }; - - let resource_logs = group_logs_by_resource_and_scope(&*batch, &self.resource); - - otel_debug!(name: "TonicsLogsClient.CallingExport"); - - client - .export(Request::from_parts( - metadata, - extensions, - ExportLogsServiceRequest { resource_logs }, - )) - .await - .map(|_| ()) // Map the successful result to Ok(()) - .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e))) - }) + + // Now use retry_with_exponential_backoff for subsequent attempts + retry_with_exponential_backoff( + policy, + "TonicsLogsClient.export", + || async { + client + .clone() + .export(Request::from_parts( + metadata.clone(), + extensions.clone(), + ExportLogsServiceRequest { + resource_logs: resource_logs.clone() + }, + )) + .await + .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e))) } - }).await + ) + .await + .map(|_| ()) // Convert successful response to () as required by OTelSdkResult } - fn shutdown(&mut self) -> OTelSdkResult { - match self.inner.take() { - Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown. - None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down. - } + fn shutdown(&self) -> OTelSdkResult { + // TODO: Implement actual shutdown + // Due to the use of tokio::sync::Mutex to guard + // the inner client, we need to await the call to lock the mutex + // and that requires async runtime. + // It is possible to fix this by using + // a dedicated thread just to handle shutdown. + // But for now, we just return Ok. 
+ Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index f92355215a..3d5ff5459a 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -28,6 +28,11 @@ pub(crate) mod metrics; #[cfg(feature = "trace")] pub(crate) mod trace; +// For now, we are not exposing the retry policy. Only work with grpc-tonic since retry takes a hard dependency on tokio +// while we sort out an abstraction for the async runtime which can be used by all exporters. +#[cfg(feature = "grpc-tonic")] +mod retry; + /// Configuration for [tonic] /// /// [tonic]: https://github.com/hyperium/tonic diff --git a/opentelemetry-sdk/src/retry.rs b/opentelemetry-otlp/src/exporter/tonic/retry.rs similarity index 72% rename from opentelemetry-sdk/src/retry.rs rename to opentelemetry-otlp/src/exporter/tonic/retry.rs index c65247c6ec..d9a283a463 100644 --- a/opentelemetry-sdk/src/retry.rs +++ b/opentelemetry-otlp/src/exporter/tonic/retry.rs @@ -12,13 +12,12 @@ //! number of retries is reached. use std::future::Future; -use std::pin::Pin; use std::time::{Duration, SystemTime}; use opentelemetry::otel_warn; /// Configuration for retry policy. #[derive(Debug)] -pub struct RetryPolicy { +pub(super) struct RetryPolicy { /// Maximum number of retry attempts. pub max_retries: usize, /// Initial delay in milliseconds before the first retry. @@ -36,55 +35,54 @@ fn generate_jitter(max_jitter: u64) -> u64 { nanos as u64 % (max_jitter + 1) } -/// Trait to abstract the sleep functionality. -pub trait Sleep { - /// The future returned by the sleep function. - type SleepFuture: Future; - - /// Sleeps for the specified duration. - fn sleep(duration: Duration) -> Self::SleepFuture; -} - -/// Implementation of the Sleep trait for tokio::time::Sleep -#[cfg(feature = "rt-tokio")] -impl Sleep for tokio::time::Sleep { - type SleepFuture = tokio::time::Sleep; - - fn sleep(duration: Duration) -> Self::SleepFuture { - tokio::time::sleep(duration) - } -} - -#[cfg(feature = "rt-async-std")] -/// There is no direct equivalent to `tokio::time::Sleep` in `async-std`. -/// Instead, we create a new struct `AsyncStdSleep` and implement the `Sleep` -/// trait for it, boxing the future returned by `async_std::task::sleep` to fit -/// the trait's associated type requirements. -#[derive(Debug)] -pub struct AsyncStdSleep; - -/// Implementation of the Sleep trait for async-std -#[cfg(feature = "rt-async-std")] -impl Sleep for AsyncStdSleep { - type SleepFuture = Pin + Send>>; - - fn sleep(duration: Duration) -> Self::SleepFuture { - Box::pin(async_std::task::sleep(duration)) - } -} - -/// Implement the Sleep trait for synchronous sleep -#[derive(Debug)] -pub struct StdSleep; - -impl Sleep for StdSleep { - type SleepFuture = std::future::Ready<()>; - - fn sleep(duration: Duration) -> Self::SleepFuture { - std::thread::sleep(duration); - std::future::ready(()) - } -} +// /// Trait to abstract the sleep functionality. +// pub trait Sleep { +// /// The future returned by the sleep function. +// type SleepFuture: Future; + +// /// Sleeps for the specified duration. 
+// fn sleep(duration: Duration) -> Self::SleepFuture; +// } + +// /// Implementation of the Sleep trait for tokio::time::Sleep +// #[cfg(feature = "rt-tokio")] +// impl Sleep for tokio::time::Sleep { +// type SleepFuture = tokio::time::Sleep; + +// fn sleep(duration: Duration) -> Self::SleepFuture { +// } +// } + +// #[cfg(feature = "rt-async-std")] +// /// There is no direct equivalent to `tokio::time::Sleep` in `async-std`. +// /// Instead, we create a new struct `AsyncStdSleep` and implement the `Sleep` +// /// trait for it, boxing the future returned by `async_std::task::sleep` to fit +// /// the trait's associated type requirements. +// #[derive(Debug)] +// pub struct AsyncStdSleep; + +// /// Implementation of the Sleep trait for async-std +// #[cfg(feature = "rt-async-std")] +// impl Sleep for AsyncStdSleep { +// type SleepFuture = Pin + Send>>; + +// fn sleep(duration: Duration) -> Self::SleepFuture { +// Box::pin(async_std::task::sleep(duration)) +// } +// } + +// /// Implement the Sleep trait for synchronous sleep +// #[derive(Debug)] +// pub struct StdSleep; + +// impl Sleep for StdSleep { +// type SleepFuture = std::future::Ready<()>; + +// fn sleep(duration: Duration) -> Self::SleepFuture { +// std::thread::sleep(duration); +// std::future::ready(()) +// } +// } /// Retries the given operation with exponential backoff and jitter. /// @@ -97,7 +95,7 @@ impl Sleep for StdSleep { /// # Returns /// /// A `Result` containing the operation's result or an error if the maximum retries are reached. -pub async fn retry_with_exponential_backoff( +pub(super) async fn retry_with_exponential_backoff( policy: RetryPolicy, operation_name: &str, mut operation: F, @@ -106,7 +104,6 @@ where F: FnMut() -> Fut, E: std::fmt::Debug, Fut: Future>, - S: Sleep, { let mut attempt = 0; let mut delay = policy.initial_delay_ms; @@ -120,7 +117,12 @@ where otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err)); let jitter = generate_jitter(policy.jitter_ms); let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms); - S::sleep(Duration::from_millis(delay_with_jitter)).await; + + // Retry currently only supports tokio::time::sleep (for use with gRPC/tonic). This + // should be replaced with a more generic sleep function that works with async-std + // and a synchronous runtime in the future. 
+ tokio::time::sleep(Duration::from_millis(delay_with_jitter)).await; + delay = std::cmp::min(delay * 2, policy.max_delay_ms); // Exponential backoff } Err(err) => return Err(err), // Return the error if max retries are reached @@ -153,7 +155,7 @@ mod tests { jitter_ms: 100, }; - let result = retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "test_operation", || { + let result = retry_with_exponential_backoff(policy, "test_operation", || { Box::pin(async { Ok::<_, ()>("success") }) }).await; @@ -172,7 +174,7 @@ mod tests { let attempts = AtomicUsize::new(0); - let result = retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "test_operation", || { + let result = retry_with_exponential_backoff(policy, "test_operation", || { let attempt = attempts.fetch_add(1, Ordering::SeqCst); Box::pin(async move { if attempt < 2 { @@ -199,7 +201,7 @@ mod tests { let attempts = AtomicUsize::new(0); - let result = retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "test_operation", || { + let result = retry_with_exponential_backoff(policy, "test_operation", || { attempts.fetch_add(1, Ordering::SeqCst); Box::pin(async { Err::<(), _>("error") }) // Always fail }).await; @@ -218,7 +220,7 @@ mod tests { jitter_ms: 100, }; - let result = timeout(Duration::from_secs(1), retry_with_exponential_backoff::<_, _, _, _, tokio::time::Sleep>(policy, "test_operation", || { + let result = timeout(Duration::from_secs(1), retry_with_exponential_backoff(policy, "test_operation", || { Box::pin(async { Err::<(), _>("error") }) // Always fail })).await; diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index c9514cf75e..127f9e9094 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -149,6 +149,3 @@ pub use resource::Resource; pub mod error; pub use error::ExportError; - -/// Retry logic for exporting telemetry data. -pub mod retry; \ No newline at end of file
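With the policy used in the final patch (max_retries: 10, initial_delay_ms: 100, max_delay_ms: 1600, jitter_ms: 100), a permanently failing export is attempted once without retry and then up to 11 further times inside the helper, with 10 sleeps between them: base delays of 100, 200, 400, and 800 ms, then 1600 ms for the remaining retries, roughly 11.1-11.5 s of backoff in total before the error is surfaced. Jitter is added to every sleep but the sum is capped at max_delay_ms, so it only matters for the first four delays. The jitter itself comes from the sub-second nanoseconds of the current SystemTime reduced modulo (jitter_ms + 1), which keeps the module free of extra dependencies at the cost of not being uniformly random.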
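The sketch below is a minimal, self-contained illustration of the backoff-and-jitter loop and of how a caller drives it. It mirrors the shape of retry_with_exponential_backoff from the final patch but is not the exporter code itself: logging goes to eprintln! instead of otel_warn!, the demo main, the attempts counter, and the "transient error"/"exported" values are invented for the example, and it assumes a tokio dependency with the rt-multi-thread, time, and macros features enabled.

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, SystemTime};

/// Retry configuration, matching the fields introduced by this patch series.
struct RetryPolicy {
    max_retries: usize,
    initial_delay_ms: u64,
    max_delay_ms: u64,
    jitter_ms: u64,
}

/// Cheap jitter source: sub-second nanos of the wall clock, reduced modulo
/// (max_jitter + 1), as in the patch.
fn generate_jitter(max_jitter: u64) -> u64 {
    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        .unwrap_or(0);
    u64::from(nanos) % (max_jitter + 1)
}

/// Retries `operation` with exponential backoff capped at `max_delay_ms`.
async fn retry_with_exponential_backoff<F, Fut, T, E>(
    policy: RetryPolicy,
    operation_name: &str,
    mut operation: F,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: std::fmt::Debug,
{
    let mut attempt = 0;
    let mut delay = policy.initial_delay_ms;
    loop {
        match operation().await {
            Ok(value) => return Ok(value),
            Err(err) if attempt < policy.max_retries => {
                attempt += 1;
                let jitter = generate_jitter(policy.jitter_ms);
                let sleep_ms = std::cmp::min(delay + jitter, policy.max_delay_ms);
                // The real code logs via otel_warn!; plain stderr keeps the sketch dependency-free.
                eprintln!("retrying {operation_name} (attempt {attempt}) in {sleep_ms} ms: {err:?}");
                tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
                delay = std::cmp::min(delay * 2, policy.max_delay_ms);
            }
            Err(err) => return Err(err),
        }
    }
}

#[tokio::main]
async fn main() {
    // Fails twice, then succeeds -- the same scenario as the
    // test_retry_with_exponential_backoff_retries unit test.
    let attempts = AtomicUsize::new(0);
    let result = retry_with_exponential_backoff(
        RetryPolicy { max_retries: 3, initial_delay_ms: 100, max_delay_ms: 1600, jitter_ms: 100 },
        "demo_export",
        || {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            async move {
                if n < 2 { Err("transient error") } else { Ok("exported") }
            }
        },
    )
    .await;
    assert_eq!(result, Ok("exported"));
    assert_eq!(attempts.load(Ordering::SeqCst), 3);
}

Keeping the operation as an FnMut closure that returns a fresh future on every call is what lets the helper re-run the request body on each attempt; this is also why the final patch clones the request metadata, extensions, and resource_logs before each retried export call.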