Skip to content

Commit

Permalink
fix(utilization_metric): run a separate task for utilization to ensur…
Browse files Browse the repository at this point in the history
…e it is regularly published

This adds a separate task that runs periodically to emit utilization metrics and collect messages
from components that need their utilization metrics calculated. This ensures that utilization metric
is published even when no events are running through a component.

Fixes: vectordotdev#20216
  • Loading branch information
esensar committed Dec 21, 2024
1 parent 7c6d0c9 commit 95886f8
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 70 deletions.
99 changes: 81 additions & 18 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
select,
Expand Down Expand Up @@ -51,7 +52,7 @@ use crate::{
spawn_named,
topology::task::TaskError,
transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
utilization::wrap,
utilization::{wrap, UtilizationEmitter, UtilizationTimerMessage},
SourceSender,
};

Expand Down Expand Up @@ -84,6 +85,7 @@ struct Builder<'a> {
healthchecks: HashMap<ComponentKey, Task>,
detach_triggers: HashMap<ComponentKey, Trigger>,
extra_context: ExtraContext,
utilization_emitter: UtilizationEmitter,
}

impl<'a> Builder<'a> {
Expand All @@ -105,6 +107,7 @@ impl<'a> Builder<'a> {
healthchecks: HashMap::new(),
detach_triggers: HashMap::new(),
extra_context,
utilization_emitter: UtilizationEmitter::new(),
}
}

