Skip to content

Commit 41d2c9c

Browse files
committed
feat(app): add backend response frame count metric
Signed-off-by: katelyn martin <[email protected]>
1 parent c268774 commit 41d2c9c

File tree

11 files changed

+539
-6
lines changed

11 files changed

+539
-6
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,7 +1325,9 @@ dependencies = [
13251325
"ahash",
13261326
"bytes",
13271327
"futures",
1328+
"futures-util",
13281329
"http",
1330+
"http-body",
13291331
"hyper",
13301332
"linkerd-app-core",
13311333
"linkerd-app-test",

linkerd/app/outbound/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ linkerd-tonic-stream = { path = "../../tonic-stream" }
4949
linkerd-tonic-watch = { path = "../../tonic-watch" }
5050

5151
[dev-dependencies]
52+
futures-util = "0.3"
53+
http-body = "0.4"
5254
hyper = { version = "0.14", features = ["http1", "http2"] }
5355
tokio = { version = "1", features = ["macros", "sync", "time"] }
5456
tokio-rustls = "0.24"

linkerd/app/outbound/src/http/logical/policy/route/backend.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ where
9999
}| concrete,
100100
)
101101
.push(filters::NewApplyFilters::<Self, _, _>::layer())
102+
.check_new_service::<Self, http::Request<http::BoxBody>>()
102103
.push(metrics::layer(&metrics))
104+
.check_new_service::<Self, http::Request<http::BoxBody>>()
103105
.push(svc::NewMapErr::layer_with(|t: &Self| {
104106
let backend = t.params.concrete.backend_ref.clone();
105107
move |source| {

linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{BackendRef, ParentRef, RouteRef};
22
use linkerd_app_core::{metrics::prom, svc};
33
use linkerd_http_prom::{
4+
body_data::response::{BodyDataMetrics, NewRecordBodyData, ResponseMetricsFamilies},
45
record_response::{self, NewResponseDuration, StreamLabel},
56
NewCountRequests, RequestCount, RequestCountFamilies,
67
};
@@ -11,10 +12,12 @@ pub use linkerd_http_prom::record_response::MkStreamLabel;
1112
#[cfg(test)]
1213
mod tests;
1314

15+
// DEV(kate); the backend metrics structure where body data is introduced.
1416
#[derive(Debug)]
1517
pub struct RouteBackendMetrics<L: StreamLabel> {
1618
requests: RequestCountFamilies<labels::RouteBackend>,
1719
responses: ResponseMetrics<L>,
20+
body_metrics: ResponseMetricsFamilies<labels::RouteBackend>, // DEV(kate); rename this `ResponseBodyFamilies`
1821
}
1922

2023
type ResponseMetrics<L> = record_response::ResponseMetrics<
@@ -26,14 +29,24 @@ pub fn layer<T, N>(
2629
metrics: &RouteBackendMetrics<T::StreamLabel>,
2730
) -> impl svc::Layer<
2831
N,
29-
Service = NewCountRequests<
30-
ExtractRequestCount,
31-
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
32+
Service = NewRecordBodyData<
33+
ExtractRecordBodyDataParams,
34+
NewCountRequests<
35+
ExtractRequestCount,
36+
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
37+
>,
3238
>,
3339
> + Clone
3440
where
3541
T: MkStreamLabel,
3642
N: svc::NewService<T>,
43+
NewRecordBodyData<
44+
ExtractRecordBodyDataParams,
45+
NewCountRequests<
46+
ExtractRequestCount,
47+
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
48+
>,
49+
>: svc::NewService<T>,
3750
NewCountRequests<
3851
ExtractRequestCount,
3952
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
@@ -44,28 +57,37 @@ where
4457
let RouteBackendMetrics {
4558
requests,
4659
responses,
60+
body_metrics,
4761
} = metrics.clone();
62+
4863
svc::layer::mk(move |inner| {
4964
use svc::Layer;
50-
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
51-
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
52-
.layer(inner),
65+
NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone())).layer(
66+
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
67+
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
68+
.layer(inner),
69+
),
5370
)
5471
})
5572
}
5673

5774
#[derive(Clone, Debug)]
5875
pub struct ExtractRequestCount(RequestCountFamilies<labels::RouteBackend>);
5976

77+
#[derive(Clone, Debug)]
78+
pub struct ExtractRecordBodyDataParams(ResponseMetricsFamilies<labels::RouteBackend>);
79+
6080
// === impl RouteBackendMetrics ===
6181

6282
impl<L: StreamLabel> RouteBackendMetrics<L> {
6383
pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator<Item = f64>) -> Self {
6484
let requests = RequestCountFamilies::register(reg);
6585
let responses = record_response::ResponseMetrics::register(reg, histo);
86+
let body_metrics = ResponseMetricsFamilies::register(reg);
6687
Self {
6788
requests,
6889
responses,
90+
body_metrics,
6991
}
7092
}
7193

