Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,7 @@ dependencies = [
"linkerd-io",
"linkerd-stack",
"linkerd-tracing",
"prometheus-client",
"thiserror 2.0.12",
"tokio",
"tokio-test",
Expand Down
10 changes: 7 additions & 3 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl Config {
.push_on_service(http::BoxResponse::layer())
.arc_new_clone_http();

let inbound::DetectMetrics(detect_metrics) = metrics.detect.clone();
let tcp = http
.unlift_new()
.push(http::NewServeHttp::layer({
Expand Down Expand Up @@ -177,9 +178,12 @@ impl Config {
)
.arc_new_tcp()
.lift_new_with_target()
.push(http::NewDetect::layer(svc::stack::CloneParam::from(
http::DetectParams { read_timeout: DETECT_TIMEOUT }
)))
.push(http::NewDetect::layer(move |tcp: &Tcp| {
http::DetectParams {
read_timeout: DETECT_TIMEOUT,
metrics: detect_metrics.metrics(tcp.policy.server_label())
}
}))
.push(transport::metrics::NewServer::layer(metrics.proxy.transport))
.push_map_target(move |(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {
Tcp {
Expand Down
6 changes: 5 additions & 1 deletion linkerd/app/inbound/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ mod tests {
}

fn inbound() -> Inbound<()> {
Inbound::new(test_util::default_config(), test_util::runtime().0)
Inbound::new(
test_util::default_config(),
test_util::runtime().0,
&mut Default::default(),
)
}

fn new_panic<T>(msg: &'static str) -> svc::ArcNewTcp<T, io::DuplexStream> {
Expand Down
39 changes: 31 additions & 8 deletions linkerd/app/inbound/src/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
};
use linkerd_app_core::{
identity, io,
metrics::ServerLabel,
metrics::{prom, ServerLabel},
proxy::http,
svc, tls,
transport::{
Expand All @@ -20,6 +20,10 @@ use tracing::info;
#[cfg(test)]
mod tests;

#[derive(Clone, Debug)]
pub struct MetricsFamilies(pub HttpDetectMetrics);
pub type HttpDetectMetrics = http::DetectMetricsFamilies<ServerLabel>;

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Forward {
client_addr: Remote<ClientAddr>,
Expand Down Expand Up @@ -61,7 +65,11 @@ type TlsIo<I> = tls::server::Io<identity::ServerIo<tls::server::DetectIo<I>>, I>
impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
/// Builds a stack that terminates mesh TLS and detects whether the traffic is HTTP (as hinted
/// by policy).
pub(crate) fn push_detect<T, I, F, FSvc>(self, forward: F) -> Inbound<svc::ArcNewTcp<T, I>>
pub(crate) fn push_detect<T, I, F, FSvc>(
self,
MetricsFamilies(metrics): MetricsFamilies,
forward: F,
) -> Inbound<svc::ArcNewTcp<T, I>>
where
T: svc::Param<OrigDstAddr> + svc::Param<Remote<ClientAddr>> + svc::Param<AllowPolicy>,
T: Clone + Send + 'static,
Expand All @@ -72,14 +80,18 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
FSvc::Error: Into<Error>,
FSvc::Future: Send,
{
self.push_detect_http(forward.clone())
self.push_detect_http(metrics, forward.clone())
.push_detect_tls(forward)
}

/// Builds a stack that handles HTTP detection once TLS detection has been performed. If the
/// connection is determined to be HTTP, the inner stack is used; otherwise the connection is
/// passed to the provided 'forward' stack.
fn push_detect_http<I, F, FSvc>(self, forward: F) -> Inbound<svc::ArcNewTcp<Tls, I>>
fn push_detect_http<I, F, FSvc>(
self,
metrics: HttpDetectMetrics,
forward: F,
) -> Inbound<svc::ArcNewTcp<Tls, I>>
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
I: Debug + Send + Sync + Unpin + 'static,
Expand Down Expand Up @@ -153,11 +165,12 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
forward.into_inner(),
)
.lift_new_with_target()
.push(http::NewDetect::layer(|Detect { timeout, .. }: &Detect| {
http::DetectParams {
.push(http::NewDetect::layer(
move |Detect { timeout, tls }: &Detect| http::DetectParams {
read_timeout: *timeout,
}
}))
metrics: metrics.metrics(tls.policy.server_label()),
},
))
.arc_new_tcp();

http.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
Expand Down Expand Up @@ -445,3 +458,13 @@ impl<T> svc::InsertParam<tls::ConditionalServerTls, T> for TlsParams {
(tls, target)
}
}

// === impl MetricsFamilies ===

impl MetricsFamilies {
pub fn register(reg: &mut prom::Registry) -> Self {
Self(http::DetectMetricsFamilies::register(
reg.sub_registry_with_prefix("http"),
))
}
}
16 changes: 10 additions & 6 deletions linkerd/app/inbound/src/detect/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn detect_http_non_http() {

inbound()
.with_stack(new_panic("http stack must not be used"))
.push_detect_http(new_ok())
.push_detect_http(Default::default(), new_ok())
.into_inner()
.new_service(target)
.oneshot(ior)
Expand Down Expand Up @@ -110,7 +110,7 @@ async fn detect_http() {

inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(Default::default(), new_panic("tcp stack must not be used"))
.into_inner()
.new_service(target)
.oneshot(ior)
Expand All @@ -136,7 +136,7 @@ async fn hinted_http1() {

inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(Default::default(), new_panic("tcp stack must not be used"))
.into_inner()
.new_service(target)
.oneshot(ior)
Expand All @@ -162,7 +162,7 @@ async fn hinted_http1_supports_http2() {

inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(Default::default(), new_panic("tcp stack must not be used"))
.into_inner()
.new_service(target)
.oneshot(ior)
Expand All @@ -187,7 +187,7 @@ async fn hinted_http2() {

inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(Default::default(), new_panic("tcp stack must not be used"))
.into_inner()
.new_service(target)
.oneshot(ior)
Expand All @@ -210,7 +210,11 @@ fn orig_dst_addr() -> OrigDstAddr {
}

fn inbound() -> Inbound<()> {
Inbound::new(test_util::default_config(), test_util::runtime().0)
Inbound::new(
test_util::default_config(),
test_util::runtime().0,
&mut Default::default(),
)
}

fn new_panic<T, I: 'static>(msg: &'static str) -> svc::ArcNewTcp<T, I> {
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn build_server<I>(
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
{
Inbound::new(cfg, rt)
Inbound::new(cfg, rt, &mut Default::default())
.with_stack(connect)
.map_stack(|cfg, _, s| {
s.push_map_target(|t| Param::<Remote<ServerAddr>>::param(&t))
Expand Down
15 changes: 11 additions & 4 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ pub mod test_util;

#[cfg(fuzzing)]
pub use self::http::fuzz as http_fuzz;
pub use self::{metrics::InboundMetrics, policy::DefaultPolicy};
pub use self::{
detect::MetricsFamilies as DetectMetrics, metrics::InboundMetrics, policy::DefaultPolicy,
};
use linkerd_app_core::{
config::{ConnectConfig, ProxyConfig, QueueConfig},
drain,
http_tracing::SpanSink,
identity, io,
metrics::prom,
proxy::{tap, tcp},
svc,
transport::{self, Remote, ServerAddr},
Expand Down Expand Up @@ -148,9 +151,9 @@ impl<S> Inbound<S> {
}

impl Inbound<()> {
pub fn new(config: Config, runtime: ProxyRuntime) -> Self {
pub fn new(config: Config, runtime: ProxyRuntime, prom: &mut prom::Registry) -> Self {
let runtime = Runtime {
metrics: InboundMetrics::new(runtime.metrics),
metrics: InboundMetrics::new(runtime.metrics, prom),
identity: runtime.identity,
tap: runtime.tap,
span_sink: runtime.span_sink,
Expand All @@ -166,7 +169,11 @@ impl Inbound<()> {
#[cfg(any(test, feature = "test-util"))]
pub fn for_test() -> (Self, drain::Signal) {
let (rt, drain) = test_util::runtime();
let this = Self::new(test_util::default_config(), rt);
let this = Self::new(
test_util::default_config(),
rt,
&mut prom::Registry::default(),
);
(this, drain)
}

Expand Down
8 changes: 7 additions & 1 deletion linkerd/app/inbound/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,22 @@ pub struct InboundMetrics {
/// Holds metrics that are common to both inbound and outbound proxies. These metrics are
/// reported separately
pub proxy: Proxy,

pub detect: crate::detect::MetricsFamilies,
}

impl InboundMetrics {
pub(crate) fn new(proxy: Proxy) -> Self {
pub(crate) fn new(proxy: Proxy, reg: &mut prom::Registry) -> Self {
let detect =
crate::detect::MetricsFamilies::register(reg.sub_registry_with_prefix("tcp_detect"));

Self {
http_authz: authz::HttpAuthzMetrics::default(),
http_errors: error::HttpErrorMetrics::default(),
tcp_authz: authz::TcpAuthzMetrics::default(),
tcp_errors: error::TcpErrorMetrics::default(),
proxy,
detect,
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion linkerd/app/inbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ impl Inbound<()> {
I: Debug + Unpin + Send + Sync + 'static,
P: profiles::GetProfile<Error = Error>,
{
let detect_metrics = self.runtime.metrics.detect.clone();

// Handles connections to ports that can't be determined to be HTTP.
let forward = self
.clone()
Expand Down Expand Up @@ -97,7 +99,7 @@ impl Inbound<()> {
// Determines how to handle an inbound connection, dispatching it to the appropriate
// stack.
http.push_http_tcp_server()
.push_detect(forward)
.push_detect(detect_metrics, forward)
.push_accept(addr.port(), policies, direct)
.into_inner()
}
Expand Down
15 changes: 10 additions & 5 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ impl<N> Outbound<N> {
NSvc::Future: Send,
{
self.map_stack(|config, rt, inner| {
let detect_params = http::DetectParams {
read_timeout: config.proxy.detect_protocol_timeout,
metrics: rt
.metrics
.prom
.http_detect
.metrics(ParentRef(policy::Meta::new_default("ingress"))),
};

// Route requests with destinations that can be discovered via the
// `l5d-dst-override` header through the (load balanced) logical
// stack. Route requests without the header through the endpoint
Expand Down Expand Up @@ -329,11 +338,7 @@ impl<N> Outbound<N> {
fallback,
)
.lift_new_with_target()
.push(http::NewDetect::layer(svc::CloneParam::from(
http::DetectParams {
read_timeout: config.proxy.detect_protocol_timeout,
},
)))
.push(http::NewDetect::layer(svc::CloneParam::from(detect_params)))
.arc_new_tcp()
})
}
Expand Down
7 changes: 7 additions & 0 deletions linkerd/app/outbound/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct OutboundMetrics {

#[derive(Clone, Debug, Default)]
pub(crate) struct PromMetrics {
pub(crate) http_detect: crate::http::DetectMetricsFamilies<ParentRef>,
pub(crate) http: crate::http::HttpMetrics,
pub(crate) opaq: crate::opaq::OpaqMetrics,
pub(crate) tls: crate::tls::TlsMetrics,
Expand Down Expand Up @@ -88,6 +89,11 @@ where

impl PromMetrics {
pub fn register(registry: &mut prom::Registry) -> Self {
let http_detect = crate::http::DetectMetricsFamilies::register(
// Scoped consistently with the inbound metrics.
registry.sub_registry_with_prefix("tcp_detect_http"),
);

// NOTE: HTTP metrics are scoped internally, since this configures both
// HTTP and gRPC scopes.
let http = crate::http::HttpMetrics::register(registry);
Expand All @@ -97,6 +103,7 @@ impl PromMetrics {
let tls = crate::tls::TlsMetrics::register(registry.sub_registry_with_prefix("tls"));

Self {
http_detect,
http,
opaq,
tls,
Expand Down
17 changes: 11 additions & 6 deletions linkerd/app/outbound/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{http, Outbound};
use crate::{http, Outbound, ParentRef};
use linkerd_app_core::{io, svc, Error, Infallible};
use std::{fmt::Debug, hash::Hash};

Expand Down Expand Up @@ -35,6 +35,7 @@ impl<N> Outbound<N> {
where
// Target type indicating whether detection should be skipped.
T: svc::Param<Protocol>,
T: svc::Param<ParentRef>,
T: Eq + Hash + Clone + Debug + Send + Sync + 'static,
// Server-side socket.
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
Expand Down Expand Up @@ -62,7 +63,10 @@ impl<N> Outbound<N> {
.arc_new_tcp()
});

let detect = http.clone().map_stack(|config, _, http| {
let detect = http.clone().map_stack(|config, rt, http| {
let read_timeout = config.proxy.detect_protocol_timeout;
let metrics = rt.metrics.prom.http_detect.clone();

http.push_switch(
|(detected, parent): (http::Detection, T)| -> Result<_, Infallible> {
match detected {
Expand All @@ -84,11 +88,12 @@ impl<N> Outbound<N> {
.push_on_service(svc::LoadShed::layer())
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right))
.lift_new_with_target::<(http::Detection, T)>()
.push(http::NewDetect::layer(svc::CloneParam::from(
.push(http::NewDetect::layer(move |parent: &T| {
http::DetectParams {
read_timeout: config.proxy.detect_protocol_timeout,
},
)))
read_timeout,
metrics: metrics.metrics(parent.param()),
}
}))
.arc_new_tcp()
});

Expand Down
6 changes: 6 additions & 0 deletions linkerd/app/outbound/src/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ impl svc::Param<Protocol> for Sidecar {
}
}

impl svc::Param<ParentRef> for Sidecar {
fn param(&self) -> ParentRef {
ParentRef(self.policy.borrow().parent.clone())
}
}

impl PartialEq for Sidecar {
fn eq(&self, other: &Self) -> bool {
self.orig_dst == other.orig_dst
Expand Down
Loading
Loading