Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/24650_buffer-utilization-metrics.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fixed recording of buffer utilization metrics to properly record on both send
and receive in order to reflect the actual level and not just the "full" level.

authors: bruceg
116 changes: 87 additions & 29 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
cmp, fmt,
fmt::Debug,
cmp,
fmt::{self, Debug},
num::NonZeroUsize,
pin::Pin,
sync::{
Arc,
Expand Down Expand Up @@ -200,6 +201,7 @@ struct Inner<T> {
limiter: Arc<Semaphore>,
read_waker: Arc<Notify>,
metrics: Option<Metrics>,
capacity: NonZeroUsize,
}

impl<T> Clone for Inner<T> {
Expand All @@ -210,11 +212,12 @@ impl<T> Clone for Inner<T> {
limiter: self.limiter.clone(),
read_waker: self.read_waker.clone(),
metrics: self.metrics.clone(),
capacity: self.capacity,
}
}
}

impl<T: InMemoryBufferable> Inner<T> {
impl<T: Send + Sync + Debug + 'static> Inner<T> {
fn new(
limit: MemoryBufferSize,
metric_metadata: Option<ChannelMetricMetadata>,
Expand All @@ -229,31 +232,56 @@ impl<T: InMemoryBufferable> Inner<T> {
limiter: Arc::new(Semaphore::new(max_events.get())),
read_waker,
metrics,
capacity: max_events,
},
MemoryBufferSize::MaxSize(max_bytes) => Inner {
data: Arc::new(SegQueue::new()),
limit,
limiter: Arc::new(Semaphore::new(max_bytes.get())),
read_waker,
metrics,
capacity: max_bytes,
},
}
}

/// Records a send after acquiring all required permits.
///
/// The `total` value represents the channel utilization after this send completes. It may be
/// greater than the configured limit because the channel intentionally allows a single
/// oversized payload to flow through rather than forcing the sender to split it.
fn send_with_permit(&mut self, total: usize, permits: OwnedSemaphorePermit, item: T) {
/// The `size` value is the true utilization contribution of `item`, which may exceed the number
/// of permits acquired for oversized payloads.
fn send_with_permits(&mut self, size: usize, permits: OwnedSemaphorePermit, item: T) {
if let Some(metrics) = &self.metrics {
// For normal items, capacity - available_permits() exactly represents the total queued
// utilization (including this item's just-acquired permits). For oversized items that
// acquired fewer permits than their true size, `size` is the correct utilization since
// the queue must have been empty for the oversized acquire to succeed.
let utilization = size.max(self.used_capacity());
metrics.record(utilization);
}
self.data.push((permits, item));
self.read_waker.notify_one();
// Due to the race between getting the available capacity, acquiring the permits, and the
// above push, the total may be inaccurate. Record it anyways as the histogram totals will
// _eventually_ converge on a true picture of the buffer utilization.
if let Some(metrics) = self.metrics.as_ref() {
metrics.record(total);
}
}
}

impl<T> Inner<T> {
    /// Returns the number of semaphore permits currently held by queued items.
    fn used_capacity(&self) -> usize {
        let available = self.limiter.available_permits();
        self.capacity.get() - available
    }

    /// Pops the next item, recording the post-pop buffer utilization before releasing the
    /// popped item's permits back to the semaphore.
    fn pop_and_record(&self) -> Option<T> {
        let (permit, item) = self.data.pop()?;
        if let Some(metrics) = &self.metrics {
            // Derive the remaining utilization from the semaphore state. The permits held by
            // `permit` have not been released yet, so `used_capacity` cannot be perturbed by a
            // racing sender acquiring them.
            let remaining = self.used_capacity().saturating_sub(permit.num_permits());
            metrics.record(remaining);
        }
        // Only release the permits once the measurement is taken, so a waiting sender cannot
        // enqueue a new item before this pop's utilization is recorded.
        drop(permit);
        Some(item)
    }
}

