Skip to content

Commit c08a6b5

Browse files
committed
feat(app): Backend response frame count metrics
this introduces a new tower middleware for Prometheus metrics, used for instrumenting HTTP and gRPC response bodies, and observing (a) the number of frames yielded by a body, and (b) the number of bytes included in body frames. this middleware allows operators to reason about how large or small the packets being served in a backend's response bodies are. a route-level middleware that instruments request bodies will be added in a follow-on PR. ### 📝 changes an overview of changes made here: * the `linkerd-http-prom` has a new `body_data` submodule. it exposes `request` and `response` halves, to be explicit about which body is being instrumented on a `tower::Service`. * the `linkerd-http-prom` crate now has a collection of new dependencies: `bytes` is added as a dependency in order to inspect the data chunk when the inner body yields a new frame. `futures-util` and `http-body` are added as dev-dependencies for the accompanying test coverage. * body metrics are affixed to the `RouteBackendMetrics<L>` structure, and registered at startup. Signed-off-by: katelyn martin <[email protected]>
1 parent c268774 commit c08a6b5

File tree

11 files changed

+467
-6
lines changed

11 files changed

+467
-6
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,7 +1325,9 @@ dependencies = [
13251325
"ahash",
13261326
"bytes",
13271327
"futures",
1328+
"futures-util",
13281329
"http",
1330+
"http-body",
13291331
"hyper",
13301332
"linkerd-app-core",
13311333
"linkerd-app-test",
@@ -1554,6 +1556,7 @@ dependencies = [
15541556
name = "linkerd-http-prom"
15551557
version = "0.1.0"
15561558
dependencies = [
1559+
"bytes",
15571560
"futures",
15581561
"http",
15591562
"http-body",

linkerd/app/outbound/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ linkerd-tonic-stream = { path = "../../tonic-stream" }
4949
linkerd-tonic-watch = { path = "../../tonic-watch" }
5050

5151
[dev-dependencies]
52+
futures-util = "0.3"
53+
http-body = "0.4"
5254
hyper = { version = "0.14", features = ["http1", "http2"] }
5355
tokio = { version = "1", features = ["macros", "sync", "time"] }
5456
tokio-rustls = "0.24"

linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{BackendRef, ParentRef, RouteRef};
22
use linkerd_app_core::{metrics::prom, svc};
33
use linkerd_http_prom::{
4+
body_data::response::{BodyDataMetrics, NewRecordBodyData, ResponseBodyFamilies},
45
record_response::{self, NewResponseDuration, StreamLabel},
56
NewCountRequests, RequestCount, RequestCountFamilies,
67
};
@@ -15,6 +16,7 @@ mod tests;
1516
pub struct RouteBackendMetrics<L: StreamLabel> {
1617
requests: RequestCountFamilies<labels::RouteBackend>,
1718
responses: ResponseMetrics<L>,
19+
body_metrics: ResponseBodyFamilies<labels::RouteBackend>,
1820
}
1921

2022
type ResponseMetrics<L> = record_response::ResponseMetrics<
@@ -26,14 +28,24 @@ pub fn layer<T, N>(
2628
metrics: &RouteBackendMetrics<T::StreamLabel>,
2729
) -> impl svc::Layer<
2830
N,
29-
Service = NewCountRequests<
30-
ExtractRequestCount,
31-
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
31+
Service = NewRecordBodyData<
32+
ExtractRecordBodyDataParams,
33+
NewCountRequests<
34+
ExtractRequestCount,
35+
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
36+
>,
3237
>,
3338
> + Clone
3439
where
3540
T: MkStreamLabel,
3641
N: svc::NewService<T>,
42+
NewRecordBodyData<
43+
ExtractRecordBodyDataParams,
44+
NewCountRequests<
45+
ExtractRequestCount,
46+
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
47+
>,
48+
>: svc::NewService<T>,
3749
NewCountRequests<
3850
ExtractRequestCount,
3951
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
@@ -44,28 +56,37 @@ where
4456
let RouteBackendMetrics {
4557
requests,
4658
responses,
59+
body_metrics,
4760
} = metrics.clone();
61+
4862
svc::layer::mk(move |inner| {
4963
use svc::Layer;
50-
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
51-
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
52-
.layer(inner),
64+
NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone())).layer(
65+
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
66+
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
67+
.layer(inner),
68+
),
5369
)
5470
})
5571
}
5672

5773
#[derive(Clone, Debug)]
5874
pub struct ExtractRequestCount(RequestCountFamilies<labels::RouteBackend>);
5975

76+
#[derive(Clone, Debug)]
77+
pub struct ExtractRecordBodyDataParams(ResponseBodyFamilies<labels::RouteBackend>);
78+
6079
// === impl RouteBackendMetrics ===
6180

6281
impl<L: StreamLabel> RouteBackendMetrics<L> {
6382
pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator<Item = f64>) -> Self {
6483
let requests = RequestCountFamilies::register(reg);
6584
let responses = record_response::ResponseMetrics::register(reg, histo);
85+
let body_metrics = ResponseBodyFamilies::register(reg);
6686
Self {
6787
requests,
6888
responses,
89+
body_metrics,
6990
}
7091
}
7192