@@ -83,13 +105,22 @@ impl<L: StreamLabel> RouteBackendMetrics<L> {
83105
pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter {
84106
self.responses.get_statuses(l)
85107
}
108+
109+
#[cfg(test)]
110+
pub(crate) fn get_response_body_metrics(
111+
&self,
112+
l: &labels::RouteBackend,
113+
) -> linkerd_http_prom::body_data::response::BodyDataMetrics {
114+
self.body_metrics.get(l)
115+
}
86116
}
87117

88118
impl<L: StreamLabel> Default for RouteBackendMetrics<L> {
89119
fn default() -> Self {
90120
Self {
91121
requests: Default::default(),
92122
responses: Default::default(),
123+
body_metrics: Default::default(),
93124
}
94125
}
95126
}
@@ -99,6 +130,7 @@ impl<L: StreamLabel> Clone for RouteBackendMetrics<L> {
99130
Self {
100131
requests: self.requests.clone(),
101132
responses: self.responses.clone(),
133+
body_metrics: self.body_metrics.clone(),
102134
}
103135
}
104136
}
@@ -114,3 +146,17 @@ where
114146
.metrics(&labels::RouteBackend(t.param(), t.param(), t.param()))
115147
}
116148
}
149+
150+
// === impl ExtractRecordBodyDataParams ===
151+
152+
impl<T> svc::ExtractParam<BodyDataMetrics, T> for ExtractRecordBodyDataParams
153+
where
154+
T: svc::Param<ParentRef> + svc::Param<RouteRef> + svc::Param<BackendRef>,
155+
{
156+
fn extract_param(&self, t: &T) -> BodyDataMetrics {
157+
let Self(families) = self;
158+
let labels = labels::RouteBackend(t.param(), t.param(), t.param());
159+
160+
families.get(&labels)
161+
}
162+
}

linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ use super::{
55
LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics,
66
};
77
use crate::http::{concrete, logical::Concrete};
8+
use bytes::Buf;
89
use linkerd_app_core::{
910
svc::{self, http::BoxBody, Layer, NewService},
1011
transport::{Remote, ServerAddr},
12+
Error,
1113
};
1214
use linkerd_proxy_client_policy as policy;
1315

@@ -116,6 +118,120 @@ async fn http_request_statuses() {
116118
assert_eq!(mixed.get(), 1);
117119
}
118120

