Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/24650_buffer-utilization-metrics.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fixed recording of buffer utilization metrics to record on both send and
receive, so the metric reflects the actual buffer level rather than only the "full" level.

authors: bruceg
116 changes: 87 additions & 29 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
cmp, fmt,
fmt::Debug,
cmp,
fmt::{self, Debug},
num::NonZeroUsize,
pin::Pin,
sync::{
Arc,
Expand Down Expand Up @@ -200,6 +201,7 @@ struct Inner<T> {
limiter: Arc<Semaphore>,
read_waker: Arc<Notify>,
metrics: Option<Metrics>,
capacity: NonZeroUsize,
}

impl<T> Clone for Inner<T> {
Expand All @@ -210,11 +212,12 @@ impl<T> Clone for Inner<T> {
limiter: self.limiter.clone(),
read_waker: self.read_waker.clone(),
metrics: self.metrics.clone(),
capacity: self.capacity,
}
}
}

impl<T: InMemoryBufferable> Inner<T> {
impl<T: Send + Sync + Debug + 'static> Inner<T> {
fn new(
limit: MemoryBufferSize,
metric_metadata: Option<ChannelMetricMetadata>,
Expand All @@ -229,31 +232,56 @@ impl<T: InMemoryBufferable> Inner<T> {
limiter: Arc::new(Semaphore::new(max_events.get())),
read_waker,
metrics,
capacity: max_events,
},
MemoryBufferSize::MaxSize(max_bytes) => Inner {
data: Arc::new(SegQueue::new()),
limit,
limiter: Arc::new(Semaphore::new(max_bytes.get())),
read_waker,
metrics,
capacity: max_bytes,
},
}
}

/// Records a send after acquiring all required permits.
///
/// The `total` value represents the channel utilization after this send completes. It may be
/// greater than the configured limit because the channel intentionally allows a single
/// oversized payload to flow through rather than forcing the sender to split it.
fn send_with_permit(&mut self, total: usize, permits: OwnedSemaphorePermit, item: T) {
/// The `size` value is the true utilization contribution of `item`, which may exceed the number
/// of permits acquired for oversized payloads.
fn send_with_permits(&mut self, size: usize, permits: OwnedSemaphorePermit, item: T) {
if let Some(metrics) = &self.metrics {
// For normal items, capacity - available_permits() exactly represents the total queued
// utilization (including this item's just-acquired permits). For oversized items that
// acquired fewer permits than their true size, `size` is the correct utilization since
// the queue must have been empty for the oversized acquire to succeed.
let utilization = size.max(self.used_capacity());
metrics.record(utilization);
}
self.data.push((permits, item));
self.read_waker.notify_one();
// Due to the race between getting the available capacity, acquiring the permits, and the
// above push, the total may be inaccurate. Record it anyways as the histogram totals will
// _eventually_ converge on a true picture of the buffer utilization.
if let Some(metrics) = self.metrics.as_ref() {
metrics.record(total);
}
}
}

impl<T> Inner<T> {
    /// Returns the buffer's current utilization: the number of semaphore permits
    /// currently held by queued items (capacity minus available permits).
    fn used_capacity(&self) -> usize {
        self.capacity.get() - self.limiter.available_permits()
    }

    /// Pops the next item off the queue, recording the post-pop buffer utilization
    /// (when metrics are configured) before releasing the item's permits.
    ///
    /// Returns `None` when the queue is empty.
    fn pop_and_record(&self) -> Option<T> {
        self.data.pop().map(|(permit, item)| {
            if let Some(metrics) = &self.metrics {
                // Compute remaining utilization from the semaphore state. Since our permits haven't
                // been released yet, used_capacity is stable against racing senders acquiring those
                // permits.
                let utilization = self.used_capacity() - permit.num_permits();
                metrics.record(utilization);
            }
            // Release permits after recording so a waiting sender cannot enqueue a new item
            // before this pop's utilization measurement is taken.
            drop(permit);
            item
        })
    }
}