Expand All @@ -128,6 +131,7 @@ impl<'a> Builder<'a> {
healthchecks: self.healthchecks,
shutdown_coordinator: self.shutdown_coordinator,
detach_triggers: self.detach_triggers,
utilization_emitter: Some(self.utilization_emitter),
})
} else {
Err(self.errors)
Expand Down Expand Up @@ -497,7 +501,7 @@ impl<'a> Builder<'a> {

let (transform_task, transform_outputs) = {
let _span = span.enter();
build_transform(transform, node, input_rx)
build_transform(transform, node, input_rx, &mut self.utilization_emitter)
};

self.outputs.extend(transform_outputs);
Expand All @@ -506,6 +510,7 @@ impl<'a> Builder<'a> {
}

async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
let utilization_sender = self.utilization_emitter.get_sender();
for (key, sink) in self
.config
.sinks()
Expand Down Expand Up @@ -585,6 +590,10 @@ impl<'a> Builder<'a> {

let (trigger, tripwire) = Tripwire::new();

self.utilization_emitter
.add_component(key.clone(), gauge!("utilization"));
let utilization_sender = utilization_sender.clone();
let component_key = key.clone();
let sink = async move {
debug!("Sink starting.");

Expand All @@ -600,7 +609,7 @@ impl<'a> Builder<'a> {
.take()
.expect("Task started but input has been taken.");

let mut rx = wrap(rx);
let mut rx = wrap(utilization_sender, component_key.clone(), rx);

let events_received = register!(EventsReceived);
sink.run(
Expand Down Expand Up @@ -682,6 +691,7 @@ pub struct TopologyPieces {
pub(super) healthchecks: HashMap<ComponentKey, Task>,
pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}

impl TopologyPieces {
Expand Down Expand Up @@ -760,18 +770,22 @@ fn build_transform(
transform: Transform,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
match transform {
// TODO: avoid the double boxing for function transforms here
Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx),
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx),
Transform::Function(t) => {
build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
}
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
Transform::Task(t) => build_task_transform(
t,
input_rx,
node.input_details.data_type(),
node.typetag,
&node.key,
&node.outputs,
utilization_emitter,
),
}
}
Expand All @@ -780,10 +794,19 @@ fn build_sync_transform(
t: Box<dyn SyncTransform>,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs);
utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
let runner = Runner::new(
t,
input_rx,
utilization_emitter.get_sender(),
node.key.clone(),
node.input_details.data_type(),
outputs,
);
let transform = if node.enable_concurrency {
runner.run_concurrently().boxed()
} else {
Expand Down Expand Up @@ -823,15 +846,17 @@ struct Runner {
input_rx: Option<BufferReceiver<EventArray>>,
input_type: DataType,
outputs: TransformOutputs,
timer: crate::utilization::Timer,
last_report: Instant,
key: ComponentKey,
timer_tx: UnboundedSender<UtilizationTimerMessage>,
events_received: Registered<EventsReceived>,
}

impl Runner {
fn new(
transform: Box<dyn SyncTransform>,
input_rx: BufferReceiver<EventArray>,
timer_tx: UnboundedSender<UtilizationTimerMessage>,
key: ComponentKey,
input_type: DataType,
outputs: TransformOutputs,
) -> Self {
Expand All @@ -840,17 +865,22 @@ impl Runner {
input_rx: Some(input_rx),
input_type,
outputs,
timer: crate::utilization::Timer::new(),
last_report: Instant::now(),
key,
timer_tx,
events_received: register!(EventsReceived),
}
}

fn on_events_received(&mut self, events: &EventArray) {
let stopped = self.timer.stop_wait();
if stopped.duration_since(self.last_report).as_secs() >= 5 {
self.timer.report();
self.last_report = stopped;
if self
.timer_tx
.send(UtilizationTimerMessage::StopWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization stop wait message from sync transform.");
}

self.events_received.emit(CountByteSize(
Expand All @@ -860,7 +890,16 @@ impl Runner {
}

async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
self.outputs.send(outputs_buf).await
}

Expand All @@ -877,7 +916,16 @@ impl Runner {
.into_stream()
.filter(move |events| ready(filter_events_type(events, self.input_type)));

self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
while let Some(events) = input_rx.next().await {
self.on_events_received(&events);
self.transform.transform_all(events, &mut outputs_buf);
Expand All @@ -903,7 +951,16 @@ impl Runner {
let mut in_flight = FuturesOrdered::new();
let mut shutting_down = false;

self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
loop {
tokio::select! {
biased;
Expand Down Expand Up @@ -964,10 +1021,16 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

let input_rx = crate::utilization::wrap(input_rx.into_stream());
utilization_emitter.add_component(key.clone(), gauge!("utilization"));
let input_rx = wrap(
utilization_emitter.get_sender(),
key.clone(),
input_rx.into_stream(),
);

let events_received = register!(EventsReceived);
let filtered = input_rx
Expand Down
21 changes: 17 additions & 4 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use std::{
};

use super::{
builder,
builder::TopologyPieces,
builder::{self, TopologyPieces},
fanout::{ControlChannel, ControlMessage},
handle_errors, retain, take_healthchecks,
task::TaskOutput,
task::{Task, TaskOutput},
BuiltBuffer, TaskHandle,
};
use crate::{
Expand All @@ -28,9 +27,9 @@ use tokio::{
time::{interval, sleep_until, Duration, Instant},
};
use tracing::Instrument;
use vector_lib::buffers::topology::channel::BufferSender;
use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx};
use vector_lib::trigger::DisabledTrigger;
use vector_lib::{buffers::topology::channel::BufferSender, shutdown::ShutdownSignal};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

Expand All @@ -49,6 +48,7 @@ pub struct RunningTopology {
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
utilization_task: Option<TaskHandle>,
}

impl RunningTopology {
Expand All @@ -67,6 +67,7 @@ impl RunningTopology {
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
utilization_task: None,
}
}

Expand Down Expand Up @@ -1042,6 +1043,7 @@ impl RunningTopology {
return None;
}

let mut utilization_emitter = pieces.utilization_emitter.take().unwrap();
let mut running_topology = Self::new(config, abort_tx);

if !running_topology
Expand All @@ -1053,6 +1055,17 @@ impl RunningTopology {
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology.spawn_diff(&diff, pieces);

running_topology.utilization_task =
// TODO: how to name this custom task?
Some(tokio::spawn(Task::new("".into(), "", async move {
utilization_emitter
.run_utilization(ShutdownSignal::noop())
.await;
// TODO: new task output type for this? Or handle this task in a completely
// different way
Ok(TaskOutput::Healthcheck)
})));

Some((running_topology, abort_rx))
}
}
Expand Down
Loading

0 comments on commit 95886f8

Please sign in to comment.