Skip to content

Commit c9621ae

Browse files
committed
Separate HashMap for sorted attribs
1 parent a707bb9 commit c9621ae

File tree

1 file changed

+98
-87
lines changed
  • opentelemetry-sdk/src/metrics/internal

1 file changed

+98
-87
lines changed

opentelemetry-sdk/src/metrics/internal/mod.rs

+98-87
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ mod precomputed_sum;
66
mod sum;
77

88
use core::fmt;
9-
use std::collections::{HashMap, HashSet};
10-
use std::mem::take;
9+
use std::collections::hash_map::Entry;
10+
use std::collections::HashMap;
11+
use std::mem::replace;
1112
use std::ops::{Add, AddAssign, DerefMut, Sub};
12-
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
13-
use std::sync::{Arc, RwLock};
13+
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
14+
use std::sync::{Arc, Mutex, RwLock};
1415

1516
use aggregate::is_under_cardinality_limit;
1617
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
@@ -43,6 +44,11 @@ pub(crate) trait Aggregator {
4344
fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
4445
}
4546

47+
struct NoAttribs<A> {
48+
tracker: A,
49+
is_set: AtomicBool,
50+
}
51+
4652
/// The storage for sums.
4753
///
4854
/// This structure is parametrized by an `Operation` that indicates how
@@ -51,14 +57,13 @@ pub(crate) struct ValueMap<A>
5157
where
5258
A: Aggregator,
5359
{
54-
/// Trackers store the values associated with different attribute sets.
55-
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
56-
/// Number of different attribute set stored in the `trackers` map.
57-
count: AtomicUsize,
58-
/// Indicates whether a value with no attributes has been stored.
59-
has_no_attribute_value: AtomicBool,
60-
/// Tracker for values with no attributes attached.
61-
no_attribute_tracker: A,
60+
// dedicated tracker for the no-attributes case, kept outside the maps for performance
61+
no_attribs: NoAttribs<A>,
62+
// for performance reasons, looks up attributes in the caller-provided (unsorted) order first
63+
all_attribs: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
64+
// a different order of attribute keys should still map to the same tracker instance
65+
// this helps to achieve that and also enables implementing collection efficiently
66+
sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
6267
/// Configuration for an Aggregator
6368
config: A::InitConfig,
6469
}
@@ -69,70 +74,68 @@ where
6974
{
7075
fn new(config: A::InitConfig) -> Self {
7176
ValueMap {
72-
trackers: RwLock::new(HashMap::new()),
73-
has_no_attribute_value: AtomicBool::new(false),
74-
no_attribute_tracker: A::create(&config),
75-
count: AtomicUsize::new(0),
77+
no_attribs: NoAttribs {
78+
tracker: A::create(&config),
79+
is_set: AtomicBool::new(false),
80+
},
81+
all_attribs: RwLock::new(HashMap::new()),
82+
sorted_attribs: Mutex::new(HashMap::new()),
7683
config,
7784
}
7885
}
7986

8087
fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
8188
if attributes.is_empty() {
82-
self.no_attribute_tracker.update(value);
83-
self.has_no_attribute_value.store(true, Ordering::Release);
89+
self.no_attribs.tracker.update(value);
90+
self.no_attribs.is_set.store(true, Ordering::Release);
8491
return;
8592
}
8693

87-
let Ok(trackers) = self.trackers.read() else {
88-
return;
89-
};
90-
9194
// Try to retrieve and update the tracker with the attributes in the provided order first
92-
if let Some(tracker) = trackers.get(attributes) {
93-
tracker.update(value);
94-
return;
95-
}
95+
match self.all_attribs.read() {
96+
Ok(trackers) => {
97+
if let Some(tracker) = trackers.get(attributes) {
98+
tracker.update(value);
99+
return;
100+
}
101+
}
102+
Err(_) => return,
103+
};
96104

97-
// Try to retrieve and update the tracker with the attributes sorted.
105+
// Get or create a tracker
98106
let sorted_attrs = AttributeSet::from(attributes).into_vec();
99-
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
100-
tracker.update(value);
107+
let Ok(mut sorted_trackers) = self.sorted_attribs.lock() else {
101108
return;
102-
}
109+
};
110+
111+
let sorted_count = sorted_trackers.len();
112+
let new_tracker = match sorted_trackers.entry(sorted_attrs) {
113+
Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
114+
Entry::Vacant(vacant_entry) => {
115+
if !is_under_cardinality_limit(sorted_count) {
116+
sorted_trackers.entry(STREAM_OVERFLOW_ATTRIBUTES.clone())
117+
.or_insert_with(|| {
118+
otel_warn!( name: "ValueMap.measure",
119+
message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
120+
);
121+
Arc::new(A::create(&self.config))
122+
})
123+
.update(value);
124+
return;
125+
}
126+
let new_tracker = Arc::new(A::create(&self.config));
127+
vacant_entry.insert(new_tracker).clone()
128+
}
129+
};
130+
drop(sorted_trackers);
103131