Expand All @@ -265,16 +293,16 @@ pub struct LimitedSender<T> {

impl<T: InMemoryBufferable> LimitedSender<T> {
#[allow(clippy::cast_possible_truncation)]
fn calc_required_permits(&self, item: &T) -> (usize, usize, u32) {
fn calc_required_permits(&self, item: &T) -> (usize, u32) {
// We have to limit the number of permits we ask for to the overall limit since we're always
// willing to store more items than the limit if the queue is entirely empty, because
// otherwise we might deadlock ourselves by not being able to send a single item.
let (limit, value) = match self.inner.limit {
MemoryBufferSize::MaxSize(max_size) => (max_size, item.allocated_bytes()),
MemoryBufferSize::MaxEvents(max_events) => (max_events, item.event_count()),
let value = match self.inner.limit {
MemoryBufferSize::MaxSize(_) => item.allocated_bytes(),
MemoryBufferSize::MaxEvents(_) => item.event_count(),
};
let limit = limit.get();
(limit, value, cmp::min(limit, value) as u32)
let limit = self.inner.capacity.get();
(value, cmp::min(limit, value) as u32) as (usize, u32)
}

/// Gets the number of items that this channel could accept.
Expand All @@ -290,8 +318,7 @@ impl<T: InMemoryBufferable> LimitedSender<T> {
/// with the given `item`.
pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
// Calculate how many permits we need, and wait until we can acquire all of them.
let (limit, count, permits_required) = self.calc_required_permits(&item);
let in_use = limit.saturating_sub(self.available_capacity());
let (size, permits_required) = self.calc_required_permits(&item);
match self
.inner
.limiter
Expand All @@ -300,7 +327,7 @@ impl<T: InMemoryBufferable> LimitedSender<T> {
.await
{
Ok(permits) => {
self.inner.send_with_permit(in_use + count, permits, item);
self.inner.send_with_permits(size, permits, item);
trace!("Sent item.");
Ok(())
}
Expand All @@ -322,16 +349,15 @@ impl<T: InMemoryBufferable> LimitedSender<T> {
/// Will panic if adding ack amount overflows.
pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
// Calculate how many permits we need, and try to acquire them all without waiting.
let (limit, count, permits_required) = self.calc_required_permits(&item);
let in_use = limit.saturating_sub(self.available_capacity());
let (size, permits_required) = self.calc_required_permits(&item);
match self
.inner
.limiter
.clone()
.try_acquire_many_owned(permits_required)
{
Ok(permits) => {
self.inner.send_with_permit(in_use + count, permits, item);
self.inner.send_with_permits(size, permits, item);
trace!("Attempt to send item succeeded.");
Ok(())
}
Expand Down Expand Up @@ -375,7 +401,7 @@ impl<T: Send + 'static> LimitedReceiver<T> {

pub async fn next(&mut self) -> Option<T> {
loop {
if let Some((_permit, item)) = self.inner.data.pop() {
if let Some(item) = self.inner.pop_and_record() {
return Some(item);
}

Expand Down Expand Up @@ -471,7 +497,7 @@ mod tests {
}

#[tokio::test]
async fn records_utilization_on_send() {
async fn records_utilization() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
let (mut tx, mut rx) = limited(
limit,
Expand All @@ -482,9 +508,41 @@ mod tests {
let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

tx.send(Sample::new(1)).await.expect("send should succeed");
assert_eq!(metrics.lock().unwrap().last().copied(), Some(1));
let records = metrics.lock().unwrap().clone();
assert_eq!(records.len(), 1);
assert_eq!(records.last().copied(), Some(1));

assert_eq!(Sample::new(1), rx.next().await.unwrap());
let records = metrics.lock().unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records.last().copied(), Some(0));
}

#[tokio::test]
async fn oversized_send_records_true_utilization_via_normal_send_path() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
let (mut tx, mut rx) = limited(
limit,
Some(ChannelMetricMetadata::new("test_channel_oversized", None)),
None,
);
let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

// Normal send path: permits are capped to the limit (2), but utilization should reflect
// the true item contribution (3).
let oversized = MultiEventRecord::new(3);
tx.send(oversized.clone())
.await
.expect("send should succeed");

let records = metrics.lock().unwrap().clone();
assert_eq!(records.len(), 1);
assert_eq!(records.last().copied(), Some(3));

let _ = rx.next().await;
assert_eq!(Some(oversized), rx.next().await);
let records = metrics.lock().unwrap().clone();
assert_eq!(records.len(), 2);
assert_eq!(records.last().copied(), Some(0));
}

#[test]
Expand Down
15 changes: 11 additions & 4 deletions lib/vector-core/src/source_sender/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,11 @@ fn assert_counter_metric(metrics: &[Metric], name: &str, expected: f64) {
}

#[tokio::test]
#[expect(clippy::cast_precision_loss)]
async fn emits_buffer_utilization_histogram_on_send_and_receive() {
const BUFFER_SIZE: usize = 2;

metrics::init_test();
let buffer_size = 2;
let (mut sender, mut recv) = SourceSender::new_test_sender_with_options(buffer_size, None);
let (mut sender, mut recv) = SourceSender::new_test_sender_with_options(BUFFER_SIZE, None);

let event = Event::Log(LogEvent::from("test event"));
sender
Expand All @@ -256,10 +256,17 @@ async fn emits_buffer_utilization_histogram_on_send_and_receive() {
.await
.expect("second send succeeds");

assert_buffer_metrics(BUFFER_SIZE, 2);

// Drain the channel so both the send and receive paths are exercised.
assert!(recv.next().await.is_some());
assert!(recv.next().await.is_some());

assert_buffer_metrics(BUFFER_SIZE, 0);
}

#[expect(clippy::cast_precision_loss)]
fn assert_buffer_metrics(buffer_size: usize, level: usize) {
let metrics: Vec<_> = Controller::get()
.expect("metrics controller available")
.capture_metrics()
Expand All @@ -283,7 +290,7 @@ async fn emits_buffer_utilization_histogram_on_send_and_receive() {
let MetricValue::Gauge { value } = metric.value() else {
panic!("source_buffer_utilization_level should be a gauge");
};
assert_eq!(*value, 2.0);
assert_eq!(*value, level as f64);

let metric = find_metric("source_buffer_max_event_size");
let MetricValue::Gauge { value } = metric.value() else {
Expand Down
Loading