Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
capture: add broker rtt latency and timeout metrics (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored May 29, 2024
1 parent f71dc08 commit ff0780c
Showing 1 changed file with 34 additions and 7 deletions.
41 changes: 34 additions & 7 deletions capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,43 +36,70 @@ impl rdkafka::ClientContext for KafkaContext {
for (topic, stats) in stats.topics {
gauge!(
"capture_kafka_produce_avg_batch_size_bytes",
"topic" => topic.clone()
"topic" => topic.clone()
)
.set(stats.batchsize.avg as f64);
gauge!(
"capture_kafka_produce_avg_batch_size_events",

"topic" => topic
)
.set(stats.batchcnt.avg as f64);
}

for (_, stats) in stats.brokers {
let id_string = format!("{}", stats.nodeid);
if let Some(rtt) = stats.rtt {
gauge!(
"capture_kafka_produce_rtt_latency_ms",
"quantile" => "p50",
"broker" => id_string.clone()
)
.set(rtt.p50 as f64);
gauge!(
"capture_kafka_produce_rtt_latency_ms",
"quantile" => "p90",
"broker" => id_string.clone()
)
.set(rtt.p90 as f64);
gauge!(
"capture_kafka_produce_rtt_latency_ms",
"quantile" => "p95",
"broker" => id_string.clone()
)
.set(rtt.p95 as f64);
gauge!(
"capture_kafka_produce_rtt_latency_ms",
"quantile" => "p99",
"broker" => id_string.clone()
)
.set(rtt.p99 as f64);
}

gauge!(
"capture_kafka_broker_requests_pending",

"broker" => id_string.clone()
)
.set(stats.outbuf_cnt as f64);
gauge!(
"capture_kafka_broker_responses_awaiting",

"broker" => id_string.clone()
)
.set(stats.waitresp_cnt as f64);
counter!(
"capture_kafka_broker_tx_errors_total",

"broker" => id_string.clone()
)
.absolute(stats.txerrs);
counter!(
"capture_kafka_broker_rx_errors_total",

"broker" => id_string
"broker" => id_string.clone()
)
.absolute(stats.rxerrs);
counter!(
"capture_kafka_broker_request_timeouts",
"broker" => id_string
)
.absolute(stats.req_timeouts);
}
}
}
Expand Down

0 comments on commit ff0780c

Please sign in to comment.