Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,7 @@ dependencies = [
"linkerd-io",
"linkerd-stack",
"linkerd-tracing",
"prometheus-client",
"thiserror 2.0.12",
"tokio",
"tokio-test",
Expand Down
10 changes: 7 additions & 3 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl Config {
.push_on_service(http::BoxResponse::layer())
.arc_new_clone_http();

let inbound::DetectMetrics(detect_metrics) = metrics.detect.clone();
let tcp = http
.unlift_new()
.push(http::NewServeHttp::layer({
Expand Down Expand Up @@ -177,9 +178,12 @@ impl Config {
)
.arc_new_tcp()
.lift_new_with_target()
.push(http::NewDetect::layer(svc::stack::CloneParam::from(
http::DetectParams { read_timeout: DETECT_TIMEOUT }
)))
.push(http::NewDetect::layer(move |tcp: &Tcp| {
http::DetectParams {
read_timeout: DETECT_TIMEOUT,
metrics: detect_metrics.metrics(tcp.policy.server_label())
}
}))
.push(transport::metrics::NewServer::layer(metrics.proxy.transport))
.push_map_target(move |(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {
Tcp {
Expand Down
6 changes: 5 additions & 1 deletion linkerd/app/inbound/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ mod tests {
}

fn inbound() -> Inbound<()> {
Inbound::new(test_util::default_config(), test_util::runtime().0)
Inbound::new(
test_util::default_config(),
test_util::runtime().0,
&mut Default::default(),
)
}

fn new_panic<T>(msg: &'static str) -> svc::ArcNewTcp<T, io::DuplexStream> {
Expand Down
39 changes: 31 additions & 8 deletions linkerd/app/inbound/src/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
};
use linkerd_app_core::{
identity, io,
metrics::ServerLabel,
metrics::{prom, ServerLabel},
proxy::http,
svc, tls,
transport::{
Expand All @@ -20,6 +20,10 @@ use tracing::info;
#[cfg(test)]
mod tests;

#[derive(Clone, Debug)]
pub struct MetricsFamilies(pub HttpDetectMetrics);
pub type HttpDetectMetrics = http::DetectMetricsFamilies<ServerLabel>;

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Forward {
client_addr: Remote<ClientAddr>,
Expand Down Expand Up @@ -61,7 +65,11 @@ type TlsIo<I> = tls::server::Io<identity::ServerIo<tls::server::DetectIo<I>>, I>
impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
/// Builds a stack that terminates mesh TLS and detects whether the traffic is HTTP (as hinted
/// by policy).
pub(crate) fn push_detect<T, I, F, FSvc>(self, forward: F) -> Inbound<svc::ArcNewTcp<T, I>>
pub(crate) fn push_detect<T, I, F, FSvc>(
self,
MetricsFamilies(metrics): MetricsFamilies,
forward: F,
) -> Inbound<svc::ArcNewTcp<T, I>>
where
T: svc::Param<OrigDstAddr> + svc::Param<Remote<ClientAddr>> + svc::Param<AllowPolicy>,
T: Clone + Send + 'static,
Expand All @@ -72,14 +80,18 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
FSvc::Error: Into<Error>,
FSvc::Future: Send,
{
self.push_detect_http(forward.clone())
self.push_detect_http(metrics, forward.clone())
.push_detect_tls(forward)
}

/// Builds a stack that handles HTTP detection once TLS detection has been performed. If the
/// connection is determined to be HTTP, the inner stack is used; otherwise the connection is
/// passed to the provided 'forward' stack.
fn push_detect_http<I, F, FSvc>(self, forward: F) -> Inbound<svc::ArcNewTcp<Tls, I>>
fn push_detect_http<I, F, FSvc>(
self,
metrics: HttpDetectMetrics,
forward: F,
) -> Inbound<svc::ArcNewTcp<Tls, I>>
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
I: Debug + Send + Sync + Unpin + 'static,
Expand Down Expand Up @@ -153,11 +165,12 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
forward.into_inner(),
)
.lift_new_with_target()
.push(http::NewDetect::layer(|Detect { timeout, .. }: &Detect| {
http::DetectParams {
.push(http::NewDetect::layer(
move |Detect { timeout, tls }: &Detect| http::DetectParams {
read_timeout: *timeout,
}
}))
metrics: metrics.metrics(tls.policy.server_label()),
},
))
.arc_new_tcp();

http.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
Expand Down Expand Up @@ -445,3 +458,13 @@ impl<T> svc::InsertParam<tls::ConditionalServerTls, T> for TlsParams {
(tls, target)
}
}

// === impl MetricsFamilies ===

impl MetricsFamilies {
pub fn register(reg: &mut prom::Registry) -> Self {
Self(http::DetectMetricsFamilies::register(
reg.sub_registry_with_prefix("http"),
))
}
}
99 changes: 93 additions & 6 deletions linkerd/app/inbound/src/detect/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ const HTTP1: &[u8] = b"GET / HTTP/1.1\r\nhost: example.com\r\n\r\n";
const HTTP2: &[u8] = b"PRI * HTTP/2.0\r\n";
const NOT_HTTP: &[u8] = b"foo\r\nbar\r\nblah\r\n";

const RESULTS_NOT_HTTP: &str = "results_total{result=\"not_http\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";
const RESULTS_HTTP1: &str = "results_total{result=\"http/1\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";
const RESULTS_HTTP2: &str = "results_total{result=\"http/2\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";
const RESULTS_READ_TIMEOUT: &str = "results_total{result=\"read_timeout\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";
const RESULTS_ERROR: &str = "results_total{result=\"error\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";

fn authzs() -> Arc<[Authorization]> {
Arc::new([Authorization {
authentication: Authentication::Unauthenticated,
Expand Down Expand Up @@ -41,6 +47,35 @@ fn allow(protocol: Protocol) -> AllowPolicy {
allow
}

macro_rules! assert_contains_metric {
($registry:expr, $metric:expr, $value:expr) => {{
let mut buf = String::new();
prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed");
let lines = buf.split_terminator('\n').collect::<Vec<_>>();
assert_eq!(
lines.iter().find(|l| l.starts_with($metric)),
Some(&&*format!("{} {}", $metric, $value)),
"metric '{}' not found in:\n{:?}",
$metric,
buf
);
}};
}

macro_rules! assert_not_contains_metric {
($registry:expr, $pattern:expr) => {{
let mut buf = String::new();
prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed");
let lines = buf.split_terminator('\n').collect::<Vec<_>>();
assert!(
!lines.iter().any(|l| l.starts_with($pattern)),
"metric '{}' found in:\n{:?}",
$pattern,
buf
);
}};
}

#[tokio::test(flavor = "current_thread")]
async fn detect_tls_opaque() {
let _trace = trace::test::trace_init();
Expand Down Expand Up @@ -77,14 +112,21 @@ async fn detect_http_non_http() {
let (ior, mut iow) = io::duplex(100);
iow.write_all(NOT_HTTP).await.unwrap();

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_panic("http stack must not be used"))
.push_detect_http(new_ok())
.push_detect_http(super::HttpDetectMetrics::register(&mut registry), new_ok())
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

assert_contains_metric!(&registry, RESULTS_NOT_HTTP, 1);
assert_contains_metric!(&registry, RESULTS_HTTP1, 0);
assert_contains_metric!(&registry, RESULTS_HTTP2, 0);
assert_contains_metric!(&registry, RESULTS_READ_TIMEOUT, 0);
assert_contains_metric!(&registry, RESULTS_ERROR, 0);
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -108,14 +150,24 @@ async fn detect_http() {
let (ior, mut iow) = io::duplex(100);
iow.write_all(HTTP1).await.unwrap();

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(
super::HttpDetectMetrics::register(&mut registry),
new_panic("tcp stack must not be used"),
)
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

assert_contains_metric!(&registry, RESULTS_NOT_HTTP, 0);
assert_contains_metric!(&registry, RESULTS_HTTP1, 1);
assert_contains_metric!(&registry, RESULTS_HTTP2, 0);
assert_contains_metric!(&registry, RESULTS_READ_TIMEOUT, 0);
assert_contains_metric!(&registry, RESULTS_ERROR, 0);
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -134,14 +186,24 @@ async fn hinted_http1() {
let (ior, mut iow) = io::duplex(100);
iow.write_all(HTTP1).await.unwrap();

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(
super::HttpDetectMetrics::register(&mut registry),
new_panic("tcp stack must not be used"),
)
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

assert_contains_metric!(&registry, RESULTS_NOT_HTTP, 0);
assert_contains_metric!(&registry, RESULTS_HTTP1, 1);
assert_contains_metric!(&registry, RESULTS_HTTP2, 0);
assert_contains_metric!(&registry, RESULTS_READ_TIMEOUT, 0);
assert_contains_metric!(&registry, RESULTS_ERROR, 0);
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -160,14 +222,24 @@ async fn hinted_http1_supports_http2() {
let (ior, mut iow) = io::duplex(100);
iow.write_all(HTTP2).await.unwrap();

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(
super::HttpDetectMetrics::register(&mut registry),
new_panic("tcp stack must not be used"),
)
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

assert_contains_metric!(&registry, RESULTS_NOT_HTTP, 0);
assert_contains_metric!(&registry, RESULTS_HTTP1, 0);
assert_contains_metric!(&registry, RESULTS_HTTP2, 1);
assert_contains_metric!(&registry, RESULTS_READ_TIMEOUT, 0);
assert_contains_metric!(&registry, RESULTS_ERROR, 0);
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -185,14 +257,25 @@ async fn hinted_http2() {

let (ior, _) = io::duplex(100);

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(
super::HttpDetectMetrics::register(&mut registry),
new_panic("tcp stack must not be used"),
)
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

// No detection is performed when HTTP/2 is hinted, so no metrics are recorded.
assert_not_contains_metric!(&registry, RESULTS_NOT_HTTP);
assert_not_contains_metric!(&registry, RESULTS_HTTP1);
assert_not_contains_metric!(&registry, RESULTS_HTTP2);
assert_not_contains_metric!(&registry, RESULTS_READ_TIMEOUT);
assert_not_contains_metric!(&registry, RESULTS_ERROR);
}

fn client_id() -> tls::ClientId {
Expand All @@ -210,7 +293,11 @@ fn orig_dst_addr() -> OrigDstAddr {
}

fn inbound() -> Inbound<()> {
Inbound::new(test_util::default_config(), test_util::runtime().0)
Inbound::new(
test_util::default_config(),
test_util::runtime().0,
&mut Default::default(),
)
}

fn new_panic<T, I: 'static>(msg: &'static str) -> svc::ArcNewTcp<T, I> {
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn build_server<I>(
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
{
Inbound::new(cfg, rt)
Inbound::new(cfg, rt, &mut Default::default())
.with_stack(connect)
.map_stack(|cfg, _, s| {
s.push_map_target(|t| Param::<Remote<ServerAddr>>::param(&t))
Expand Down
15 changes: 11 additions & 4 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ pub mod test_util;

#[cfg(fuzzing)]
pub use self::http::fuzz as http_fuzz;
pub use self::{metrics::InboundMetrics, policy::DefaultPolicy};
pub use self::{
detect::MetricsFamilies as DetectMetrics, metrics::InboundMetrics, policy::DefaultPolicy,
};
use linkerd_app_core::{
config::{ConnectConfig, ProxyConfig, QueueConfig},
drain,
http_tracing::SpanSink,
identity, io,
metrics::prom,
proxy::{tap, tcp},
svc,
transport::{self, Remote, ServerAddr},
Expand Down Expand Up @@ -148,9 +151,9 @@ impl<S> Inbound<S> {
}

impl Inbound<()> {
pub fn new(config: Config, runtime: ProxyRuntime) -> Self {
pub fn new(config: Config, runtime: ProxyRuntime, prom: &mut prom::Registry) -> Self {
let runtime = Runtime {
metrics: InboundMetrics::new(runtime.metrics),
metrics: InboundMetrics::new(runtime.metrics, prom),
identity: runtime.identity,
tap: runtime.tap,
span_sink: runtime.span_sink,
Expand All @@ -166,7 +169,11 @@ impl Inbound<()> {
#[cfg(any(test, feature = "test-util"))]
pub fn for_test() -> (Self, drain::Signal) {
let (rt, drain) = test_util::runtime();
let this = Self::new(test_util::default_config(), rt);
let this = Self::new(
test_util::default_config(),
rt,
&mut prom::Registry::default(),
);
(this, drain)
}

Expand Down
8 changes: 7 additions & 1 deletion linkerd/app/inbound/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,22 @@ pub struct InboundMetrics {
/// Holds metrics that are common to both inbound and outbound proxies. These metrics are
/// reported separately
pub proxy: Proxy,

pub detect: crate::detect::MetricsFamilies,
}

impl InboundMetrics {
pub(crate) fn new(proxy: Proxy) -> Self {
pub(crate) fn new(proxy: Proxy, reg: &mut prom::Registry) -> Self {
let detect =
crate::detect::MetricsFamilies::register(reg.sub_registry_with_prefix("tcp_detect"));

Self {
http_authz: authz::HttpAuthzMetrics::default(),
http_errors: error::HttpErrorMetrics::default(),
tcp_authz: authz::TcpAuthzMetrics::default(),
tcp_errors: error::TcpErrorMetrics::default(),
proxy,
detect,
}
}
}
Expand Down
Loading
Loading