Skip to content

Commit c45cdcd

Browse files
committed
Fix #342: Async support for push_metrics
Signed-off-by: Adam Chalmers <[email protected]>
1 parent 8e2d9e8 commit c45cdcd

File tree

2 files changed

+127
-16
lines changed

2 files changed

+127
-16
lines changed

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ pub use self::histogram::{Histogram, HistogramOpts, HistogramTimer, HistogramVec
223223
pub use self::metrics::Opts;
224224
#[cfg(feature = "push")]
225225
pub use self::push::{
226-
hostname_grouping_key, push_add_collector, push_add_metrics, push_collector, push_metrics,
226+
hostname_grouping_key, push_add_collector, push_add_collector_async, push_add_metrics,
227+
push_add_metrics_async, push_collector, push_collector_async, push_metrics, push_metrics_async,
227228
BasicAuthentication,
228229
};
229230
pub use self::registry::Registry;

src/push.rs

Lines changed: 125 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,6 @@ use crate::registry::Registry;
1818

1919
const REQWEST_TIMEOUT_SEC: Duration = Duration::from_secs(10);
2020

21-
lazy_static! {
22-
static ref HTTP_CLIENT: Client = Client::builder()
23-
.timeout(REQWEST_TIMEOUT_SEC)
24-
.build()
25-
.unwrap();
26-
}
27-
2821
/// `BasicAuthentication` holder for supporting `push` to Pushgateway endpoints
2922
/// using Basic access authentication.
3023
/// Can be passed to any `push_metrics` method.
@@ -60,6 +53,18 @@ pub fn push_metrics<S: BuildHasher>(
6053
push(job, grouping, url, mfs, "PUT", basic_auth)
6154
}
6255

56+
/// Functions just like `push_metrics`, except the metrics are pushed
57+
/// asynchronously.
58+
pub async fn push_metrics_async<S: BuildHasher>(
59+
job: &str,
60+
grouping: HashMap<String, String, S>,
61+
url: &str,
62+
mfs: Vec<proto::MetricFamily>,
63+
basic_auth: Option<BasicAuthentication>,
64+
) -> Result<()> {
65+
push_async(job, grouping, url, mfs, "PUT", basic_auth).await
66+
}
67+
6368
/// `push_add_metrics` works like `push_metrics`, but only previously pushed
6469
/// metrics with the same name (and the same job and other grouping labels) will
6570
/// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
@@ -73,16 +78,25 @@ pub fn push_add_metrics<S: BuildHasher>(
7378
push(job, grouping, url, mfs, "POST", basic_auth)
7479
}
7580

81+
/// `push_add_metrics_async` works like `push_metrics`, but async.
82+
pub async fn push_add_metrics_async<'a, S: BuildHasher>(
83+
job: &'a str,
84+
grouping: HashMap<String, String, S>,
85+
url: &'a str,
86+
mfs: Vec<proto::MetricFamily>,
87+
basic_auth: Option<BasicAuthentication>,
88+
) -> Result<()> {
89+
push(job, grouping, url, mfs, "POST", basic_auth)
90+
}
91+
7692
const LABEL_NAME_JOB: &str = "job";
7793

