diff --git a/opentelemetry-prometheus/src/lib.rs b/opentelemetry-prometheus/src/lib.rs index bb637f52b2..b31a8e81aa 100644 --- a/opentelemetry-prometheus/src/lib.rs +++ b/opentelemetry-prometheus/src/lib.rs @@ -373,7 +373,8 @@ impl prometheus::core::Collector for Collector { /// Maps attributes into Prometheus-style label pairs. /// /// It sanitizes invalid characters and handles duplicate keys (due to -/// sanitization) by sorting and concatenating the values following the spec. +/// sanitization or when user provides duplicate keys) by sorting and +/// concatenating the values following the spec. fn get_attrs(kvs: &mut dyn Iterator, extra: &[LabelPair]) -> Vec { let mut keys_map = BTreeMap::>::new(); for (key, value) in kvs { diff --git a/opentelemetry-prometheus/tests/data/sanitized_labels.txt b/opentelemetry-prometheus/tests/data/sanitized_labels.txt index 79ec70e61f..e80185df17 100644 --- a/opentelemetry-prometheus/tests/data/sanitized_labels.txt +++ b/opentelemetry-prometheus/tests/data/sanitized_labels.txt @@ -1,6 +1,6 @@ # HELP foo_total a sanitary counter # TYPE foo_total counter -foo_total{A_B="Q",C_D="Y;Z",otel_scope_name="testmeter",otel_scope_version="v0.1.0"} 24.3 +foo_total{A_B="Q;X",C_D="Y;Z",otel_scope_name="testmeter",otel_scope_version="v0.1.0"} 24.3 # HELP otel_scope_info Instrumentation Scope metadata # TYPE otel_scope_info gauge otel_scope_info{otel_scope_name="testmeter",otel_scope_version="v0.1.0"} 1 diff --git a/opentelemetry-prometheus/tests/integration_test.rs b/opentelemetry-prometheus/tests/integration_test.rs index aff4c43a99..47770c1d85 100644 --- a/opentelemetry-prometheus/tests/integration_test.rs +++ b/opentelemetry-prometheus/tests/integration_test.rs @@ -135,7 +135,13 @@ fn prometheus_exporter_integration() { builder: ExporterBuilder::default().without_units(), record_metrics: Box::new(|meter| { let attrs = vec![ - // exact match, value should be overwritten + // exact match, SDK will not do de-duplication, + // values should be 
concatenated. + // The order X;Q vs Q;X is not guaranteed. + // We expect end-users to not produce duplicates in the + // first place, but if that cannot be done, + // we can offer an opt-in feature in the SDK to do it. + // https://github.com/open-telemetry/opentelemetry-rust/issues/1300 Key::new("A.B").string("X"), Key::new("A.B").string("Q"), // unintended match due to sanitization, values should be concatenated diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 1b5e0b76f9..028d5c8d43 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -24,6 +24,12 @@ - **Breaking** Remove `TextMapCompositePropagator` [#1373](https://github.com/open-telemetry/opentelemetry-rust/pull/1373). Use `TextMapCompositePropagator` in opentelemetry API. - [#1375](https://github.com/open-telemetry/opentelemetry-rust/pull/1375/) Fix metric collections during PeriodicReader shutdown +- **Breaking** + [#1397](https://github.com/open-telemetry/opentelemetry-rust/issues/1397) + Removes de-duplication of Metric attribute keys to achieve performance gains. + Please share [feedback + here](https://github.com/open-telemetry/opentelemetry-rust/issues/1300), if + you are affected. 
## v0.21.1 diff --git a/opentelemetry-sdk/src/attributes/set.rs b/opentelemetry-sdk/src/attributes/set.rs index 06490879a1..97f34ad3d1 100644 --- a/opentelemetry-sdk/src/attributes/set.rs +++ b/opentelemetry-sdk/src/attributes/set.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::{ cmp::Ordering, hash::{Hash, Hasher}, @@ -109,17 +108,10 @@ pub struct AttributeSet(Vec); impl From<&[KeyValue]> for AttributeSet { fn from(values: &[KeyValue]) -> Self { - let mut seen_keys = HashSet::with_capacity(values.len()); let mut vec = values .iter() .rev() - .filter_map(|kv| { - if seen_keys.insert(kv.key.clone()) { - Some(HashKeyValue(kv.clone())) - } else { - None - } - }) + .map(|kv| HashKeyValue(kv.clone())) .collect::>(); vec.sort_unstable(); diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 9eb34d9303..30420e7491 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -157,6 +157,81 @@ mod tests { ); } + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_aggregation_duplicate_attributes() { + // Run this test with stdout enabled to see output. 
+ // cargo test counter_aggregation_duplicate_attributes --features=metrics,testing + + // Arrange + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + let meter = meter_provider.meter("test"); + let counter = meter + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .init(); + counter.add( + 1, + &[ + KeyValue::new("key1", "value1"), + KeyValue::new("key1", "value2"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("key1", "value1"), + KeyValue::new("key1", "value2"), + ], + ); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_counter"); + assert_eq!(metric.unit.as_str(), "my_unit"); + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for Counter instruments by default"); + + // Expecting 1 time-series. + assert_eq!(sum.data_points.len(), 1); + + // find and validate key1=value1,key1=value2 datapoint + let data_point = &sum.data_points[0]; + assert_eq!(data_point.value, 2); + assert_eq!( + data_point.attributes.len(), + 2, + "Should have 2 attributes as sdk is not deduplicating attributes" + ); + let key_value1_found = data_point + .attributes + .iter() + .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1"); + let key_value2_found = data_point + .attributes + .iter() + .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2"); + + assert!( + key_value1_found && key_value2_found, + "Should have found both key1=value1 and key1=value2 attributes as does not dedup" + ); + } + // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! 
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]