Skip to content

Commit

Permalink
Add separate buffer used in collect_and_reset
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Nov 17, 2024
1 parent c9621ae commit 6d97487
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod sum;
use core::fmt;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::replace;
use std::mem::swap;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
Expand Down Expand Up @@ -66,6 +66,8 @@ where
sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
/// Configuration for an Aggregator
config: A::InitConfig,
/// Swap with `sorted_attribs` on every `collect_and_reset`.
for_collect_after_reset: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
}

impl<A> ValueMap<A>
Expand All @@ -78,9 +80,10 @@ where
tracker: A::create(&config),
is_set: AtomicBool::new(false),
},
all_attribs: RwLock::new(HashMap::new()),
sorted_attribs: Mutex::new(HashMap::new()),
all_attribs: RwLock::new(Default::default()),
sorted_attribs: Mutex::new(Default::default()),
config,
for_collect_after_reset: Mutex::new(Default::default()),
}
}

Expand Down Expand Up @@ -170,11 +173,14 @@ where
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
let mut to_collect = self
.for_collect_after_reset
.lock()
.unwrap_or_else(|err| err.into_inner());
// reset sorted trackers so new attributes set will be written into new hashmap
let trackers = match self.sorted_attribs.lock() {
Ok(mut trackers) => {
let new = HashMap::with_capacity(trackers.len());
replace(trackers.deref_mut(), new)
match self.sorted_attribs.lock() {
Ok(mut trackers) => {
swap(trackers.deref_mut(), to_collect.deref_mut());
}
Err(_) => return,
};
Expand All @@ -184,7 +190,7 @@ where
Err(_) => return,
};

prepare_data(dest, trackers.len());
prepare_data(dest, to_collect.len());

if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
dest.push(map_fn(
Expand All @@ -193,7 +199,7 @@ where
));
}

for (attrs, tracker) in trackers.into_iter() {
for (attrs, tracker) in to_collect.drain() {
let tracker = Arc::into_inner(tracker).expect("the only instance");
dest.push(map_fn(attrs, tracker));
}
Expand Down

0 comments on commit 6d97487

Please sign in to comment.