78-
fn push<S: BuildHasher>(
94+
fn configure_push<S: BuildHasher>(
7995
job: &str,
8096
grouping: HashMap<String, String, S>,
8197
url: &str,
8298
mfs: Vec<proto::MetricFamily>,
83-
method: &str,
84-
basic_auth: Option<BasicAuthentication>,
85-
) -> Result<()> {
99+
) -> Result<(String, impl Encoder, Vec<u8>)> {
86100
// Suppress clippy warning needless_pass_by_value.
87101
let grouping = grouping;
88102

@@ -145,7 +159,24 @@ fn push<S: BuildHasher>(
145159
// Ignore error, `no metrics` and `no name`.
146160
let _ = encoder.encode(&[mf], &mut buf);
147161
}
162+
Ok((push_url, encoder, buf))
163+
}
148164

165+
fn push<S: BuildHasher>(
166+
job: &str,
167+
grouping: HashMap<String, String, S>,
168+
url: &str,
169+
mfs: Vec<proto::MetricFamily>,
170+
method: &str,
171+
basic_auth: Option<BasicAuthentication>,
172+
) -> Result<()> {
173+
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?;
174+
lazy_static! {
175+
static ref HTTP_CLIENT: Client = Client::builder()
176+
.timeout(REQWEST_TIMEOUT_SEC)
177+
.build()
178+
.unwrap();
179+
}
149180
let mut builder = HTTP_CLIENT
150181
.request(
151182
Method::from_str(method).unwrap(),
@@ -159,14 +190,50 @@ fn push<S: BuildHasher>(
159190
}
160191

161192
let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
193+
handle_push_response(response.status(), push_url)
194+
}
195+
196+
async fn push_async<S: BuildHasher>(
197+
job: &str,
198+
grouping: HashMap<String, String, S>,
199+
url: &str,
200+
mfs: Vec<proto::MetricFamily>,
201+
method: &str,
202+
basic_auth: Option<BasicAuthentication>,
203+
) -> Result<()> {
204+
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?;
205+
lazy_static! {
206+
static ref ASYNC_HTTP_CLIENT: reqwest::Client = reqwest::Client::builder()
207+
.timeout(REQWEST_TIMEOUT_SEC)
208+
.build()
209+
.unwrap();
210+
}
211+
let mut builder = ASYNC_HTTP_CLIENT
212+
.request(
213+
Method::from_str(method).unwrap(),
214+
Url::from_str(&push_url).unwrap(),
215+
)
216+
.header(CONTENT_TYPE, encoder.format_type())
217+
.body(buf);
218+
219+
if let Some(BasicAuthentication { username, password }) = basic_auth {
220+
builder = builder.basic_auth(username, Some(password));
221+
}
162222

163-
match response.status() {
223+
let response = builder
224+
.send()
225+
.await
226+
.map_err(|e| Error::Msg(format!("{}", e)))?;
227+
handle_push_response(response.status(), push_url)
228+
}
229+
230+
fn handle_push_response(status: StatusCode, push_url: String) -> Result<()> {
231+
match status {
164232
StatusCode::ACCEPTED => Ok(()),
165233
StatusCode::OK => Ok(()),
166234
_ => Err(Error::Msg(format!(
167235
"unexpected status code {} while pushing to {}",
168-
response.status(),
169-
push_url
236+
status, push_url
170237
))),
171238
}
172239
}
@@ -188,6 +255,23 @@ fn push_from_collector<S: BuildHasher>(
188255
push(job, grouping, url, mfs, method, basic_auth)
189256
}
190257

258+
async fn push_from_collector_async<'a, S: BuildHasher>(
259+
job: &'a str,
260+
grouping: HashMap<String, String, S>,
261+
url: &'a str,
262+
collectors: Vec<Box<dyn Collector>>,
263+
method: &'a str,
264+
basic_auth: Option<BasicAuthentication>,
265+
) -> Result<()> {
266+
let registry = Registry::new();
267+
for bc in collectors {
268+
registry.register(bc)?;
269+
}
270+
271+
let mfs = registry.gather();
272+
push_async(job, grouping, url, mfs, method, basic_auth).await
273+
}
274+
191275
/// `push_collector` push metrics collected from the provided collectors. It is
192276
/// a convenient way to push only a few metrics.
193277
pub fn push_collector<S: BuildHasher>(
@@ -200,7 +284,20 @@ pub fn push_collector<S: BuildHasher>(
200284
push_from_collector(job, grouping, url, collectors, "PUT", basic_auth)
201285
}
202286

203-
/// `push_add_collector` works like `push_add_metrics`, it collects from the
287+
/// `push_collector_async` is just an async version of `push_collector`.
288+
/// Pushes metrics collected from the provided collectors. It is
289+
/// a convenient way to push only a few metrics.
290+
pub async fn push_collector_async<'a, S: BuildHasher>(
291+
job: &'a str,
292+
grouping: HashMap<String, String, S>,
293+
url: &'a str,
294+
collectors: Vec<Box<dyn Collector>>,
295+
basic_auth: Option<BasicAuthentication>,
296+
) -> Result<()> {
297+
push_from_collector_async(job, grouping, url, collectors, "PUT", basic_auth).await
298+
}
299+
300+
/// `push_add_collector` works like `push_add_collector`, it collects from the
204301
/// provided collectors. It is a convenient way to push only a few metrics.
205302
pub fn push_add_collector<S: BuildHasher>(
206303
job: &str,
@@ -212,6 +309,19 @@ pub fn push_add_collector<S: BuildHasher>(
212309
push_from_collector(job, grouping, url, collectors, "POST", basic_auth)
213310
}
214311

312+
/// `push_add_collector_async` works like `push_add_collector`, but async.
313+
/// It collects from the provided collectors. It is a convenient way to push
314+
/// only a few metrics.
315+
pub async fn push_add_collector_async<'a, S: BuildHasher>(
316+
job: &'a str,
317+
grouping: HashMap<String, String, S>,
318+
url: &'a str,
319+
collectors: Vec<Box<dyn Collector>>,
320+
basic_auth: Option<BasicAuthentication>,
321+
) -> Result<()> {
322+
push_from_collector_async(job, grouping, url, collectors, "POST", basic_auth).await
323+
}
324+
215325
const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");
216326

217327
/// `hostname_grouping_key` returns a label map with the only entry

0 commit comments

Comments
 (0)