Skip to content

Commit 5d76e0c

Browse files
authored
feat: future_metrics (#19)
1 parent ec50345 commit 5d76e0c

5 files changed

Lines changed: 227 additions & 1 deletion

File tree

.github/workflows/ci.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ jobs:
8787
uses: reviewdog/action-misspell@v1
8888
with:
8989
github_token: ${{ secrets.github_token }}
90-
locale: "US"
9190

9291
cocogitto:
9392
name: cocogitto

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ geoblock = ["geoip/middleware"]
3333
geoip = ["dep:geoip"]
3434
http = []
3535
metrics = ["dep:metrics", "future/metrics", "alloc/metrics", "http/metrics"]
36+
future_metrics = ["dep:future_metrics"]
3637
profiler = ["alloc/profiler"]
3738
rate_limit = ["dep:rate_limit"]
3839

@@ -47,6 +48,7 @@ future = { path = "./crates/future", optional = true }
4748
geoip = { path = "./crates/geoip", optional = true }
4849
http = { path = "./crates/http", optional = true }
4950
metrics = { path = "./crates/metrics", optional = true }
51+
future_metrics = { path = "./crates/future_metrics", optional = true }
5052
rate_limit = { path = "./crates/rate_limit", optional = true }
5153

5254
[dev-dependencies]

crates/future_metrics/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "future_metrics"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
pin-project = "1"
8+
metrics = "0.23"

crates/future_metrics/src/lib.rs

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
use {
2+
metrics::{Counter, Gauge, Histogram, Key, Label, Level, Metadata},
3+
std::{
4+
future::Future,
5+
pin::Pin,
6+
task::{Context, Poll},
7+
time::{Duration, Instant},
8+
},
9+
};
10+
11+
/// Target specified in [`metrics::Metadata`] for all metrics produced by this
12+
/// crate.
13+
pub const METADATA_TARGET: &str = "future_metrics";
14+
15+
/// Metric names used by this crate.
16+
pub mod metric_name {
17+
pub const FUTURE_DURATION: &str = "future_duration";
18+
pub const FUTURE_CANCELLED_DURATION: &str = "future_cancelled_duration";
19+
20+
pub const FUTURES_CREATED: &str = "futures_created_count";
21+
pub const FUTURES_STARTED: &str = "futures_started_count";
22+
pub const FUTURES_FINISHED: &str = "futures_finished_count";
23+
pub const FUTURES_CANCELLED: &str = "futures_cancelled_count";
24+
25+
pub const FUTURE_POLL_DURATION: &str = "future_poll_duration";
26+
pub const FUTURE_POLL_DURATION_MAX: &str = "future_poll_duration_max";
27+
pub const FUTURE_POLLS: &str = "future_polls_count";
28+
}
29+
30+
/// Creates a new label identifying a future by its name.
31+
pub const fn future_name(s: &'static str) -> Label {
32+
Label::from_static_parts("future_name", s)
33+
}
34+
35+
pub trait FutureExt: Sized {
36+
/// Consumes the future, returning a new future that records the executiion
37+
/// metrics of the inner future.
38+
///
39+
/// It is expected that you provide at least one label identifying the
40+
/// future being metered.
41+
/// Consider using [`future_name`] label, or the [`FutureExt::with_metrics`]
42+
/// shortcut.
43+
fn with_labeled_metrics(self, labels: &'static [Label]) -> Metered<Self> {
44+
Metered::new(self, labels)
45+
}
46+
47+
/// A shortcut for [`FutureExt::with_labeled_metrics`] using a single label
48+
/// only (presumably [`future_name`]).
49+
fn with_metrics(self, label: &'static Label) -> Metered<Self> {
50+
self.with_labeled_metrics(std::slice::from_ref(label))
51+
}
52+
}
53+
54+
impl<F> FutureExt for F where F: Future {}
55+
56+
#[pin_project::pin_project]
57+
#[must_use = "futures do nothing unless you `.await` or poll them"]
58+
pub struct Metered<F> {
59+
#[pin]
60+
future: F,
61+
state: State,
62+
}
63+
64+
struct State {
65+
started_at: Option<Instant>,
66+
is_finished: bool,
67+
68+
poll_duration_sum: Duration,
69+
poll_duration_max: Duration,
70+
polls_count: usize,
71+
72+
metrics: Metrics,
73+
}
74+
75+
impl<F> Metered<F> {
76+
fn new(future: F, metric_labels: &'static [Label]) -> Self {
77+
let metrics = Metrics::new(metric_labels);
78+
79+
metrics.created.increment(1);
80+
81+
Self {
82+
future,
83+
state: State {
84+
started_at: None,
85+
is_finished: false,
86+
poll_duration_sum: Duration::from_secs(0),
87+
poll_duration_max: Duration::from_secs(0),
88+
polls_count: 0,
89+
metrics,
90+
},
91+
}
92+
}
93+
}
94+
95+
impl<F: Future> Future for Metered<F> {
96+
type Output = F::Output;
97+
98+
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
99+
let mut this = self.project();
100+
let state = &mut this.state;
101+
102+
if state.started_at.is_none() {
103+
state.started_at = Some(Instant::now());
104+
state.metrics.started.increment(1);
105+
}
106+
107+
let poll_started_at = Instant::now();
108+
let result = this.future.poll(cx);
109+
let poll_duration = poll_started_at.elapsed();
110+
111+
state.poll_duration_sum += poll_duration;
112+
state.poll_duration_max = state.poll_duration_max.max(poll_duration);
113+
state.polls_count += 1;
114+
115+
if result.is_ready() && !state.is_finished {
116+
state.is_finished = true;
117+
118+
state.metrics.finished.increment(1);
119+
120+
if let Some(started_at) = state.started_at {
121+
state.metrics.duration.record(started_at.elapsed())
122+
}
123+
}
124+
125+
result
126+
}
127+
}
128+
129+
impl Drop for State {
130+
fn drop(&mut self) {
131+
if !self.is_finished {
132+
self.metrics.cancelled.increment(1);
133+
134+
if let Some(started_at) = self.started_at {
135+
self.metrics.cancelled_duration.record(started_at.elapsed())
136+
}
137+
}
138+
139+
self.metrics
140+
.poll_duration
141+
.record(duration_as_millis_f64(self.poll_duration_sum));
142+
143+
self.metrics
144+
.poll_duration_max
145+
.set(duration_as_millis_f64(self.poll_duration_max));
146+
147+
self.metrics.polls.increment(self.polls_count as u64);
148+
}
149+
}
150+
151+
struct Metrics {
152+
duration: Histogram,
153+
cancelled_duration: Histogram,
154+
155+
created: Counter,
156+
started: Counter,
157+
finished: Counter,
158+
cancelled: Counter,
159+
160+
poll_duration: Histogram,
161+
poll_duration_max: Gauge,
162+
polls: Counter,
163+
}
164+
165+
impl Metrics {
166+
fn new(labels: &'static [Label]) -> Self {
167+
metrics::with_recorder(|r| {
168+
let metadata = Metadata::new(METADATA_TARGET, Level::INFO, None);
169+
170+
Self {
171+
duration: r.register_histogram(
172+
&Key::from_static_parts(metric_name::FUTURE_DURATION, labels),
173+
&metadata,
174+
),
175+
cancelled_duration: r.register_histogram(
176+
&Key::from_static_parts(metric_name::FUTURE_CANCELLED_DURATION, labels),
177+
&metadata,
178+
),
179+
created: r.register_counter(
180+
&Key::from_static_parts(metric_name::FUTURES_CREATED, labels),
181+
&metadata,
182+
),
183+
started: r.register_counter(
184+
&Key::from_static_parts(metric_name::FUTURES_STARTED, labels),
185+
&metadata,
186+
),
187+
finished: r.register_counter(
188+
&Key::from_static_parts(metric_name::FUTURES_FINISHED, labels),
189+
&metadata,
190+
),
191+
cancelled: r.register_counter(
192+
&Key::from_static_parts(metric_name::FUTURES_CANCELLED, labels),
193+
&metadata,
194+
),
195+
poll_duration: r.register_histogram(
196+
&Key::from_static_parts(metric_name::FUTURE_POLL_DURATION, labels),
197+
&metadata,
198+
),
199+
poll_duration_max: r.register_gauge(
200+
&Key::from_static_parts(metric_name::FUTURE_POLL_DURATION_MAX, labels),
201+
&metadata,
202+
),
203+
polls: r.register_counter(
204+
&Key::from_static_parts(metric_name::FUTURE_POLLS, labels),
205+
&metadata,
206+
),
207+
}
208+
})
209+
}
210+
}
211+
212+
#[inline]
213+
pub fn duration_as_millis_f64(val: Duration) -> f64 {
214+
val.as_secs_f64() * 1000.0
215+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ pub use analytics;
77
pub use collections;
88
#[cfg(feature = "future")]
99
pub use future;
10+
#[cfg(feature = "future_metrics")]
11+
pub use future_metrics;
1012
#[cfg(feature = "geoip")]
1113
pub use geoip;
1214
#[cfg(feature = "http")]

0 commit comments

Comments
 (0)