Skip to content

Commit 92e6625

Browse files
mateiidavidtobz
authored andcommitted
Introduce support for thread-local default recorder (#523)
Currently, the only way to install a thread-local recorder is to use the provided function `with_local_recorder` function. In asynchronous contexts, especially where single-threaded runtimes are used, using locally scoped recorders can be cumbersome since `with_local_recorder` does not support async execution (unless the closure itself spawns a runtime). In order to provide an API that overcomes this limitation, for users that want to make use of locally-scoped default recorders, we make it so that the guard can be returned through a newly introduced `set_local_default` function that takes in a trait object. The returned guard is the same type we are using for `with_local_recorder` and when dropped will reset the thread local variable. The change does not introduce any breaking changes to the existing API. It does however make minimal changes to the internals in order to uphold the safety guarantees that were previously in place. Since the guard is no longer tied to a scope that also encloses the reference, we need to have an explicit lifetime on the guard to guarantee the recorder is not dropped before the guard is. We achieve this through a `PhantomData` type which shouldn't result in any runtime overhead. Closes #502 Signed-off-by: Matei <[email protected]>
1 parent a143ef6 commit 92e6625

File tree

1 file changed

+224
-34
lines changed

1 file changed

+224
-34
lines changed

metrics/src/recorder/mod.rs

Lines changed: 224 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{cell::Cell, ptr::NonNull};
1+
use std::{cell::Cell, marker::PhantomData, ptr::NonNull};
22

33
mod cell;
44
use self::cell::RecorderOnceCell;
@@ -64,30 +64,39 @@ pub trait Recorder {
6464
/// (thread-local storage) so that it can be accessed by the macros. This guard ensures that the
6565
/// pointer we store to the reference is cleared when the guard is dropped, so that it can't be used
6666
/// after the closure has finished, even if the closure panics and unwinds the stack.
67-
struct LocalRecorderGuard;
67+
///
68+
/// ## Note
69+
///
70+
/// The guard has a lifetime parameter `'a` that is bounded using a
71+
/// `PhantomData` type. This upholds the guard's contravariance, it must live
72+
/// _at most as long_ as the recorder it takes a reference to. The bounded
73+
/// lifetime prevents accidental use-after-free errors when using a guard
74+
/// directly through [`crate::set_default_local_recorder`].
75+
pub struct LocalRecorderGuard<'a> {
76+
prev_recorder: Option<NonNull<dyn Recorder>>,
77+
phantom: PhantomData<&'a dyn Recorder>,
78+
}
6879

69-
impl LocalRecorderGuard {
80+
impl<'a> LocalRecorderGuard<'a> {
7081
/// Creates a new `LocalRecorderGuard` and sets the thread-local recorder.
71-
fn new(recorder: &dyn Recorder) -> Self {
82+
fn new(recorder: &'a dyn Recorder) -> Self {
7283
// SAFETY: While we take a lifetime-less pointer to the given reference, the reference we
73-
// derive _from_ the pointer is never given a lifetime that exceeds the lifetime of the
74-
// input reference.
84+
// derive _from_ the pointer is given the same lifetime of the reference
85+
// used to construct the guard -- captured in the guard type itself --
86+
// and so derived references never outlive the source reference.
7587
let recorder_ptr = unsafe { NonNull::new_unchecked(recorder as *const _ as *mut _) };
7688

77-
LOCAL_RECORDER.with(|local_recorder| {
78-
local_recorder.set(Some(recorder_ptr));
79-
});
89+
let prev_recorder =
90+
LOCAL_RECORDER.with(|local_recorder| local_recorder.replace(Some(recorder_ptr)));
8091

81-
Self
92+
Self { prev_recorder, phantom: PhantomData }
8293
}
8394
}
8495

85-
impl Drop for LocalRecorderGuard {
96+
impl<'a> Drop for LocalRecorderGuard<'a> {
8697
fn drop(&mut self) {
8798
// Clear the thread-local recorder.
88-
LOCAL_RECORDER.with(|local_recorder| {
89-
local_recorder.set(None);
90-
});
99+
LOCAL_RECORDER.with(|local_recorder| local_recorder.replace(self.prev_recorder.take()));
91100
}
92101
}
93102

@@ -109,6 +118,32 @@ where
109118
GLOBAL_RECORDER.set(recorder)
110119
}
111120

121+
/// Sets the recorder as the default for the current thread for the duration of
122+
/// the lifetime of the returned [`LocalRecorderGuard`].
123+
///
124+
/// This function is suitable for capturing metrics in asynchronous code, in particular
125+
/// when using a single-threaded runtime. Any metrics registered prior to the returned
126+
/// guard will remain attached to the recorder that was present at the time of registration,
127+
/// and so this cannot be used to intercept existing metrics.
128+
///
129+
/// Additionally, local recorders can be used in a nested fashion. When setting a new
130+
/// default local recorder, the previous default local recorder will be captured if one
131+
/// was set, and will be restored when the returned guard drops.
132+
/// the lifetime of the returned [`LocalRecorderGuard`].
133+
///
134+
/// Any metrics recorded before a guard is returned will be completely ignored.
135+
/// Metrics implementations should provide an initialization method that
136+
/// installs the recorder internally.
137+
///
138+
/// The function is suitable for capturing metrics in asynchronous code that
139+
/// uses a single threaded runtime.
140+
///
141+
/// If a global recorder is set, it will be restored once the guard is dropped.
142+
#[must_use]
143+
pub fn set_default_local_recorder(recorder: &dyn Recorder) -> LocalRecorderGuard {
144+
LocalRecorderGuard::new(recorder)
145+
}
146+
112147
/// Runs the closure with the given recorder set as the global recorder for the duration.
113148
pub fn with_local_recorder<T>(recorder: &dyn Recorder, f: impl FnOnce() -> T) -> T {
114149
let _local = LocalRecorderGuard::new(recorder);
@@ -142,19 +177,116 @@ pub fn with_recorder<T>(f: impl FnOnce(&dyn Recorder) -> T) -> T {
142177

143178
#[cfg(test)]
144179
mod tests {
145-
use std::sync::{
146-
atomic::{AtomicBool, Ordering},
147-
Arc,
148-
};
180+
use std::sync::{atomic::Ordering, Arc};
181+
182+
use crate::{with_local_recorder, NoopRecorder};
149183

150184
use super::{Recorder, RecorderOnceCell};
151185

152186
#[test]
153187
fn boxed_recorder_dropped_on_existing_set() {
154188
// This test simply ensures that if a boxed recorder is handed to us to install, and another
155-
// recorder has already been installed, that we drop th new boxed recorder instead of
189+
// recorder has already been installed, that we drop the new boxed recorder instead of
156190
// leaking it.
157-
struct TrackOnDropRecorder(Arc<AtomicBool>);
191+
let recorder_cell = RecorderOnceCell::new();
192+
193+
// This is the first set of the cell, so it should always succeed.
194+
let (first_recorder, _) = test_recorders::TrackOnDropRecorder::new();
195+
let first_set_result = recorder_cell.set(first_recorder);
196+
assert!(first_set_result.is_ok());
197+
198+
// Since the cell is already set, this second set should fail. We'll also then assert that
199+
// our atomic boolean is set to `true`, indicating the drop logic ran for it.
200+
let (second_recorder, was_dropped) = test_recorders::TrackOnDropRecorder::new();
201+
assert!(!was_dropped.load(Ordering::SeqCst));
202+
203+
let second_set_result = recorder_cell.set(second_recorder);
204+
assert!(second_set_result.is_err());
205+
assert!(!was_dropped.load(Ordering::SeqCst));
206+
drop(second_set_result);
207+
assert!(was_dropped.load(Ordering::SeqCst));
208+
}
209+
210+
#[test]
211+
fn thread_scoped_recorder_guards() {
212+
// This test ensures that when a recorder is installed through
213+
// `crate::set_default_local_recorder` it will only be valid in the scope of the
214+
// thread.
215+
//
216+
// The goal of the test is to give confidence that no invalid memory
217+
// access errors are present when operating with locally scoped
218+
// recorders.
219+
let t1_recorder = test_recorders::SimpleCounterRecorder::new();
220+
let t2_recorder = test_recorders::SimpleCounterRecorder::new();
221+
let t3_recorder = test_recorders::SimpleCounterRecorder::new();
222+
// Start a new thread scope to take references to each recorder in the
223+
// closures passed to the thread.
224+
std::thread::scope(|s| {
225+
s.spawn(|| {
226+
let _guard = crate::set_default_local_recorder(&t1_recorder);
227+
crate::counter!("t1_counter").increment(1);
228+
});
229+
230+
s.spawn(|| {
231+
with_local_recorder(&t2_recorder, || {
232+
crate::counter!("t2_counter").increment(2);
233+
})
234+
});
235+
236+
s.spawn(|| {
237+
let _guard = crate::set_default_local_recorder(&t3_recorder);
238+
crate::counter!("t3_counter").increment(3);
239+
});
240+
});
241+
242+
assert!(t1_recorder.get_value() == 1);
243+
assert!(t2_recorder.get_value() == 2);
244+
assert!(t3_recorder.get_value() == 3);
245+
}
246+
247+
#[test]
248+
fn local_recorder_restored_when_dropped() {
249+
// This test ensures that any previously installed local recorders are
250+
// restored when the subsequently installed recorder's guard is dropped.
251+
let root_recorder = test_recorders::SimpleCounterRecorder::new();
252+
// Install the root recorder and increment the counter once.
253+
let _guard = crate::set_default_local_recorder(&root_recorder);
254+
crate::counter!("test_counter").increment(1);
255+
256+
// Install a second recorder and increment its counter once.
257+
let next_recorder = test_recorders::SimpleCounterRecorder::new();
258+
let next_guard = crate::set_default_local_recorder(&next_recorder);
259+
crate::counter!("test_counter").increment(1);
260+
let final_recorder = test_recorders::SimpleCounterRecorder::new();
261+
crate::with_local_recorder(&final_recorder, || {
262+
// Final recorder increments the counter by 10. At the end of the
263+
// closure, the guard should be dropped, and `next_recorder`
264+
// restored.
265+
crate::counter!("test_counter").increment(10);
266+
});
267+
// Since `next_recorder` is restored, we can increment it once and check
268+
// that the value is 2 (+1 before and after the closure).
269+
crate::counter!("test_counter").increment(1);
270+
assert!(next_recorder.get_value() == 2);
271+
drop(next_guard);
272+
273+
// At the end, increment the counter again by an arbitrary value. Since
274+
// `next_guard` is dropped, the root recorder is restored.
275+
crate::counter!("test_counter").increment(20);
276+
assert!(root_recorder.get_value() == 21);
277+
}
278+
279+
mod test_recorders {
280+
use std::sync::{
281+
atomic::{AtomicBool, AtomicU64, Ordering},
282+
Arc,
283+
};
284+
285+
use crate::Recorder;
286+
287+
#[derive(Debug)]
288+
// Tracks how many times the recorder was dropped
289+
pub struct TrackOnDropRecorder(Arc<AtomicBool>);
158290

159291
impl TrackOnDropRecorder {
160292
pub fn new() -> (Self, Arc<AtomicBool>) {
@@ -163,6 +295,8 @@ mod tests {
163295
}
164296
}
165297

298+
// === impl TrackOnDropRecorder ===
299+
166300
impl Recorder for TrackOnDropRecorder {
167301
fn describe_counter(
168302
&self,
@@ -209,22 +343,78 @@ mod tests {
209343
}
210344
}
211345

212-
let recorder_cell = RecorderOnceCell::new();
346+
// A simple recorder that only implements `register_counter`.
347+
#[derive(Debug)]
348+
pub struct SimpleCounterRecorder {
349+
state: Arc<AtomicU64>,
350+
}
213351

214-
// This is the first set of the cell, so it should always succeed.
215-
let (first_recorder, _) = TrackOnDropRecorder::new();
216-
let first_set_result = recorder_cell.set(first_recorder);
217-
assert!(first_set_result.is_ok());
352+
impl SimpleCounterRecorder {
353+
pub fn new() -> Self {
354+
Self { state: Arc::new(AtomicU64::default()) }
355+
}
218356

219-
// Since the cell is already set, this second set should fail. We'll also then assert that
220-
// our atomic boolean is set to `true`, indicating the drop logic ran for it.
221-
let (second_recorder, was_dropped) = TrackOnDropRecorder::new();
222-
assert!(!was_dropped.load(Ordering::SeqCst));
357+
pub fn get_value(&self) -> u64 {
358+
self.state.load(Ordering::Acquire)
359+
}
360+
}
223361

224-
let second_set_result = recorder_cell.set(second_recorder);
225-
assert!(second_set_result.is_err());
226-
assert!(!was_dropped.load(Ordering::SeqCst));
227-
drop(second_set_result);
228-
assert!(was_dropped.load(Ordering::SeqCst));
362+
struct SimpleCounterHandle {
363+
state: Arc<AtomicU64>,
364+
}
365+
366+
impl crate::CounterFn for SimpleCounterHandle {
367+
fn increment(&self, value: u64) {
368+
self.state.fetch_add(value, Ordering::Acquire);
369+
}
370+
371+
fn absolute(&self, _value: u64) {
372+
unimplemented!()
373+
}
374+
}
375+
376+
// === impl SimpleCounterRecorder ===
377+
378+
impl Recorder for SimpleCounterRecorder {
379+
fn describe_counter(
380+
&self,
381+
_: crate::KeyName,
382+
_: Option<crate::Unit>,
383+
_: crate::SharedString,
384+
) {
385+
}
386+
fn describe_gauge(
387+
&self,
388+
_: crate::KeyName,
389+
_: Option<crate::Unit>,
390+
_: crate::SharedString,
391+
) {
392+
}
393+
fn describe_histogram(
394+
&self,
395+
_: crate::KeyName,
396+
_: Option<crate::Unit>,
397+
_: crate::SharedString,
398+
) {
399+
}
400+
401+
fn register_counter(&self, _: &crate::Key, _: &crate::Metadata<'_>) -> crate::Counter {
402+
crate::Counter::from_arc(Arc::new(SimpleCounterHandle {
403+
state: self.state.clone(),
404+
}))
405+
}
406+
407+
fn register_gauge(&self, _: &crate::Key, _: &crate::Metadata<'_>) -> crate::Gauge {
408+
crate::Gauge::noop()
409+
}
410+
411+
fn register_histogram(
412+
&self,
413+
_: &crate::Key,
414+
_: &crate::Metadata<'_>,
415+
) -> crate::Histogram {
416+
crate::Histogram::noop()
417+
}
418+
}
229419
}
230420
}

0 commit comments

Comments
 (0)