diff --git a/Cargo.lock b/Cargo.lock index 9c90509352..76bd101ba4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1725,6 +1725,7 @@ dependencies = [ "linkerd-io", "linkerd-stack", "linkerd-tracing", + "prometheus-client", "thiserror 2.0.12", "tokio", "tokio-test", diff --git a/linkerd/app/admin/src/stack.rs b/linkerd/app/admin/src/stack.rs index 7fb5608d7a..10a48245eb 100644 --- a/linkerd/app/admin/src/stack.rs +++ b/linkerd/app/admin/src/stack.rs @@ -122,6 +122,7 @@ impl Config { .push_on_service(http::BoxResponse::layer()) .arc_new_clone_http(); + let inbound::DetectMetrics(detect_metrics) = metrics.detect.clone(); let tcp = http .unlift_new() .push(http::NewServeHttp::layer({ @@ -177,9 +178,12 @@ impl Config { ) .arc_new_tcp() .lift_new_with_target() - .push(http::NewDetect::layer(svc::stack::CloneParam::from( - http::DetectParams { read_timeout: DETECT_TIMEOUT } - ))) + .push(http::NewDetect::layer(move |tcp: &Tcp| { + http::DetectParams { + read_timeout: DETECT_TIMEOUT, + metrics: detect_metrics.metrics(tcp.policy.server_label()) + } + })) .push(transport::metrics::NewServer::layer(metrics.proxy.transport)) .push_map_target(move |(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| { Tcp { diff --git a/linkerd/app/inbound/src/accept.rs b/linkerd/app/inbound/src/accept.rs index d3c1516f75..c6dd083dbb 100644 --- a/linkerd/app/inbound/src/accept.rs +++ b/linkerd/app/inbound/src/accept.rs @@ -182,7 +182,11 @@ mod tests { } fn inbound() -> Inbound<()> { - Inbound::new(test_util::default_config(), test_util::runtime().0) + Inbound::new( + test_util::default_config(), + test_util::runtime().0, + &mut Default::default(), + ) } fn new_panic(msg: &'static str) -> svc::ArcNewTcp { diff --git a/linkerd/app/inbound/src/detect.rs b/linkerd/app/inbound/src/detect.rs index 8286835d6c..4f71bb32b5 100644 --- a/linkerd/app/inbound/src/detect.rs +++ b/linkerd/app/inbound/src/detect.rs @@ -4,7 +4,7 @@ use crate::{ }; use linkerd_app_core::{ identity, io, - metrics::ServerLabel, + metrics::{prom, ServerLabel}, proxy::http, svc, tls, transport::{ @@ -20,6 +20,10 @@ use tracing::info; #[cfg(test)] mod tests; +#[derive(Clone, Debug)] +pub struct MetricsFamilies(pub HttpDetectMetrics); +pub type HttpDetectMetrics = http::DetectMetricsFamilies; + #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct Forward { client_addr: Remote, @@ -61,7 +65,11 @@ type TlsIo = tls::server::Io>, I> impl Inbound> { /// Builds a stack that terminates mesh TLS and detects whether the traffic is HTTP (as hinted /// by policy). - pub(crate) fn push_detect(self, forward: F) -> Inbound> + pub(crate) fn push_detect( + self, + MetricsFamilies(metrics): MetricsFamilies, + forward: F, + ) -> Inbound> where T: svc::Param + svc::Param> + svc::Param, T: Clone + Send + 'static, @@ -72,14 +80,18 @@ impl Inbound> { FSvc::Error: Into, FSvc::Future: Send, { - self.push_detect_http(forward.clone()) + self.push_detect_http(metrics, forward.clone()) .push_detect_tls(forward) } /// Builds a stack that handles HTTP detection once TLS detection has been performed. If the /// connection is determined to be HTTP, the inner stack is used; otherwise the connection is /// passed to the provided 'forward' stack. - fn push_detect_http(self, forward: F) -> Inbound> + fn push_detect_http( + self, + metrics: HttpDetectMetrics, + forward: F, + ) -> Inbound> where I: io::AsyncRead + io::AsyncWrite + io::PeerAddr, I: Debug + Send + Sync + Unpin + 'static, @@ -153,11 +165,12 @@ impl Inbound> { forward.into_inner(), ) .lift_new_with_target() - .push(http::NewDetect::layer(|Detect { timeout, .. }: &Detect| { - http::DetectParams { + .push(http::NewDetect::layer( + move |Detect { timeout, tls }: &Detect| http::DetectParams { read_timeout: *timeout, - } - })) + metrics: metrics.metrics(tls.policy.server_label()), + }, + )) .arc_new_tcp(); http.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new)) @@ -445,3 +458,13 @@ impl svc::InsertParam for TlsParams { (tls, target) } } + +// === impl MetricsFamilies === + +impl MetricsFamilies { + pub fn register(reg: &mut prom::Registry) -> Self { + Self(http::DetectMetricsFamilies::register( + reg.sub_registry_with_prefix("http"), + )) + } +} diff --git a/linkerd/app/inbound/src/detect/tests.rs b/linkerd/app/inbound/src/detect/tests.rs index 002717824b..170050f30c 100644 --- a/linkerd/app/inbound/src/detect/tests.rs +++ b/linkerd/app/inbound/src/detect/tests.rs @@ -13,6 +13,12 @@ const HTTP1: &[u8] = b"GET / HTTP/1.1\r\nhost: example.com\r\n\r\n"; const HTTP2: &[u8] = b"PRI * HTTP/2.0\r\n"; const NOT_HTTP: &[u8] = b"foo\r\nbar\r\nblah\r\n"; +const RESULTS_NOT_HTTP: &str = "results_total{result=\"not_http\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}"; +const RESULTS_HTTP1: &str = "results_total{result=\"http/1\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}"; +const RESULTS_HTTP2: &str = "results_total{result=\"http/2\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}"; +const RESULTS_READ_TIMEOUT: &str = "results_total{result=\"read_timeout\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}"; +const RESULTS_ERROR: &str = "results_total{result=\"error\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}"; + fn authzs() -> Arc<[Authorization]> { Arc::new([Authorization { authentication: Authentication::Unauthenticated, @@ -41,6 +47,35 @@ fn allow(protocol: Protocol) -> AllowPolicy { allow } +macro_rules! assert_contains_metric { + ($registry:expr, $metric:expr, $value:expr) => {{ + let mut buf = String::new(); + prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed"); + let lines = buf.split_terminator('\n').collect::>(); + assert_eq!( + lines.iter().find(|l| l.starts_with($metric)), + Some(&&*format!("{} {}", $metric, $value)), + "metric '{}' not found in:\n{:?}", + $metric, + buf + ); + }}; +} + +macro_rules! assert_not_contains_metric { + ($registry:expr, $pattern:expr) => {{ + let mut buf = String::new(); + prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed"); + let lines = buf.split_terminator('\n').collect::>(); + assert!( + !lines.iter().any(|l| l.starts_with($pattern)), + "metric '{}' found in:\n{:?}", + $pattern, + buf + ); + }}; +} + #[tokio::test(flavor = "current_thread")] async fn detect_tls_opaque() { let _trace = trace::test::trace_init(); @@ -77,14 +112,21 @@ async fn detect_http_non_http() { let (ior, mut iow) = io::duplex(100); iow.write_all(NOT_HTTP).await.unwrap(); + let mut registry = prom::Registry::default(); inbound() .with_stack(new_panic("http stack must not be used")) - .push_detect_http(new_ok()) + .push_detect_http(super::HttpDetectMetrics::register(&mut registry), new_ok()) .into_inner() .new_service(target) .oneshot(ior) .await .expect("should succeed"); + + assert_contains_metric!(®istry, RESULTS_NOT_HTTP, 1); + assert_contains_metric!(®istry, RESULTS_HTTP1, 0); + assert_contains_metric!(®istry, RESULTS_HTTP2, 0); + assert_contains_metric!(®istry, RESULTS_READ_TIMEOUT, 0); + assert_contains_metric!(®istry, RESULTS_ERROR, 0); } #[tokio::test(flavor = "current_thread")] @@ -108,14 +150,24 @@ async fn detect_http() { let (ior, mut iow) = io::duplex(100); iow.write_all(HTTP1).await.unwrap(); + let mut registry = prom::Registry::default(); inbound() .with_stack(new_ok()) - .push_detect_http(new_panic("tcp stack must not be used")) + .push_detect_http( + super::HttpDetectMetrics::register(&mut registry), + new_panic("tcp stack must not be used"), + ) .into_inner() .new_service(target) .oneshot(ior) .await .expect("should succeed"); + + assert_contains_metric!(®istry, RESULTS_NOT_HTTP, 0); + assert_contains_metric!(®istry, RESULTS_HTTP1, 1); + assert_contains_metric!(®istry, RESULTS_HTTP2, 0); + assert_contains_metric!(®istry, RESULTS_READ_TIMEOUT, 0); + assert_contains_metric!(®istry, RESULTS_ERROR, 0); } #[tokio::test(flavor = "current_thread")] @@ -134,14 +186,24 @@ async fn hinted_http1() { let (ior, mut iow) = io::duplex(100); iow.write_all(HTTP1).await.unwrap(); + let mut registry = prom::Registry::default(); inbound() .with_stack(new_ok()) - .push_detect_http(new_panic("tcp stack must not be used")) + .push_detect_http( + super::HttpDetectMetrics::register(&mut registry), + new_panic("tcp stack must not be used"), + ) .into_inner() .new_service(target) .oneshot(ior) .await .expect("should succeed"); + + assert_contains_metric!(®istry, RESULTS_NOT_HTTP, 0); + assert_contains_metric!(®istry, RESULTS_HTTP1, 1); + assert_contains_metric!(®istry, RESULTS_HTTP2, 0); + assert_contains_metric!(®istry, RESULTS_READ_TIMEOUT, 0); + assert_contains_metric!(®istry, RESULTS_ERROR, 0); } #[tokio::test(flavor = "current_thread")] @@ -160,14 +222,24 @@ async fn hinted_http1_supports_http2() { let (ior, mut iow) = io::duplex(100); iow.write_all(HTTP2).await.unwrap(); + let mut registry = prom::Registry::default(); inbound() .with_stack(new_ok()) - .push_detect_http(new_panic("tcp stack must not be used")) + .push_detect_http( + super::HttpDetectMetrics::register(&mut registry), + new_panic("tcp stack must not be used"), + ) .into_inner() .new_service(target) .oneshot(ior) .await .expect("should succeed"); + + assert_contains_metric!(®istry, RESULTS_NOT_HTTP, 0); + assert_contains_metric!(®istry, RESULTS_HTTP1, 0); + assert_contains_metric!(®istry, RESULTS_HTTP2, 1); + assert_contains_metric!(®istry, RESULTS_READ_TIMEOUT, 0); + assert_contains_metric!(®istry, RESULTS_ERROR, 0); } #[tokio::test(flavor = "current_thread")] @@ -185,14 +257,25 @@ async fn hinted_http2() { let (ior, _) = io::duplex(100); + let mut registry = prom::Registry::default(); inbound() .with_stack(new_ok()) - .push_detect_http(new_panic("tcp stack must not be used")) + .push_detect_http( + super::HttpDetectMetrics::register(&mut registry), + new_panic("tcp stack must not be used"), + ) .into_inner() .new_service(target) .oneshot(ior) .await .expect("should succeed"); + + // No detection is performed when HTTP/2 is hinted, so no metrics are recorded. + assert_not_contains_metric!(®istry, RESULTS_NOT_HTTP); + assert_not_contains_metric!(®istry, RESULTS_HTTP1); + assert_not_contains_metric!(®istry, RESULTS_HTTP2); + assert_not_contains_metric!(®istry, RESULTS_READ_TIMEOUT); + assert_not_contains_metric!(®istry, RESULTS_ERROR); } fn client_id() -> tls::ClientId { @@ -210,7 +293,11 @@ fn orig_dst_addr() -> OrigDstAddr { } fn inbound() -> Inbound<()> { - Inbound::new(test_util::default_config(), test_util::runtime().0) + Inbound::new( + test_util::default_config(), + test_util::runtime().0, + &mut Default::default(), + ) } fn new_panic(msg: &'static str) -> svc::ArcNewTcp { diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index d4187b67cc..0f1c0308d5 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -33,7 +33,7 @@ fn build_server( where I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static, { - Inbound::new(cfg, rt) + Inbound::new(cfg, rt, &mut Default::default()) .with_stack(connect) .map_stack(|cfg, _, s| { s.push_map_target(|t| Param::>::param(&t)) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 94f13701f6..f8813a7413 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -20,12 +20,15 @@ pub mod test_util; #[cfg(fuzzing)] pub use self::http::fuzz as http_fuzz; -pub use self::{metrics::InboundMetrics, policy::DefaultPolicy}; +pub use self::{ + detect::MetricsFamilies as DetectMetrics, metrics::InboundMetrics, policy::DefaultPolicy, +}; use linkerd_app_core::{ config::{ConnectConfig, ProxyConfig, QueueConfig}, drain, http_tracing::SpanSink, identity, io, + metrics::prom, proxy::{tap, tcp}, svc, transport::{self, Remote, ServerAddr}, @@ -148,9 +151,9 @@ impl Inbound { } impl Inbound<()> { - pub fn new(config: Config, runtime: ProxyRuntime) -> Self { + pub fn new(config: Config, runtime: ProxyRuntime, prom: &mut prom::Registry) -> Self { let runtime = Runtime { - metrics: InboundMetrics::new(runtime.metrics), + metrics: InboundMetrics::new(runtime.metrics, prom), identity: runtime.identity, tap: runtime.tap, span_sink: runtime.span_sink, @@ -166,7 +169,11 @@ impl Inbound<()> { #[cfg(any(test, feature = "test-util"))] pub fn for_test() -> (Self, drain::Signal) { let (rt, drain) = test_util::runtime(); - let this = Self::new(test_util::default_config(), rt); + let this = Self::new( + test_util::default_config(), + rt, + &mut prom::Registry::default(), + ); (this, drain) } diff --git a/linkerd/app/inbound/src/metrics.rs b/linkerd/app/inbound/src/metrics.rs index 6f803eb6d1..dcb52a71bc 100644 --- a/linkerd/app/inbound/src/metrics.rs +++ b/linkerd/app/inbound/src/metrics.rs @@ -25,16 +25,22 @@ pub struct InboundMetrics { /// Holds metrics that are common to both inbound and outbound proxies. These metrics are /// reported separately pub proxy: Proxy, + + pub detect: crate::detect::MetricsFamilies, } impl InboundMetrics { - pub(crate) fn new(proxy: Proxy) -> Self { + pub(crate) fn new(proxy: Proxy, reg: &mut prom::Registry) -> Self { + let detect = + crate::detect::MetricsFamilies::register(reg.sub_registry_with_prefix("tcp_detect")); + Self { http_authz: authz::HttpAuthzMetrics::default(), http_errors: error::HttpErrorMetrics::default(), tcp_authz: authz::TcpAuthzMetrics::default(), tcp_errors: error::TcpErrorMetrics::default(), proxy, + detect, } } } diff --git a/linkerd/app/inbound/src/server.rs b/linkerd/app/inbound/src/server.rs index 85990cbc8b..b8458fcce3 100644 --- a/linkerd/app/inbound/src/server.rs +++ b/linkerd/app/inbound/src/server.rs @@ -55,6 +55,8 @@ impl Inbound<()> { I: Debug + Unpin + Send + Sync + 'static, P: profiles::GetProfile, { + let detect_metrics = self.runtime.metrics.detect.clone(); + // Handles connections to ports that can't be determined to be HTTP. let forward = self .clone() @@ -97,7 +99,7 @@ impl Inbound<()> { // Determines how to handle an inbound connection, dispatching it to the appropriate // stack. http.push_http_tcp_server() - .push_detect(forward) + .push_detect(detect_metrics, forward) .push_accept(addr.port(), policies, direct) .into_inner() } diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 5ff5dda599..86dad06bfa 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -274,6 +274,15 @@ impl Outbound { NSvc::Future: Send, { self.map_stack(|config, rt, inner| { + let detect_params = http::DetectParams { + read_timeout: config.proxy.detect_protocol_timeout, + metrics: rt + .metrics + .prom + .http_detect + .metrics(ParentRef(policy::Meta::new_default("ingress"))), + }; + // Route requests with destinations that can be discovered via the // `l5d-dst-override` header through the (load balanced) logical // stack. Route requests without the header through the endpoint @@ -329,11 +338,7 @@ impl Outbound { fallback, ) .lift_new_with_target() - .push(http::NewDetect::layer(svc::CloneParam::from( - http::DetectParams { - read_timeout: config.proxy.detect_protocol_timeout, - }, - ))) + .push(http::NewDetect::layer(svc::CloneParam::from(detect_params))) .arc_new_tcp() }) } diff --git a/linkerd/app/outbound/src/metrics.rs b/linkerd/app/outbound/src/metrics.rs index 81d40b8419..033409ecd9 100644 --- a/linkerd/app/outbound/src/metrics.rs +++ b/linkerd/app/outbound/src/metrics.rs @@ -36,6 +36,7 @@ pub struct OutboundMetrics { #[derive(Clone, Debug, Default)] pub(crate) struct PromMetrics { + pub(crate) http_detect: crate::http::DetectMetricsFamilies, pub(crate) http: crate::http::HttpMetrics, pub(crate) opaq: crate::opaq::OpaqMetrics, pub(crate) tls: crate::tls::TlsMetrics, @@ -88,6 +89,11 @@ where impl PromMetrics { pub fn register(registry: &mut prom::Registry) -> Self { + let http_detect = crate::http::DetectMetricsFamilies::register( + // Scoped consistently with the inbound metrics. + registry.sub_registry_with_prefix("tcp_detect_http"), + ); + // NOTE: HTTP metrics are scoped internally, since this configures both // HTTP and gRPC scopes. let http = crate::http::HttpMetrics::register(registry); @@ -97,6 +103,7 @@ impl PromMetrics { let tls = crate::tls::TlsMetrics::register(registry.sub_registry_with_prefix("tls")); Self { + http_detect, http, opaq, tls, diff --git a/linkerd/app/outbound/src/protocol.rs b/linkerd/app/outbound/src/protocol.rs index c046ef00c8..03c903a5f1 100644 --- a/linkerd/app/outbound/src/protocol.rs +++ b/linkerd/app/outbound/src/protocol.rs @@ -1,4 +1,4 @@ -use crate::{http, Outbound}; +use crate::{http, Outbound, ParentRef}; use linkerd_app_core::{io, svc, Error, Infallible}; use std::{fmt::Debug, hash::Hash}; @@ -35,6 +35,7 @@ impl Outbound { where // Target type indicating whether detection should be skipped. T: svc::Param, + T: svc::Param, T: Eq + Hash + Clone + Debug + Send + Sync + 'static, // Server-side socket. I: io::AsyncRead + io::AsyncWrite + io::PeerAddr, @@ -62,7 +63,10 @@ impl Outbound { .arc_new_tcp() }); - let detect = http.clone().map_stack(|config, _, http| { + let detect = http.clone().map_stack(|config, rt, http| { + let read_timeout = config.proxy.detect_protocol_timeout; + let metrics = rt.metrics.prom.http_detect.clone(); + http.push_switch( |(detected, parent): (http::Detection, T)| -> Result<_, Infallible> { match detected { @@ -84,11 +88,12 @@ impl Outbound { .push_on_service(svc::LoadShed::layer()) .push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right)) .lift_new_with_target::<(http::Detection, T)>() - .push(http::NewDetect::layer(svc::CloneParam::from( + .push(http::NewDetect::layer(move |parent: &T| { http::DetectParams { - read_timeout: config.proxy.detect_protocol_timeout, - }, - ))) + read_timeout, + metrics: metrics.metrics(parent.param()), + } + })) .arc_new_tcp() }); diff --git a/linkerd/app/outbound/src/sidecar.rs b/linkerd/app/outbound/src/sidecar.rs index f1315a28d6..d04d5c6543 100644 --- a/linkerd/app/outbound/src/sidecar.rs +++ b/linkerd/app/outbound/src/sidecar.rs @@ -158,6 +158,12 @@ impl svc::Param for Sidecar { } } +impl svc::Param for Sidecar { + fn param(&self) -> ParentRef { + ParentRef(self.policy.borrow().parent.clone()) + } +} + impl PartialEq for Sidecar { fn eq(&self, other: &Self) -> bool { self.orig_dst == other.orig_dst diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 4fceec08dd..2e8cc25c4a 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -219,7 +219,11 @@ impl Config { span_sink: trace_collector.span_sink(), drain: drain_rx.clone(), }; - let inbound = Inbound::new(inbound, runtime.clone()); + let inbound = Inbound::new( + inbound, + runtime.clone(), + registry.sub_registry_with_prefix("inbound"), + ); let outbound = Outbound::new( outbound, runtime, diff --git a/linkerd/http/detect/Cargo.toml b/linkerd/http/detect/Cargo.toml index ba6c1e6c3d..d34bdd8501 100644 --- a/linkerd/http/detect/Cargo.toml +++ b/linkerd/http/detect/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] bytes = { workspace = true } httparse = "1" +prometheus-client = "0.22" thiserror = "2" tokio = { version = "1", features = ["time"] } tracing = { version = "0.1" } diff --git a/linkerd/http/detect/src/lib.rs b/linkerd/http/detect/src/lib.rs index c1910414ea..a75bb30021 100644 --- a/linkerd/http/detect/src/lib.rs +++ b/linkerd/http/detect/src/lib.rs @@ -11,9 +11,14 @@ use std::{ use tokio::time; use tracing::{debug, trace}; -#[derive(Copy, Clone, Debug)] +mod metrics; + +pub use self::metrics::{DetectMetrics, DetectMetricsFamilies}; + +#[derive(Clone, Debug, Default)] pub struct DetectParams { pub read_timeout: time::Duration, + pub metrics: metrics::DetectMetrics, } #[derive(Debug, Clone)] @@ -34,13 +39,13 @@ pub enum Detection { /// This allows us to interoperate with protocols that send very small initial /// messages. In rare situations, we may fail to properly detect that a stream is /// HTTP. -#[derive(Copy, Clone, Debug)] +#[derive(Clone, Debug)] pub struct Detect { params: DetectParams, inner: N, } -#[derive(Copy, Clone, Debug)] +#[derive(Clone, Debug)] pub struct NewDetect { inner: N, params: P, @@ -108,18 +113,17 @@ where } fn call(&mut self, mut io: I) -> Self::Future { - let params = self.params; + let params = self.params.clone(); let inner = self.inner.clone(); Box::pin(async move { let t0 = time::Instant::now(); let mut buf = BytesMut::with_capacity(READ_CAPACITY); - let detection = detect(params, &mut io, &mut buf).await?; - debug!( - ?detection, - elapsed = ?time::Instant::now().saturating_duration_since(t0), - "Detected", - ); + let result = detect(¶ms, &mut io, &mut buf).await; + let elapsed = time::Instant::now().saturating_duration_since(t0); + params.metrics.observe(&result, elapsed); + debug!(?result, ?elapsed, "Detected"); + let detection = result?; trace!("Dispatching connection"); let svc = inner.new_service(detection); let mut svc = svc.ready_oneshot().await.map_err(Into::into)?; @@ -147,16 +151,16 @@ impl Detection { } async fn detect( - DetectParams { read_timeout }: DetectParams, + params: &DetectParams, io: &mut I, buf: &mut BytesMut, ) -> io::Result { debug_assert!(buf.capacity() > 0, "buffer must have capacity"); - trace!(capacity = buf.capacity(), timeout = ?read_timeout, "Reading"); - let sz = match time::timeout(read_timeout, io.read_buf(buf)).await { + trace!(capacity = buf.capacity(), timeout = ?params.read_timeout, "Reading"); + let sz = match time::timeout(params.read_timeout, io.read_buf(buf)).await { Ok(res) => res?, - Err(_) => return Ok(Detection::ReadTimeout(read_timeout)), + Err(_) => return Ok(Detection::ReadTimeout(params.read_timeout)), }; trace!(sz, "Read"); @@ -207,10 +211,11 @@ mod tests { let params = DetectParams { read_timeout: time::Duration::from_millis(1), + ..Default::default() }; let mut buf = BytesMut::with_capacity(1024); let mut io = io::Builder::new().wait(params.read_timeout * 2).build(); - let kind = detect(params, &mut io, &mut buf).await.unwrap(); + let kind = detect(¶ms, &mut io, &mut buf).await.unwrap(); assert!(matches!(kind, Detection::ReadTimeout(_)), "{kind:?}"); } @@ -220,12 +225,13 @@ mod tests { let params = DetectParams { read_timeout: time::Duration::from_millis(1), + ..Default::default() }; for read in &[H2_PREFACE, H2_AND_GARBAGE] { debug!(read = ?std::str::from_utf8(read).unwrap()); let mut buf = BytesMut::with_capacity(1024); let mut io = io::Builder::new().read(read).build(); - let kind = detect(params, &mut io, &mut buf).await.unwrap(); + let kind = detect(¶ms, &mut io, &mut buf).await.unwrap(); assert_eq!(kind.variant(), Some(Variant::H2), "{kind:?}"); } } @@ -236,19 +242,20 @@ mod tests { let params = DetectParams { read_timeout: time::Duration::from_millis(1), + ..Default::default() }; for i in 1..SMALLEST_POSSIBLE_HTTP1_REQ.len() { debug!(read = ?std::str::from_utf8(&HTTP11_LINE[..i]).unwrap()); let mut buf = BytesMut::with_capacity(1024); let mut io = io::Builder::new().read(&HTTP11_LINE[..i]).build(); - let kind = detect(params, &mut io, &mut buf).await.unwrap(); + let kind = detect(¶ms, &mut io, &mut buf).await.unwrap(); assert!(matches!(kind, Detection::NotHttp), "{kind:?}"); } debug!(read = ?std::str::from_utf8(HTTP11_LINE).unwrap()); let mut buf = BytesMut::with_capacity(1024); let mut io = io::Builder::new().read(HTTP11_LINE).build(); - let kind = detect(params, &mut io, &mut buf).await.unwrap(); + let kind = detect(¶ms, &mut io, &mut buf).await.unwrap(); assert_eq!(kind.variant(), Some(Variant::Http1), "{kind:?}"); const REQ: &[u8] = b"GET /foo/bar/bar/blah HTTP/1.1\r\nHost: foob.example.com\r\n\r\n"; @@ -256,7 +263,7 @@ mod tests { debug!(read = ?std::str::from_utf8(&REQ[..i]).unwrap()); let mut buf = BytesMut::with_capacity(1024); let mut io = io::Builder::new().read(&REQ[..i]).build(); - let kind = detect(params, &mut io, &mut buf).await.unwrap(); + let kind = detect(¶ms, &mut io, &mut buf).await.unwrap(); assert_eq!(kind.variant(), Some(Variant::Http1), "{kind:?}"); assert_eq!(buf[..], REQ[..i]); } @@ -267,7 +274,7 @@ mod tests { let mut buf = BytesMut::with_capacity(1024); let mut io = io::Builder::new().read(&POST[..i]).build(); debug!(read = ?std::str::from_utf8(&POST[..i]).unwrap()); - let kind = detect(params, &mut io, &mut buf).await.unwrap(); + let kind = detect(¶ms, &mut io, &mut buf).await.unwrap(); assert_eq!(kind.variant(), Some(Variant::Http1), "{kind:?}"); assert_eq!(buf[..], POST[..i]); } @@ -279,16 +286,17 @@ mod tests { let params = DetectParams { read_timeout: time::Duration::from_millis(1), + ..Default::default() }; let mut buf = BytesMut::with_capacity(1024); let mut io = io::Builder::new().read(b"foo.bar.blah\r\nbobo").build(); - let kind = detect(params, &mut io, &mut buf).await.unwrap(); + let kind = detect(¶ms, &mut io, &mut buf).await.unwrap(); assert!(matches!(kind, Detection::NotHttp), "{kind:?}"); assert_eq!(&buf[..], b"foo.bar.blah\r\nbobo"); let mut buf = BytesMut::with_capacity(1024); let mut io = io::Builder::new().read(GARBAGE).build(); - let kind = detect(params, &mut io, &mut buf).await.unwrap(); + let kind = detect(¶ms, &mut io, &mut buf).await.unwrap(); assert!(matches!(kind, Detection::NotHttp), "{kind:?}"); assert_eq!(&buf[..], GARBAGE); } @@ -299,10 +307,11 @@ mod tests { let params = DetectParams { read_timeout: time::Duration::from_millis(1), + ..Default::default() }; let mut buf = BytesMut::with_capacity(1024); let mut io = io::Builder::new().build(); - let err = detect(params, &mut io, &mut buf).await.unwrap_err(); + let err = detect(¶ms, &mut io, &mut buf).await.unwrap_err(); assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof, "{err:?}"); assert_eq!(&buf[..], b""); } diff --git a/linkerd/http/detect/src/metrics.rs b/linkerd/http/detect/src/metrics.rs new file mode 100644 index 0000000000..bb2d4b0f51 --- /dev/null +++ b/linkerd/http/detect/src/metrics.rs @@ -0,0 +1,195 @@ +use linkerd_http_variant::Variant; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::{Family, MetricConstructor}, + histogram::Histogram, + }, + registry::{Registry, Unit}, +}; +use std::{fmt::Debug, hash::Hash}; +use tokio::time; + +#[derive(Clone, Debug)] +pub struct DetectMetricsFamilies +where + L: Clone + Hash + Eq + EncodeLabelSet + Debug + Send + Sync + 'static, +{ + duration: Family, + results: Family, Counter>, +} + +#[derive(Clone, Debug)] +pub struct DetectMetrics { + duration: Histogram, + not_http: Counter, + http1: Counter, + h2: Counter, + read_timeout: Counter, + error: Counter, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +struct DetectLabels +where + L: Clone + Hash + Eq + EncodeLabelSet + Debug + Send + Sync + 'static, +{ + result: DetectResult, + labels: L, +} + +#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)] +enum DetectResult { + NotHttp, + Http1, + H2, + ReadTimeout, + Error, +} + +#[derive(Clone, Debug, Default)] +struct MkDurations; + +// === impl DetectMetricsFamilies === + +impl Default for DetectMetricsFamilies +where + L: Clone + Hash + Eq + EncodeLabelSet + Debug + Send + Sync + 'static, +{ + fn default() -> Self { + Self { + duration: Family::new_with_constructor(MkDurations), + results: Family::default(), + } + } +} + +impl DetectMetricsFamilies +where + L: Clone + Hash + Eq + EncodeLabelSet + Debug + Send + Sync + 'static, +{ + pub fn register(reg: &mut Registry) -> Self { + let duration = Family::new_with_constructor(MkDurations); + reg.register_with_unit( + "duration", + "Time taken for protocol detection", + Unit::Seconds, + duration.clone(), + ); + + let results = Family::default(); + reg.register("results", "Protocol detection results", results.clone()); + + Self { duration, results } + } + + pub fn metrics(&self, labels: L) -> DetectMetrics { + let duration = (*self.duration.get_or_create(&labels)).clone(); + + let not_http = (*self.results.get_or_create(&DetectLabels { + result: DetectResult::NotHttp, + labels: labels.clone(), + })) + .clone(); + let http1 = (*self.results.get_or_create(&DetectLabels { + result: DetectResult::Http1, + labels: labels.clone(), + })) + .clone(); + let h2 = (*self.results.get_or_create(&DetectLabels { + result: DetectResult::H2, + labels: labels.clone(), + })) + .clone(); + let read_timeout = (*self.results.get_or_create(&DetectLabels { + result: DetectResult::ReadTimeout, + labels: labels.clone(), + })) + .clone(); + let error = (*self.results.get_or_create(&DetectLabels { + result: DetectResult::Error, + labels, + })) + .clone(); + + DetectMetrics { + duration, + not_http, + http1, + h2, + read_timeout, + error, + } + } +} + +// === impl DetectMetrics === + +impl Default for DetectMetrics { + fn default() -> Self { + Self { + duration: MkDurations.new_metric(), + not_http: Counter::default(), + http1: Counter::default(), + h2: Counter::default(), + read_timeout: Counter::default(), + error: Counter::default(), + } + } +} + +impl DetectMetrics { + pub(crate) fn observe( + &self, + result: &std::io::Result, + elapsed: time::Duration, + ) { + match result { + Ok(super::Detection::NotHttp) => self.not_http.inc(), + Ok(super::Detection::Http(Variant::Http1)) => self.http1.inc(), + Ok(super::Detection::Http(Variant::H2)) => self.h2.inc(), + Ok(super::Detection::ReadTimeout(_)) => self.read_timeout.inc(), + Err(_) => self.error.inc(), + }; + self.duration.observe(elapsed.as_secs_f64()); + } +} + +// === impl DetectLabels === + +impl EncodeLabelSet for DetectLabels +where + L: Clone + Hash + Eq + EncodeLabelSet + Debug + Send + Sync + 'static, +{ + fn encode( + &self, + mut enc: prometheus_client::encoding::LabelSetEncoder, + ) -> Result<(), std::fmt::Error> { + use prometheus_client::encoding::EncodeLabel; + + ( + "result", + match self.result { + DetectResult::NotHttp => "not_http", + DetectResult::Http1 => "http/1", + DetectResult::H2 => "http/2", + DetectResult::ReadTimeout => "read_timeout", + DetectResult::Error => "error", + }, + ) + .encode(enc.encode_label())?; + + self.labels.encode(enc)?; + + Ok(()) + } +} + +// === impl MkDurations === + +impl MetricConstructor for MkDurations { + fn new_metric(&self) -> Histogram { + Histogram::new([0.001, 0.1].into_iter()) + } +} diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index 3ef847b0bc..b4dc00db13 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -36,7 +36,9 @@ pub use http::{ pub use http_body::Body; pub use linkerd_http_box::{BoxBody, BoxRequest, BoxResponse, EraseResponse}; pub use linkerd_http_classify as classify; -pub use linkerd_http_detect::{DetectParams, Detection, NewDetect}; +pub use linkerd_http_detect::{ + DetectMetrics, DetectMetricsFamilies, DetectParams, Detection, NewDetect, +}; pub use linkerd_http_executor::TracingExecutor; pub use linkerd_http_insert as insert; pub use linkerd_http_override_authority::{AuthorityOverride, NewOverrideAuthority};