Expand All @@ -265,16 +293,16 @@ pub struct LimitedSender<T> {

impl<T: InMemoryBufferable> LimitedSender<T> {
#[allow(clippy::cast_possible_truncation)]
fn calc_required_permits(&self, item: &T) -> (usize, usize, u32) {
fn calc_required_permits(&self, item: &T) -> (usize, u32) {
// We have to limit the number of permits we ask for to the overall limit since we're always
// willing to store more items than the limit if the queue is entirely empty, because
// otherwise we might deadlock ourselves by not being able to send a single item.
let (limit, value) = match self.inner.limit {
MemoryBufferSize::MaxSize(max_size) => (max_size, item.allocated_bytes()),
MemoryBufferSize::MaxEvents(max_events) => (max_events, item.event_count()),
let value = match self.inner.limit {
MemoryBufferSize::MaxSize(_) => item.allocated_bytes(),
MemoryBufferSize::MaxEvents(_) => item.event_count(),
};
let limit = limit.get();
(limit, value, cmp::min(limit, value) as u32)
let limit = self.inner.capacity.get();
(value, cmp::min(limit, value) as u32)
}

/// Gets the number of items that this channel could accept.
Expand All @@ -290,8 +318,7 @@ impl<T: InMemoryBufferable> LimitedSender<T> {
/// with the given `item`.
pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
// Calculate how many permits we need, and wait until we can acquire all of them.
let (limit, count, permits_required) = self.calc_required_permits(&item);
let in_use = limit.saturating_sub(self.available_capacity());
let (size, permits_required) = self.calc_required_permits(&item);
match self
.inner
.limiter
Expand All @@ -300,7 +327,7 @@ impl<T: InMemoryBufferable> LimitedSender<T> {
.await
{
Ok(permits) => {
self.inner.send_with_permit(in_use + count, permits, item);
self.inner.send_with_permits(size, permits, item);
trace!("Sent item.");
Ok(())
}
Expand All @@ -322,16 +349,15 @@ impl<T: InMemoryBufferable> LimitedSender<T> {
/// Will panic if adding ack amount overflows.
pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
// Calculate how many permits we need, and try to acquire them all without waiting.
let (limit, count, permits_required) = self.calc_required_permits(&item);
let in_use = limit.saturating_sub(self.available_capacity());
let (size, permits_required) = self.calc_required_permits(&item);
match self
.inner
.limiter
.clone()
.try_acquire_many_owned(permits_required)
{
Ok(permits) => {
self.inner.send_with_permit(in_use + count, permits, item);
self.inner.send_with_permits(size, permits, item);
trace!("Attempt to send item succeeded.");
Ok(())
}
Expand Down Expand Up @@ -375,7 +401,7 @@ impl<T: Send + 'static> LimitedReceiver<T> {

pub async fn next(&mut self) -> Option<T> {
loop {
if let Some((_permit, item)) = self.inner.data.pop() {
if let Some(item) = self.inner.pop_and_record() {
return Some(item);
}

Expand Down Expand Up @@ -471,7 +497,7 @@ mod tests {
}

#[tokio::test]
async fn records_utilization_on_send() {
async fn records_utilization() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
let (mut tx, mut rx) = limited(
limit,
Expand All @@ -482,9 +508,41 @@ mod tests {
let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

tx.send(Sample::new(1)).await.expect("send should succeed");
assert_eq!(metrics.lock().unwrap().last().copied(), Some(1));
let records = metrics.lock().unwrap().clone();
assert_eq!(records.len(), 1);
assert_eq!(records.last().copied(), Some(1));

assert_eq!(Sample::new(1), rx.next().await.unwrap());
let records = metrics.lock().unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records.last().copied(), Some(0));
}

#[tokio::test]
async fn oversized_send_records_true_utilization_via_normal_send_path() {
    let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
    let (mut tx, mut rx) = limited(
        limit,
        Some(ChannelMetricMetadata::new("test_channel_oversized", None)),
        None,
    );
    let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

    // Normal send path: permits are capped to the limit (2), but utilization should reflect
    // the true item contribution (3).
    let oversized = MultiEventRecord::new(3);
    tx.send(oversized.clone())
        .await
        .expect("send should succeed");

    let records = metrics.lock().unwrap().clone();
    assert_eq!(records.len(), 1);
    assert_eq!(records.last().copied(), Some(3));

    // Exactly one item was sent, so perform exactly one receive; a second `rx.next().await`
    // would pend forever because the sender is still alive.
    assert_eq!(Some(oversized), rx.next().await);
    let records = metrics.lock().unwrap().clone();
    assert_eq!(records.len(), 2);
    assert_eq!(records.last().copied(), Some(0));
}

#[test]
Expand Down
15 changes: 11 additions & 4 deletions lib/vector-core/src/source_sender/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,11 @@ fn assert_counter_metric(metrics: &[Metric], name: &str, expected: f64) {
}

#[tokio::test]
#[expect(clippy::cast_precision_loss)]
async fn emits_buffer_utilization_histogram_on_send_and_receive() {
const BUFFER_SIZE: usize = 2;

metrics::init_test();
let buffer_size = 2;
let (mut sender, mut recv) = SourceSender::new_test_sender_with_options(buffer_size, None);
let (mut sender, mut recv) = SourceSender::new_test_sender_with_options(BUFFER_SIZE, None);

let event = Event::Log(LogEvent::from("test event"));
sender
Expand All @@ -256,10 +256,17 @@ async fn emits_buffer_utilization_histogram_on_send_and_receive() {
.await
.expect("second send succeeds");

assert_buffer_metrics(BUFFER_SIZE, 2);

// Drain the channel so both the send and receive paths are exercised.
assert!(recv.next().await.is_some());
assert!(recv.next().await.is_some());

assert_buffer_metrics(BUFFER_SIZE, 0);
}

#[expect(clippy::cast_precision_loss)]
fn assert_buffer_metrics(buffer_size: usize, level: usize) {
let metrics: Vec<_> = Controller::get()
.expect("metrics controller available")
.capture_metrics()
Expand All @@ -283,7 +290,7 @@ async fn emits_buffer_utilization_histogram_on_send_and_receive() {
let MetricValue::Gauge { value } = metric.value() else {
panic!("source_buffer_utilization_level should be a gauge");
};
assert_eq!(*value, 2.0);
assert_eq!(*value, level as f64);

let metric = find_metric("source_buffer_max_event_size");
let MetricValue::Gauge { value } = metric.value() else {
Expand Down
Loading