Skip to content

Fix #342: Async support for push_metrics #343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ pub use self::histogram::{Histogram, HistogramOpts, HistogramTimer, HistogramVec
pub use self::metrics::Opts;
#[cfg(feature = "push")]
pub use self::push::{
hostname_grouping_key, push_add_collector, push_add_metrics, push_collector, push_metrics,
hostname_grouping_key, push_add_collector, push_add_collector_async, push_add_metrics,
push_add_metrics_async, push_collector, push_collector_async, push_metrics, push_metrics_async,
BasicAuthentication,
};
pub use self::registry::Registry;
Expand Down
140 changes: 125 additions & 15 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ use crate::registry::Registry;

const REQWEST_TIMEOUT_SEC: Duration = Duration::from_secs(10);

lazy_static! {
static ref HTTP_CLIENT: Client = Client::builder()
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
}

/// `BasicAuthentication` holder for supporting `push` to Pushgateway endpoints
/// using Basic access authentication.
/// Can be passed to any `push_metrics` method.
Expand Down Expand Up @@ -60,6 +53,18 @@ pub fn push_metrics<S: BuildHasher>(
push(job, grouping, url, mfs, "PUT", basic_auth)
}

/// Functions just like `push_metrics`, except the metrics are pushed
/// asynchronously.
pub async fn push_metrics_async<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_async(job, grouping, url, mfs, "PUT", basic_auth).await
}

/// `push_add_metrics` works like `push_metrics`, but only previously pushed
/// metrics with the same name (and the same job and other grouping labels) will
/// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
Expand All @@ -73,16 +78,25 @@ pub fn push_add_metrics<S: BuildHasher>(
push(job, grouping, url, mfs, "POST", basic_auth)
}

/// `push_add_metrics_async` works like `push_metrics`, but async.
pub async fn push_add_metrics_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
mfs: Vec<proto::MetricFamily>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push(job, grouping, url, mfs, "POST", basic_auth)
}

const LABEL_NAME_JOB: &str = "job";

fn push<S: BuildHasher>(
fn configure_push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
) -> Result<(String, impl Encoder, Vec<u8>)> {
// Suppress clippy warning needless_pass_by_value.
let grouping = grouping;

Expand Down Expand Up @@ -145,7 +159,24 @@ fn push<S: BuildHasher>(
// Ignore error, `no metrics` and `no name`.
let _ = encoder.encode(&[mf], &mut buf);
}
Ok((push_url, encoder, buf))
}

fn push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?;
lazy_static! {
static ref HTTP_CLIENT: Client = Client::builder()
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
}
let mut builder = HTTP_CLIENT
.request(
Method::from_str(method).unwrap(),
Expand All @@ -159,14 +190,50 @@ fn push<S: BuildHasher>(
}

let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
handle_push_response(response.status(), push_url)
}

async fn push_async<S: BuildHasher>(
Copy link
Member

@breezewish breezewish Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is very similar to fn push. Is it possible to use something like https://docs.rs/crate/reqwest/0.10.8/source/src/blocking/wait.rs to share common logic between the async impl and blocking impl?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what the linked code has to do with duplication. I tried to reduce duplication as much as possible by breaking common code into their own functions which are called inside both async_push and push. But this can only go so far, because the underlying reqwest structs use different types.

job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?;
lazy_static! {
static ref ASYNC_HTTP_CLIENT: reqwest::Client = reqwest::Client::builder()
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
}
let mut builder = ASYNC_HTTP_CLIENT
.request(
Method::from_str(method).unwrap(),
Url::from_str(&push_url).unwrap(),
)
.header(CONTENT_TYPE, encoder.format_type())
.body(buf);

if let Some(BasicAuthentication { username, password }) = basic_auth {
builder = builder.basic_auth(username, Some(password));
}

match response.status() {
let response = builder
.send()
.await
.map_err(|e| Error::Msg(format!("{}", e)))?;
handle_push_response(response.status(), push_url)
}

fn handle_push_response(status: StatusCode, push_url: String) -> Result<()> {
match status {
StatusCode::ACCEPTED => Ok(()),
StatusCode::OK => Ok(()),
_ => Err(Error::Msg(format!(
"unexpected status code {} while pushing to {}",
response.status(),
push_url
status, push_url
))),
}
}
Expand All @@ -188,6 +255,23 @@ fn push_from_collector<S: BuildHasher>(
push(job, grouping, url, mfs, method, basic_auth)
}

async fn push_from_collector_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
collectors: Vec<Box<dyn Collector>>,
method: &'a str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let registry = Registry::new();
for bc in collectors {
registry.register(bc)?;
}

let mfs = registry.gather();
push_async(job, grouping, url, mfs, method, basic_auth).await
}

/// `push_collector` push metrics collected from the provided collectors. It is
/// a convenient way to push only a few metrics.
pub fn push_collector<S: BuildHasher>(
Expand All @@ -200,7 +284,20 @@ pub fn push_collector<S: BuildHasher>(
push_from_collector(job, grouping, url, collectors, "PUT", basic_auth)
}

/// `push_add_collector` works like `push_add_metrics`, it collects from the
/// `push_collector_async` is just an async version of `push_collector`.
/// Pushes metrics collected from the provided collectors. It is
/// a convenient way to push only a few metrics.
pub async fn push_collector_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
collectors: Vec<Box<dyn Collector>>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_from_collector_async(job, grouping, url, collectors, "PUT", basic_auth).await
}

/// `push_add_collector` works like `push_add_collector`, it collects from the
/// provided collectors. It is a convenient way to push only a few metrics.
pub fn push_add_collector<S: BuildHasher>(
job: &str,
Expand All @@ -212,6 +309,19 @@ pub fn push_add_collector<S: BuildHasher>(
push_from_collector(job, grouping, url, collectors, "POST", basic_auth)
}

/// `push_add_collector_async` works like `push_add_collector`, but async.
/// It collects from the provided collectors. It is a convenient way to push
/// only a few metrics.
pub async fn push_add_collector_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
collectors: Vec<Box<dyn Collector>>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_from_collector_async(job, grouping, url, collectors, "POST", basic_auth).await
}

const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");

/// `hostname_grouping_key` returns a label map with the only entry
Expand Down