104-
// Give up the read lock before acquiring the write lock.
105-
drop(trackers);
132+
new_tracker.update(value);
106133

107-
let Ok(mut trackers) = self.trackers.write() else {
134+
// Insert the new tracker so we can find it next time
135+
let Ok(mut all_trackers) = self.all_attribs.write() else {
108136
return;
109137
};
110-
111-
// Recheck both the provided and sorted orders after acquiring the write lock
112-
// in case another thread has pushed an update in the meantime.
113-
if let Some(tracker) = trackers.get(attributes) {
114-
tracker.update(value);
115-
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
116-
tracker.update(value);
117-
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
118-
let new_tracker = Arc::new(A::create(&self.config));
119-
new_tracker.update(value);
120-
121-
// Insert tracker with the attributes in the provided and sorted orders
122-
trackers.insert(attributes.to_vec(), new_tracker.clone());
123-
trackers.insert(sorted_attrs, new_tracker);
124-
125-
self.count.fetch_add(1, Ordering::SeqCst);
126-
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
127-
overflow_value.update(value);
128-
} else {
129-
let new_tracker = A::create(&self.config);
130-
new_tracker.update(value);
131-
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
132-
otel_warn!( name: "ValueMap.measure",
133-
message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
134-
);
135-
}
138+
all_trackers.insert(attributes.to_vec(), new_tracker);
136139
}
137140

138141
/// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
@@ -141,20 +144,23 @@ where
141144
where
142145
MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
143146
{
144-
prepare_data(dest, self.count.load(Ordering::SeqCst));
145-
if self.has_no_attribute_value.load(Ordering::Acquire) {
146-
dest.push(map_fn(vec![], &self.no_attribute_tracker));
147-
}
148-
149-
let Ok(trackers) = self.trackers.read() else {
150-
return;
147+
let trackers = match self.sorted_attribs.lock() {
148+
Ok(trackers) => {
149+
// it's important to release the lock as fast as possible,
150+
// so we don't block insertion of new attribute sets
151+
trackers.clone()
152+
}
153+
Err(_) => return,
151154
};
152155

153-
let mut seen = HashSet::new();
154-
for (attrs, tracker) in trackers.iter() {
155-
if seen.insert(Arc::as_ptr(tracker)) {
156-
dest.push(map_fn(attrs.clone(), tracker));
157-
}
156+
prepare_data(dest, trackers.len());
157+
158+
if self.no_attribs.is_set.load(Ordering::Acquire) {
159+
dest.push(map_fn(vec![], &self.no_attribs.tracker));
160+
}
161+
162+
for (attrs, tracker) in trackers.into_iter() {
163+
dest.push(map_fn(attrs, &tracker));
158164
}
159165
}
160166

@@ -164,35 +170,40 @@ where
164170
where
165171
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
166172
{
167-
prepare_data(dest, self.count.load(Ordering::SeqCst));
168-
if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
173+
// reset sorted trackers so new attribute sets will be written into a new hashmap
174+
let trackers = match self.sorted_attribs.lock() {
175+
Ok(mut trackers) => {
176+
let new = HashMap::with_capacity(trackers.len());
177+
replace(trackers.deref_mut(), new)
178+
}
179+
Err(_) => return,
180+
};
181+
// reset all trackers, so all attribute sets will start using the new hashmap
182+
match self.all_attribs.write() {
183+
Ok(mut all_trackers) => all_trackers.clear(),
184+
Err(_) => return,
185+
};
186+
187+
prepare_data(dest, trackers.len());
188+
189+
if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
169190
dest.push(map_fn(
170191
vec![],
171-
self.no_attribute_tracker.clone_and_reset(&self.config),
192+
self.no_attribs.tracker.clone_and_reset(&self.config),
172193
));
173194
}
174195

175-
let trackers = match self.trackers.write() {
176-
Ok(mut trackers) => {
177-
self.count.store(0, Ordering::SeqCst);
178-
take(trackers.deref_mut())
179-
}
180-
Err(_) => todo!(),
181-
};
182-
183-
let mut seen = HashSet::new();
184196
for (attrs, tracker) in trackers.into_iter() {
185-
if seen.insert(Arc::as_ptr(&tracker)) {
186-
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
187-
}
197+
let tracker = Arc::into_inner(tracker).expect("the only instance");
198+
dest.push(map_fn(attrs, tracker));
188199
}
189200
}
190201
}
191202

192203
/// Clear and allocate exactly required amount of space for all attribute-sets
193204
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
194205
data.clear();
195-
let total_len = list_len + 2; // to account for no_attributes case + overflow state
206+
let total_len = list_len + 1; // to account for no_attributes case
196207
if total_len > data.capacity() {
197208
data.reserve_exact(total_len - data.capacity());
198209
}

0 commit comments

Comments
 (0)