@@ -83,13 +104,22 @@ impl<L: StreamLabel> RouteBackendMetrics<L> {
83104
pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter {
84105
self.responses.get_statuses(l)
85106
}
107+
108+
#[cfg(test)]
109+
pub(crate) fn get_response_body_metrics(
110+
&self,
111+
l: &labels::RouteBackend,
112+
) -> linkerd_http_prom::body_data::response::BodyDataMetrics {
113+
self.body_metrics.get(l)
114+
}
86115
}
87116

88117
impl<L: StreamLabel> Default for RouteBackendMetrics<L> {
89118
fn default() -> Self {
90119
Self {
91120
requests: Default::default(),
92121
responses: Default::default(),
122+
body_metrics: Default::default(),
93123
}
94124
}
95125
}
@@ -99,6 +129,7 @@ impl<L: StreamLabel> Clone for RouteBackendMetrics<L> {
99129
Self {
100130
requests: self.requests.clone(),
101131
responses: self.responses.clone(),
132+
body_metrics: self.body_metrics.clone(),
102133
}
103134
}
104135
}
@@ -114,3 +145,17 @@ where
114145
.metrics(&labels::RouteBackend(t.param(), t.param(), t.param()))
115146
}
116147
}
148+
149+
// === impl ExtractRecordBodyDataParams ===
150+
151+
impl<T> svc::ExtractParam<BodyDataMetrics, T> for ExtractRecordBodyDataParams
152+
where
153+
T: svc::Param<ParentRef> + svc::Param<RouteRef> + svc::Param<BackendRef>,
154+
{
155+
fn extract_param(&self, t: &T) -> BodyDataMetrics {
156+
let Self(families) = self;
157+
let labels = labels::RouteBackend(t.param(), t.param(), t.param());
158+
159+
families.get(&labels)
160+
}
161+
}

linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ use super::{
55
LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics,
66
};
77
use crate::http::{concrete, logical::Concrete};
8+
use bytes::Buf;
89
use linkerd_app_core::{
910
svc::{self, http::BoxBody, Layer, NewService},
1011
transport::{Remote, ServerAddr},
12+
Error,
1113
};
1214
use linkerd_proxy_client_policy as policy;
1315

@@ -116,6 +118,128 @@ async fn http_request_statuses() {
116118
assert_eq!(mixed.get(), 1);
117119
}
118120