121+
// DEV(kate); test response body
122+
#[tokio::test(flavor = "current_thread", start_paused = true)]
123+
async fn body_data_layer_records_frames() -> Result<(), Error> {
124+
use http_body::Body;
125+
use linkerd_app_core::proxy::http;
126+
use linkerd_http_prom::body_data::response::BodyDataMetrics;
127+
use tower::{Service, ServiceExt};
128+
129+
let _trace = linkerd_tracing::test::trace_init();
130+
131+
let metrics = super::RouteBackendMetrics::default();
132+
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
133+
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
134+
let backend_ref = crate::BackendRef(policy::Meta::new_default("backend"));
135+
136+
let (mut svc, mut handle) =
137+
mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref);
138+
handle.allow(1);
139+
140+
// Create a request.
141+
let req = {
142+
let empty = hyper::Body::empty();
143+
let body = BoxBody::new(empty);
144+
http::Request::builder().body(body).unwrap()
145+
};
146+
147+
// Call the service once it is ready to accept a request.
148+
tracing::info!("calling service");
149+
svc.ready().await.expect("ready");
150+
let call = svc.call(req);
151+
let (_, send_resp) = handle.next_request().await.unwrap();
152+
153+
// Acquire the counters for this backend.
154+
tracing::info!("acquiring response body metrics");
155+
let labels = labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone());
156+
let BodyDataMetrics {
157+
frames_total,
158+
frames_bytes,
159+
} = metrics.get_response_body_metrics(&labels);
160+
161+
// Before we've sent a response, the counter should be zero.
162+
assert_eq!(frames_total.get(), 0);
163+
assert_eq!(frames_bytes.get(), 0);
164+
165+
// Create a response whose body is backed by a channel that we can send chunks to, send it.
166+
tracing::info!("sending response");
167+
let mut resp_tx = {
168+
let (tx, body) = hyper::Body::channel();
169+
let body = BoxBody::new(body);
170+
let resp = http::Response::builder()
171+
.status(http::StatusCode::IM_A_TEAPOT)
172+
.body(body)
173+
.unwrap();
174+
send_resp.send_response(resp);
175+
tx
176+
};
177+
178+
// Before we've sent any bytes, the counter should be zero.
179+
assert_eq!(frames_total.get(), 0);
180+
assert_eq!(frames_bytes.get(), 0);
181+
182+
// On the client end, poll our call future and await the response.
183+
tracing::info!("polling service future");
184+
let (parts, body) = call.await?.into_parts();
185+
assert_eq!(parts.status, 418);
186+
187+
let mut body = Box::pin(body);
188+
189+
/// Returns the next chunk from a boxed body.
190+
async fn read_chunk(body: &mut std::pin::Pin<Box<BoxBody>>) -> Result<Vec<u8>, Error> {
191+
use std::task::{Context, Poll};
192+
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
193+
let data = match body.as_mut().poll_data(&mut ctx) {
194+
Poll::Ready(Some(Ok(d))) => d,
195+
_ => panic!("next chunk should be ready"),
196+
};
197+
let chunk = data.chunk().to_vec();
198+
Ok(chunk)
199+
}
200+
201+
{
202+
// Send a chunk, confirm that our counters are incremented.
203+
tracing::info!("sending first chunk");
204+
resp_tx.send_data("hello".into()).await?;
205+
let chunk = read_chunk(&mut body).await?;
206+
debug_assert_eq!("hello".as_bytes(), chunk, "should get same value back out");
207+
assert_eq!(frames_total.get(), 1);
208+
}
209+
210+
{
211+
// Send another chunk, confirm that our counters are incremented once more.
212+
tracing::info!("sending second chunk");
213+
resp_tx.send_data("world".into()).await?;
214+
let chunk = read_chunk(&mut body).await?;
215+
debug_assert_eq!("world".as_bytes(), chunk, "should get same value back out");
216+
assert_eq!(frames_total.get(), 2);
217+
}
218+
219+
{
220+
// Close the body, show that the counters remain at the same values.
221+
use std::task::{Context, Poll};
222+
tracing::info!("closing response body");
223+
drop(resp_tx);
224+
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
225+
match body.as_mut().poll_data(&mut ctx) {
226+
Poll::Ready(None) => {}
227+
_ => panic!("got unexpected poll result"),
228+
};
229+
assert_eq!(frames_total.get(), 2);
230+
}
231+
232+
Ok(())
233+
}
234+
119235
#[tokio::test(flavor = "current_thread", start_paused = true)]
120236
async fn grpc_request_statuses_ok() {
121237
let _trace = linkerd_tracing::test::trace_init();
@@ -279,6 +395,7 @@ async fn grpc_request_statuses_error_body() {
279395

280396
// === Util ===
281397

398+
// DEV(kate); route backend mock
282399
fn mock_http_route_backend_metrics(
283400
metrics: &RouteBackendMetrics<LabelHttpRouteBackendRsp>,
284401
parent_ref: &crate::ParentRef,

linkerd/http/prom/src/body_data.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod request;
2+
pub mod response;
3+
4+
mod body;
5+
mod metrics;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use super::metrics::BodyDataMetrics;
2+
use http::HeaderMap;
3+
use http_body::SizeHint;
4+
use pin_project::pin_project;
5+
use std::{
6+
pin::Pin,
7+
task::{Context, Poll},
8+
};
9+
10+
/// An instrumented body.
11+
#[pin_project]
12+
pub struct Body<B> {
13+
/// The inner body.
14+
#[pin]
15+
inner: B,
16+
/// Metrics with which the inner body will be instrumented.
17+
metrics: BodyDataMetrics,
18+
}
19+
20+
impl<B> Body<B> {
21+
/// Returns a new, instrumented body.
22+
pub(crate) fn new(body: B, metrics: BodyDataMetrics) -> Self {
23+
Self {
24+
inner: body,
25+
metrics,
26+
}
27+
}
28+
}
29+
30+
impl<B> http_body::Body for Body<B>
31+
where
32+
B: http_body::Body,
33+
{
34+
type Data = B::Data;
35+
type Error = B::Error;
36+
37+
/// Attempt to pull out the next data buffer of this stream.
38+
fn poll_data(
39+
self: Pin<&mut Self>,
40+
cx: &mut Context<'_>,
41+
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
42+
let this = self.project();
43+
let inner = this.inner;
44+
let BodyDataMetrics {
45+
frames_total,
46+
frames_bytes: _, // DEV(kate); count bytes in body chunks.
47+
} = this.metrics;
48+
49+
let data = std::task::ready!(inner.poll_data(cx));
50+
51+
if let Some(Ok(_)) = data.as_ref() {
52+
frames_total.inc();
53+
}
54+
55+
Poll::Ready(data)
56+
}
57+
58+
fn poll_trailers(
59+
self: Pin<&mut Self>,
60+
cx: &mut Context<'_>,
61+
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
62+
self.project().inner.poll_trailers(cx)
63+
}
64+
65+
fn is_end_stream(&self) -> bool {
66+
self.inner.is_end_stream()
67+
}
68+
69+
fn size_hint(&self) -> SizeHint {
70+
self.inner.size_hint()
71+
}
72+
}

0 commit comments

Comments
 (0)