diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index c24d0525da..2b8623cabb 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,8 @@ ## vNext +- *Bug fix*: ObservableGauge returns data points recorded since previous collection, despite temporality. Other asynchronous (observable) instruments with Cumulative temporality behave as synchronous ones and return data points on every collection. [#2213](https://github.com/open-telemetry/opentelemetry-rust/issues/2213) + - *Feature*: Introduced a new feature flag, `experimental_metrics_disable_name_validation`, under the `opentelemetry-sdk`, which allows disabling the Instrument Name Validation. This is useful in scenarios where you need to use *special characters*, *Windows Perf Counter Wildcard Path*, or similar cases. For more details, check [#2543](https://github.com/open-telemetry/opentelemetry-rust/pull/2543). > **WARNING**: While this feature provides flexibility, **be cautious** when using it, as platforms like **Prometheus** impose restrictions on metric names and labels (e.g., no spaces, capital letters, or certain special characters). Using invalid characters may result in compatibility issues or incorrect behavior. Ensure that instrument names comply with the requirements of your target platform to avoid potential problems. diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 3d8a422587..4497ac2fd3 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -146,8 +146,12 @@ impl AggregateBuilder { } /// Builds a last-value aggregate function input and output. - pub(crate) fn last_value(&self) -> AggregateFns { - LastValue::new(self.temporality, self.filter.clone()).into() + pub(crate) fn last_value(&self, overwrite_temporality: Option) -> AggregateFns { + LastValue::new( + overwrite_temporality.unwrap_or(self.temporality), + self.filter.clone(), + ) + .into() } /// Builds a precomputed sum aggregate function input and output. @@ -210,7 +214,7 @@ mod tests { #[test] fn last_value_aggregation() { let AggregateFns { measure, collect } = - AggregateBuilder::::new(Temporality::Cumulative, None).last_value(); + AggregateBuilder::::new(Temporality::Cumulative, None).last_value(None); let mut a = Gauge { data_points: vec![GaugeDataPoint { attributes: vec![KeyValue::new("a", 1)], diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 7712c30b85..7b96c71527 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1394,22 +1394,32 @@ mod tests { } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn asynchronous_instruments_cumulative_with_gap_in_measurements() { + async fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement() { // Run this test with stdout enabled to see output. - // cargo test asynchronous_instruments_cumulative_with_gap_in_measurements --features=testing -- --nocapture + // cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture - asynchronous_instruments_cumulative_with_gap_in_measurements_helper("counter"); - asynchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter"); - asynchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge"); + asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper( + "gauge", true, + ); + // TODO fix: all asynchronous instruments should not emit data points if not measured + // but these implementations are still buggy + asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper( + "counter", false, + ); + asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper( + "updown_counter", + false, + ); } - fn asynchronous_instruments_cumulative_with_gap_in_measurements_helper( + fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper( instrument_name: &'static str, + should_not_emit: bool, ) { let mut test_context = TestContext::new(Temporality::Cumulative); let attributes = Arc::new([KeyValue::new("key1", "value1")]); - // Create instrument and emit measurements + // Create instrument and emit measurements once match instrument_name { "counter" => { let has_run = AtomicBool::new(false); @@ -1466,8 +1476,12 @@ mod tests { test_context.flush_metrics(); - // Test that latest export has the same data as the previous one - assert_correct_export(&mut test_context, instrument_name); + if should_not_emit { + test_context.check_no_metrics(); + } else { + // Test that latest export has the same data as the previous one + assert_correct_export(&mut test_context, instrument_name); + } fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) { match instrument_name { diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 0c117c8deb..d8c9429c51 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -23,7 +23,7 @@ use crate::{ use self::internal::AggregateFns; -use super::Aggregation; +use super::{Aggregation, Temporality}; /// Connects all of the instruments created by a meter provider to a [MetricReader]. /// @@ -488,9 +488,20 @@ fn aggregate_fn( match agg { Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind), Aggregation::Drop => Ok(None), - Aggregation::LastValue => Ok(Some(b.last_value())), + Aggregation::LastValue => { + match kind { + InstrumentKind::Gauge => Ok(Some(b.last_value(None))), + // temporality for LastValue only affects how data points are reported, so we can always use + // delta temporality, because observable instruments should report data points only since previous collection + InstrumentKind::ObservableGauge => Ok(Some(b.last_value(Some(Temporality::Delta)))), + _ => Err(MetricError::Other(format!("LastValue aggregation is only available for Gauge or ObservableGauge, but not for {kind:?}"))) + } + } Aggregation::Sum => { let fns = match kind { + // TODO implement: observable instruments should not report data points on every collect + // from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality, + // MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection InstrumentKind::ObservableCounter => b.precomputed_sum(true), InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false), InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true), @@ -508,6 +519,9 @@ fn aggregate_fn( | InstrumentKind::ObservableUpDownCounter | InstrumentKind::ObservableGauge ); + // TODO implement: observable instruments should not report data points on every collect + // from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality, + // MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection Ok(Some(b.explicit_bucket_histogram( boundaries.to_vec(), *record_min_max,