121+
/// Tests that metrics count frames in the backend response body.
122+
#[tokio::test(flavor = "current_thread", start_paused = true)]
123+
async fn body_data_layer_records_frames() -> Result<(), Error> {
124+
use http_body::Body;
125+
use linkerd_app_core::proxy::http;
126+
use linkerd_http_prom::body_data::response::BodyDataMetrics;
127+
use tower::{Service, ServiceExt};
128+
129+
let _trace = linkerd_tracing::test::trace_init();
130+
131+
let metrics = super::RouteBackendMetrics::default();
132+
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
133+
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
134+
let backend_ref = crate::BackendRef(policy::Meta::new_default("backend"));
135+
136+
let (mut svc, mut handle) =
137+
mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref);
138+
handle.allow(1);
139+
140+
// Create a request.
141+
let req = {
142+
let empty = hyper::Body::empty();
143+
let body = BoxBody::new(empty);
144+
http::Request::builder().method("DOOT").body(body).unwrap()
145+
};
146+
147+
// Call the service once it is ready to accept a request.
148+
tracing::info!("calling service");
149+
svc.ready().await.expect("ready");
150+
let call = svc.call(req);
151+
let (req, send_resp) = handle.next_request().await.unwrap();
152+
debug_assert_eq!(req.method().as_str(), "DOOT");
153+
154+
// Acquire the counters for this backend.
155+
tracing::info!("acquiring response body metrics");
156+
let labels = labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone());
157+
let BodyDataMetrics {
158+
frames_total,
159+
frames_bytes,
160+
} = metrics.get_response_body_metrics(&labels);
161+
162+
// Before we've sent a response, the counter should be zero.
163+
assert_eq!(frames_total.get(), 0);
164+
assert_eq!(frames_bytes.get(), 0);
165+
166+
// Create a response whose body is backed by a channel that we can send chunks to, send it.
167+
tracing::info!("sending response");
168+
let mut resp_tx = {
169+
let (tx, body) = hyper::Body::channel();
170+
let body = BoxBody::new(body);
171+
let resp = http::Response::builder()
172+
.status(http::StatusCode::IM_A_TEAPOT)
173+
.body(body)
174+
.unwrap();
175+
send_resp.send_response(resp);
176+
tx
177+
};
178+
179+
// Before we've sent any bytes, the counter should be zero.
180+
assert_eq!(frames_total.get(), 0);
181+
assert_eq!(frames_bytes.get(), 0);
182+
183+
// On the client end, poll our call future and await the response.
184+
tracing::info!("polling service future");
185+
let (parts, body) = call.await?.into_parts();
186+
debug_assert_eq!(parts.status, 418);
187+
188+
let mut body = Box::pin(body);
189+
190+
/// Returns the next chunk from a boxed body.
191+
async fn read_chunk(body: &mut std::pin::Pin<Box<BoxBody>>) -> Result<Vec<u8>, Error> {
192+
use std::task::{Context, Poll};
193+
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
194+
let data = match body.as_mut().poll_data(&mut ctx) {
195+
Poll::Ready(Some(Ok(d))) => d,
196+
_ => panic!("next chunk should be ready"),
197+
};
198+
let chunk = data.chunk().to_vec();
199+
Ok(chunk)
200+
}
201+
202+
{
203+
// Send a chunk, confirm that our counters are incremented.
204+
tracing::info!("sending first chunk");
205+
resp_tx.send_data("hello".into()).await?;
206+
let chunk = read_chunk(&mut body).await?;
207+
debug_assert_eq!("hello".as_bytes(), chunk, "should get same value back out");
208+
assert_eq!(frames_total.get(), 1);
209+
assert_eq!(frames_bytes.get(), 5);
210+
}
211+
212+
{
213+
// Send another chunk, confirm that our counters are incremented once more.
214+
tracing::info!("sending second chunk");
215+
resp_tx.send_data(", world!".into()).await?;
216+
let chunk = read_chunk(&mut body).await?;
217+
debug_assert_eq!(
218+
", world!".as_bytes(),
219+
chunk,
220+
"should get same value back out"
221+
);
222+
assert_eq!(frames_total.get(), 2);
223+
assert_eq!(frames_bytes.get(), 5 + 8);
224+
}
225+
226+
{
227+
// Close the body, show that the counters remain at the same values.
228+
use std::task::{Context, Poll};
229+
tracing::info!("closing response body");
230+
drop(resp_tx);
231+
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
232+
match body.as_mut().poll_data(&mut ctx) {
233+
Poll::Ready(None) => {}
234+
_ => panic!("got unexpected poll result"),
235+
};
236+
assert_eq!(frames_total.get(), 2);
237+
assert_eq!(frames_bytes.get(), 5 + 8);
238+
}
239+
240+
Ok(())
241+
}
242+
119243
#[tokio::test(flavor = "current_thread", start_paused = true)]
120244
async fn grpc_request_statuses_ok() {
121245
let _trace = linkerd_tracing::test::trace_init();

linkerd/http/prom/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Tower middleware for Prometheus metrics.
1313
test-util = []
1414

1515
[dependencies]
16+
bytes = "1"
1617
futures = { version = "0.3", default-features = false }
1718
http = "0.2"
1819
http-body = "0.4"

linkerd/http/prom/src/body_data.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod request;
2+
pub mod response;
3+
4+
mod body;
5+
mod metrics;
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use super::metrics::BodyDataMetrics;
2+
use http::HeaderMap;
3+
use http_body::SizeHint;
4+
use pin_project::pin_project;
5+
use std::{
6+
pin::Pin,
7+
task::{Context, Poll},
8+
};
9+
10+
/// An instrumented body.
11+
#[pin_project]
12+
pub struct Body<B> {
13+
/// The inner body.
14+
#[pin]
15+
inner: B,
16+
/// Metrics with which the inner body will be instrumented.
17+
metrics: BodyDataMetrics,
18+
}
19+
20+
impl<B> Body<B> {
21+
/// Returns a new, instrumented body.
22+
pub(crate) fn new(body: B, metrics: BodyDataMetrics) -> Self {
23+
Self {
24+
inner: body,
25+
metrics,
26+
}
27+
}
28+
}
29+
30+
impl<B> http_body::Body for Body<B>
31+
where
32+
B: http_body::Body,
33+
{
34+
type Data = B::Data;
35+
type Error = B::Error;
36+
37+
/// Attempt to pull out the next data buffer of this stream.
38+
fn poll_data(
39+
self: Pin<&mut Self>,
40+
cx: &mut Context<'_>,
41+
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
42+
let this = self.project();
43+
let inner = this.inner;
44+
let BodyDataMetrics {
45+
frames_total,
46+
frames_bytes,
47+
} = this.metrics;
48+
49+
let data = std::task::ready!(inner.poll_data(cx));
50+
51+
if let Some(Ok(data)) = data.as_ref() {
52+
// We've polled and yielded a new chunk! Increment our telemetry.
53+
//
54+
// NB: We're careful to call `remaining()` rather than `chunk()`, which
55+
// "can return a shorter slice (this allows non-continuous internal representation)."
56+
let bytes = <B::Data as bytes::Buf>::remaining(data)
57+
.try_into()
58+
.unwrap_or(u64::MAX);
59+
frames_bytes.inc_by(bytes);
60+
frames_total.inc();
61+
}
62+
63+
Poll::Ready(data)
64+
}
65+
66+
fn poll_trailers(
67+
self: Pin<&mut Self>,
68+
cx: &mut Context<'_>,
69+
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
70+
self.project().inner.poll_trailers(cx)
71+
}
72+
73+
fn is_end_stream(&self) -> bool {
74+
self.inner.is_end_stream()
75+
}
76+
77+
fn size_hint(&self) -> SizeHint {
78+
self.inner.size_hint()
79+
}
80+
}

0 commit comments

Comments
 (0)