Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ humantime.workspace = true
hyper = { version = "0.14.32", default-features = false, features = ["client", "runtime", "http1", "http2", "server", "stream", "backports", "deprecated"] }
hyper-openssl = { version = "0.9.2", default-features = false }
hyper-proxy = { version = "0.9.1", default-features = false, features = ["openssl-tls"] }
hyperlocal = "0.8.0" # newer versions require hyper 1.x
indexmap.workspace = true
inventory = { version = "0.3.20", default-features = false }
ipnet = { version = "2", default-features = false, optional = true, features = ["serde", "std"] }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `prometheus_scrape` source can now scrape metrics from Unix sockets. To use this feature, provide the filesystem path of the Unix socket; you can optionally also specify the URI path to request over the socket (e.g., `/metrics`).

authors: weriomat
2 changes: 1 addition & 1 deletion src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub fn build_tls_connector(
Ok(https)
}

fn default_request_headers<B>(request: &mut Request<B>, user_agent: &HeaderValue) {
pub fn default_request_headers<B>(request: &mut Request<B>, user_agent: &HeaderValue) {
if !request.headers().contains_key("User-Agent") {
request
.headers_mut()
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub mod trace;
pub mod transforms;
pub mod types;
pub mod unit_test;
#[cfg(unix)]
pub mod unix_http;
pub(crate) mod utilization;
pub mod validate;
#[cfg(windows)]
Expand Down
123 changes: 113 additions & 10 deletions src/sources/prometheus/scrape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::{collections::HashMap, time::Duration};
use bytes::Bytes;
use futures_util::FutureExt;
use http::{Uri, response::Parts};
#[cfg(unix)]
use hyperlocal::Uri as UnixUri;
use serde_with::serde_as;
use snafu::ResultExt;
use vector_lib::{config::LogNamespace, configurable::configurable_component, event::Event};
Expand Down Expand Up @@ -34,6 +36,47 @@ static NOT_FOUND_NO_PATH: &str = "No path is set on the endpoint and we got a 40
did you mean to use /metrics?\
This behavior changed in version 0.11.";

/// Unix socket configuration to scrape metrics from.
///
/// `path` identifies the socket on the filesystem; `endpoint` is the URI path
/// sent in the HTTP request over that socket (defaults to `/metrics`).
#[cfg(unix)]
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UnixEndpoint {
/// The filesystem path of the Unix socket to connect to.
#[configurable(metadata(docs::examples = "/var/run/forgejo/forgejo.sock"))]
path: String,
/// The URI path to request over the socket (for example, `/metrics`).
#[serde(default = "default_prometheus_path")]
#[configurable(metadata(docs::examples = "/metrics"))]
#[configurable(metadata(docs::examples = "/api/v1/metrics"))]
endpoint: String,
}

/// An endpoint to scrape metrics from: either an HTTP(S) URL or, on Unix
/// platforms, a Unix domain socket.
///
/// Deserialized untagged, so a plain string is treated as a URL and a table
/// with a `path` key is treated as a Unix socket endpoint.
#[serde_as]
#[configurable_component]
#[serde(untagged)]
#[derive(Clone, Debug)]
pub enum PrometheusEndpoint {
/// An HTTP(S) URL to scrape metrics from.
#[configurable(metadata(docs::examples = "http://localhost:9090/metrics"))]
Url(String),
/// A Unix domain socket to scrape metrics from.
#[cfg(unix)]
Unix(UnixEndpoint),
}

impl From<String> for PrometheusEndpoint {
    /// Wraps a plain URL string in the `Url` variant, preserving backward
    /// compatibility with configs that list endpoints as bare strings.
    fn from(url: String) -> Self {
        // `String` is in the prelude; no need for the fully qualified path.
        // The parameter is named `url` (not `str`) to avoid shadowing the
        // primitive type name.
        Self::Url(url)
    }
}

/// Default URI path requested when scraping a Unix socket endpoint.
pub(crate) fn default_prometheus_path() -> String {
    "/metrics".to_string()
}

/// Configuration for the `prometheus_scrape` source.
#[serde_as]
#[configurable_component(source(
Expand All @@ -43,9 +86,8 @@ static NOT_FOUND_NO_PATH: &str = "No path is set on the endpoint and we got a 40
#[derive(Clone, Debug)]
pub struct PrometheusScrapeConfig {
/// Endpoints to scrape metrics from.
#[configurable(metadata(docs::examples = "http://localhost:9090/metrics"))]
#[serde(alias = "hosts")]
endpoints: Vec<String>,
endpoints: Vec<PrometheusEndpoint>,

/// The interval between scrapes. Requests are run concurrently so if a scrape takes longer
/// than the interval a new scrape will be started. This can take extra resources, set the timeout
Expand Down Expand Up @@ -115,7 +157,9 @@ fn query_example() -> serde_json::Value {
impl GenerateConfig for PrometheusScrapeConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
endpoints: vec!["http://localhost:9090/metrics".to_string()],
endpoints: vec![sources::prometheus::scrape::PrometheusEndpoint::Url(
"http://localhost:9090/metrics".to_string(),
)],
interval: default_interval(),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
Expand All @@ -136,9 +180,14 @@ impl SourceConfig for PrometheusScrapeConfig {
let urls = self
.endpoints
.iter()
.map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
.map(|t| match t {
PrometheusEndpoint::Url(s) => s.parse::<Uri>().context(sources::UriParseSnafu),
#[cfg(unix)]
PrometheusEndpoint::Unix(u) => Ok(UnixUri::new(&u.path, &u.endpoint).into()).into(),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid panicking when building Unix scrape URI

Constructing the Unix endpoint with UnixUri::new(&u.path, &u.endpoint) can panic for malformed or non-absolute endpoint values, which turns a user config error into a process crash during source build instead of returning a normal BuildError. This is reachable on Unix when endpoints = [{ path = "...", endpoint = "metrics" }] (missing leading slash), so startup robustness regresses for invalid configs.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a limitation of the library used here. It could be worked around by reimplementing that part of the library, which can be done if requested.

})
.map(|r| r.map(|uri| build_url(&uri, &self.query)))
.collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;

let tls = TlsSettings::from_options(self.tls.as_ref())?;

let builder = PrometheusScrapeBuilder {
Expand Down Expand Up @@ -358,7 +407,58 @@ mod test {
wait_for_tcp(in_addr).await;

let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
endpoints: vec![format!("http://{}/metrics", in_addr).into()],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
query: HashMap::new(),
auth: None,
tls: None,
};

let events = run_and_assert_source_compliance(
config,
Duration::from_secs(3),
&HTTP_PULL_SOURCE_TAGS,
)
.await;
assert!(!events.is_empty());
}

#[cfg(unix)]
#[tokio::test]
async fn test_prometheus_unix_socket() {
use tempfile::tempdir;
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;

use crate::test_util::wait_for_unix_socket;

// Cleanup once dropped
let dir = tempdir().unwrap();
let addr = dir.path().join("prometheus_unix.sock");

let dummy_endpoint = warp::path!("metrics").and(warp::header::exact("Accept", "text/plain")).map(|| {
r#"
promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
"#
});

let listener = UnixListener::bind(&addr).unwrap();
let incoming = UnixListenerStream::new(listener);

tokio::spawn(warp::serve(dummy_endpoint).run_incoming(incoming));
wait_for_unix_socket(addr.clone()).await;

let config = PrometheusScrapeConfig {
endpoints: vec![sources::prometheus::scrape::PrometheusEndpoint::Unix(
UnixEndpoint {
path: addr.clone().into_os_string().into_string().unwrap(),
endpoint: "/metrics".to_string(),
},
)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
Expand All @@ -376,6 +476,9 @@ mod test {
)
.await;
assert!(!events.is_empty());

// cleanup unix socket
tokio::fs::remove_file(addr).await.unwrap();
}

#[tokio::test]
Expand All @@ -392,7 +495,7 @@ mod test {
wait_for_tcp(in_addr).await;

let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
endpoints: vec![format!("http://{}/metrics", in_addr).into()],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
Expand Down Expand Up @@ -444,7 +547,7 @@ mod test {
wait_for_tcp(in_addr).await;

let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
endpoints: vec![format!("http://{}/metrics", in_addr).into()],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
Expand Down Expand Up @@ -510,7 +613,7 @@ mod test {
wait_for_tcp(in_addr).await;

let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
endpoints: vec![format!("http://{}/metrics", in_addr).into()],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
Expand Down Expand Up @@ -565,7 +668,7 @@ mod test {
wait_for_tcp(in_addr).await;

let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr).into()],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
Expand Down Expand Up @@ -681,7 +784,7 @@ mod test {
config.add_source(
"in",
PrometheusScrapeConfig {
endpoints: vec![format!("http://{}", in_addr)],
endpoints: vec![format!("http://{}", in_addr).into()],
instance_tag: None,
endpoint_tag: None,
honor_labels: false,
Expand Down
Loading
Loading