diff --git a/src/lib.rs b/src/lib.rs index a3bb9f68..10c0bede 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -223,7 +223,8 @@ pub use self::histogram::{Histogram, HistogramOpts, HistogramTimer, HistogramVec pub use self::metrics::Opts; #[cfg(feature = "push")] pub use self::push::{ - hostname_grouping_key, push_add_collector, push_add_metrics, push_collector, push_metrics, + hostname_grouping_key, push_add_collector, push_add_collector_async, push_add_metrics, + push_add_metrics_async, push_collector, push_collector_async, push_metrics, push_metrics_async, BasicAuthentication, }; pub use self::registry::Registry; diff --git a/src/push.rs b/src/push.rs index 80a253a6..6aafc5a5 100644 --- a/src/push.rs +++ b/src/push.rs @@ -18,13 +18,6 @@ use crate::registry::Registry; const REQWEST_TIMEOUT_SEC: Duration = Duration::from_secs(10); -lazy_static! { - static ref HTTP_CLIENT: Client = Client::builder() - .timeout(REQWEST_TIMEOUT_SEC) - .build() - .unwrap(); -} - /// `BasicAuthentication` holder for supporting `push` to Pushgateway endpoints /// using Basic access authentication. /// Can be passed to any `push_metrics` method. @@ -60,6 +53,18 @@ pub fn push_metrics( push(job, grouping, url, mfs, "PUT", basic_auth) } +/// Functions just like `push_metrics`, except the metrics are pushed +/// asynchronously. +pub async fn push_metrics_async( + job: &str, + grouping: HashMap, + url: &str, + mfs: Vec, + basic_auth: Option, +) -> Result<()> { + push_async(job, grouping, url, mfs, "PUT", basic_auth).await +} + /// `push_add_metrics` works like `push_metrics`, but only previously pushed /// metrics with the same name (and the same job and other grouping labels) will /// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.) @@ -73,16 +78,25 @@ pub fn push_add_metrics( push(job, grouping, url, mfs, "POST", basic_auth) } +/// `push_add_metrics_async` works like `push_metrics`, but async. +pub async fn push_add_metrics_async<'a, S: BuildHasher>( + job: &'a str, + grouping: HashMap, + url: &'a str, + mfs: Vec, + basic_auth: Option, +) -> Result<()> { + push(job, grouping, url, mfs, "POST", basic_auth) +} + const LABEL_NAME_JOB: &str = "job"; -fn push( +fn configure_push( job: &str, grouping: HashMap, url: &str, mfs: Vec, - method: &str, - basic_auth: Option, -) -> Result<()> { +) -> Result<(String, impl Encoder, Vec)> { // Suppress clippy warning needless_pass_by_value. let grouping = grouping; @@ -145,7 +159,24 @@ fn push( // Ignore error, `no metrics` and `no name`. let _ = encoder.encode(&[mf], &mut buf); } + Ok((push_url, encoder, buf)) +} +fn push( + job: &str, + grouping: HashMap, + url: &str, + mfs: Vec, + method: &str, + basic_auth: Option, +) -> Result<()> { + let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?; + lazy_static! { + static ref HTTP_CLIENT: Client = Client::builder() + .timeout(REQWEST_TIMEOUT_SEC) + .build() + .unwrap(); + } let mut builder = HTTP_CLIENT .request( Method::from_str(method).unwrap(), @@ -159,14 +190,50 @@ fn push( } let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?; + handle_push_response(response.status(), push_url) +} + +async fn push_async( + job: &str, + grouping: HashMap, + url: &str, + mfs: Vec, + method: &str, + basic_auth: Option, +) -> Result<()> { + let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?; + lazy_static! { + static ref ASYNC_HTTP_CLIENT: reqwest::Client = reqwest::Client::builder() + .timeout(REQWEST_TIMEOUT_SEC) + .build() + .unwrap(); + } + let mut builder = ASYNC_HTTP_CLIENT + .request( + Method::from_str(method).unwrap(), + Url::from_str(&push_url).unwrap(), + ) + .header(CONTENT_TYPE, encoder.format_type()) + .body(buf); + + if let Some(BasicAuthentication { username, password }) = basic_auth { + builder = builder.basic_auth(username, Some(password)); + } - match response.status() { + let response = builder + .send() + .await + .map_err(|e| Error::Msg(format!("{}", e)))?; + handle_push_response(response.status(), push_url) +} + +fn handle_push_response(status: StatusCode, push_url: String) -> Result<()> { + match status { StatusCode::ACCEPTED => Ok(()), StatusCode::OK => Ok(()), _ => Err(Error::Msg(format!( "unexpected status code {} while pushing to {}", - response.status(), - push_url + status, push_url ))), } } @@ -188,6 +255,23 @@ fn push_from_collector( push(job, grouping, url, mfs, method, basic_auth) } +async fn push_from_collector_async<'a, S: BuildHasher>( + job: &'a str, + grouping: HashMap, + url: &'a str, + collectors: Vec>, + method: &'a str, + basic_auth: Option, +) -> Result<()> { + let registry = Registry::new(); + for bc in collectors { + registry.register(bc)?; + } + + let mfs = registry.gather(); + push_async(job, grouping, url, mfs, method, basic_auth).await +} + /// `push_collector` push metrics collected from the provided collectors. It is /// a convenient way to push only a few metrics. pub fn push_collector( @@ -200,7 +284,20 @@ pub fn push_collector( push_from_collector(job, grouping, url, collectors, "PUT", basic_auth) } -/// `push_add_collector` works like `push_add_metrics`, it collects from the +/// `push_collector_async` is just an async version of `push_collector`. +/// Pushes metrics collected from the provided collectors. It is +/// a convenient way to push only a few metrics. +pub async fn push_collector_async<'a, S: BuildHasher>( + job: &'a str, + grouping: HashMap, + url: &'a str, + collectors: Vec>, + basic_auth: Option, +) -> Result<()> { + push_from_collector_async(job, grouping, url, collectors, "PUT", basic_auth).await +} + +/// `push_add_collector` works like `push_add_collector`, it collects from the /// provided collectors. It is a convenient way to push only a few metrics. pub fn push_add_collector( job: &str, @@ -212,6 +309,19 @@ pub fn push_add_collector( push_from_collector(job, grouping, url, collectors, "POST", basic_auth) } +/// `push_add_collector_async` works like `push_add_collector`, but async. +/// It collects from the provided collectors. It is a convenient way to push +/// only a few metrics. +pub async fn push_add_collector_async<'a, S: BuildHasher>( + job: &'a str, + grouping: HashMap, + url: &'a str, + collectors: Vec>, + basic_auth: Option, +) -> Result<()> { + push_from_collector_async(job, grouping, url, collectors, "POST", basic_auth).await +} + const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown"); /// `hostname_grouping_key` returns a label map with the only entry