Skip to content

Separate fast and slow lgalloc metrics #32565

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async-trait = "0.1.88"
crossbeam-channel = "0.5.15"
differential-dataflow = "0.15.2"
futures = "0.3.31"
lgalloc = "0.5.0"
lgalloc = "0.6.0"
mz-cluster-client = { path = "../cluster-client" }
mz-ore = { path = "../ore", features = ["async", "process", "tracing"] }
mz-service = { path = "../service" }
Expand Down
2 changes: 1 addition & 1 deletion src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ differential-dataflow = "0.15.2"
differential-dogs3 = "0.1.13"
futures = "0.3.31"
itertools = "0.14.0"
lgalloc = "0.5"
lgalloc = "0.6"
mz-cluster = { path = "../cluster" }
mz-compute-client = { path = "../compute-client" }
mz-compute-types = { path = "../compute-types" }
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ publish = false
workspace = true

[dependencies]
lgalloc = "0.5"
lgalloc = "0.6"
libc = "0.2.172"
mz-dyncfg = { path = "../dyncfg" }
mz-ore = { path = "../ore", features = ["metrics"] }
Expand Down
8 changes: 8 additions & 0 deletions src/metrics/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ use std::time::Duration;

use mz_dyncfg::{Config, ConfigSet};

/// How frequently to refresh lgalloc map stats.
///
/// Defaults to 60 seconds. Per the config description, a zero duration
/// disables refreshing.
pub(crate) const MZ_METRICS_LGALLOC_MAP_REFRESH_INTERVAL: Config<Duration> = Config::new(
    "mz_metrics_lgalloc_map_refresh_interval",
    Duration::from_secs(60),
    // Mention "map" so this description is distinguishable from the one of
    // `mz_metrics_lgalloc_refresh_interval`.
    "How frequently to refresh lgalloc map stats. A zero duration disables refreshing.",
);

/// How frequently to refresh lgalloc stats.
pub(crate) const MZ_METRICS_LGALLOC_REFRESH_INTERVAL: Config<Duration> = Config::new(
"mz_metrics_lgalloc_refresh_interval",
Expand All @@ -30,6 +37,7 @@ pub(crate) const MZ_METRICS_RUSAGE_REFRESH_INTERVAL: Config<Duration> = Config::
/// Adds the full set of all metrics `Config`s.
///
/// Takes ownership of `configs`, adds the lgalloc map, lgalloc, and rusage
/// refresh-interval configs to it, and returns the extended set.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
.add(&MZ_METRICS_LGALLOC_MAP_REFRESH_INTERVAL)
.add(&MZ_METRICS_LGALLOC_REFRESH_INTERVAL)
.add(&MZ_METRICS_RUSAGE_REFRESH_INTERVAL)
}
133 changes: 109 additions & 24 deletions src/metrics/src/lgalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use std::collections::BTreeMap;
use std::ops::AddAssign;

use lgalloc::{FileStats, SizeClassStats};
use lgalloc::{FileStats, MapStats};
use mz_ore::cast::CastFrom;
use mz_ore::metrics::{MetricsRegistry, raw};
use paste::paste;
Expand Down Expand Up @@ -48,6 +48,11 @@ struct FileStatsAccum {
pub file_size: usize,
/// Size of the file on disk in bytes.
pub allocated_size: usize,
}

/// An accumulator for [`MapStats`].
#[derive(Default)]
struct MapStatsAccum {
/// Number of mapped bytes, if different from `dirty`. Consult `man 7 numa` for details.
pub mapped: usize,
/// Number of active bytes. Consult `man 7 numa` for details.
Expand All @@ -60,6 +65,11 @@ impl AddAssign<&FileStats> for FileStatsAccum {
fn add_assign(&mut self, rhs: &FileStats) {
self.file_size += rhs.file_size;
self.allocated_size += rhs.allocated_size;
}
}

impl AddAssign<&MapStats> for MapStatsAccum {
fn add_assign(&mut self, rhs: &MapStats) {
self.mapped += rhs.mapped;
self.active += rhs.active;
self.dirty += rhs.dirty;
Expand Down Expand Up @@ -118,32 +128,25 @@ macro_rules! metrics_size_class {
}
fn update(&mut self) -> Result<(), Error> {
let stats = lgalloc::lgalloc_stats();
for sc in &stats.size_class {
let sc_stats = self.get_size_class(sc.size_class);
$(sc_stats.$metric.set(($conv)(u64::cast_from(sc.$name), sc));)*
for (size_class, sc) in &stats.size_class {
let sc_stats = self.get_size_class(*size_class);
$(sc_stats.$metric.set(($conv)(u64::cast_from(sc.$name), *size_class));)*
}
let mut accums = BTreeMap::new();
match &stats.file_stats {
Ok(file_stats) => {
for file_stat in file_stats {
let accum: &mut FileStatsAccum = accums.entry(file_stat.size_class).or_default();
let mut f_accums = BTreeMap::new();
for (size_class, file_stat) in &stats.file {
let accum: &mut FileStatsAccum = f_accums.entry(*size_class).or_default();
match file_stat {
Ok(file_stat) => {
accum.add_assign(file_stat);
}
}
#[cfg(target_os = "linux")]
Err(err) => {
return Err(Error::new(ErrorKind::FileStatsFailed(err.to_string())));
}
#[cfg(not(target_os = "linux"))]
Err(err) => {
if err.kind() != std::io::ErrorKind::NotFound {
return Err(Error::new(ErrorKind::FileStatsFailed(err.to_string())));
Err(err) => {
return Err(ErrorKind::FileStatsFailed(err.to_string()).into());
}
}
}
for (size_class, accum) in accums {
for (size_class, f_accum) in f_accums {
let sc_stats = self.get_size_class(size_class);
$(sc_stats.$f_metric.set(u64::cast_from(accum.$f_name));)*
$(sc_stats.$f_metric.set(u64::cast_from(f_accum.$f_name));)*
}
Ok(())
}
Expand All @@ -152,11 +155,74 @@ macro_rules! metrics_size_class {
};
}

fn normalize_by_size_class(value: u64, stats: &SizeClassStats) -> u64 {
value * u64::cast_from(stats.size_class)
/// Generates the `LgMapMetrics` type, which exposes per-size-class gauges for
/// lgalloc's memory-mapping statistics.
///
/// Invocation shape:
///
/// ```ignore
/// map_metrics! {
///     namespace
///     @mem (
///         (stats_field, metric_name, "help text"),
///         ...
///     )
/// }
/// ```
///
/// For every `(stats_field, metric_name, help)` triple the macro creates a
/// gauge vector named `<namespace>_<metric_name>` with a single `size_class`
/// label, and on update sets the gauge to the accumulated value of
/// `stats_field` for that size class.
macro_rules! map_metrics {
// Public entry point: strips the parentheses around the `@mem` list and
// forwards to the `@define` arm below.
($namespace:ident
@mem ($(($m_name:ident, $m_metric:ident, $m_desc:expr)),*)
) => {
map_metrics! {
@define $namespace
@mem $(($m_name, $m_metric, $m_desc)),*
}
};
// Internal arm: emits the metric structs and their implementation.
(@define $namespace:ident
@mem $(($m_name:ident, $m_metric:ident, $m_desc:expr)),*
) => {
paste! {
// Owner of the registered gauge vectors plus a cache of per-size-class
// gauge handles, keyed by size class.
pub(crate) struct LgMapMetrics {
size_class: BTreeMap<usize, LgMapMetricsSC>,
$($m_metric: raw::UIntGaugeVec,)*
}
// Gauges for a single size class, one per metric.
struct LgMapMetricsSC {
$($m_metric: GenericGauge<AtomicU64>,)*
}
impl LgMapMetrics {
// Registers one gauge vector per metric in `registry`. The per-size-class
// cache starts out empty and is filled by `get_size_class`.
fn new(registry: &MetricsRegistry) -> Self {
Self {
size_class: BTreeMap::default(),
$($m_metric: registry.register(mz_ore::metric!(
name: concat!(stringify!($namespace), "_", stringify!($m_metric)),
help: $m_desc,
var_labels: ["size_class"],
)),)*
}
}
// Returns the gauges for `size_class`, creating and caching them on first
// use. The label value is the decimal rendering of `size_class`.
fn get_size_class(&mut self, size_class: usize) -> &LgMapMetricsSC {
self.size_class.entry(size_class).or_insert_with(|| {
let labels: &[&str] = &[&size_class.to_string()];
LgMapMetricsSC {
$($m_metric: self.$m_metric.with_label_values(labels),)*
}
})
}
// Linux: reads lgalloc's stats including mapping information and publishes
// the per-size-class sums. An error from lgalloc is returned to the caller
// before any gauge is touched.
#[cfg(target_os = "linux")]
fn update(&mut self) -> std::io::Result<()> {
let stats = lgalloc::lgalloc_stats_with_mapping()?;
// Sum all map stats entries that share a size class.
let mut m_accums = BTreeMap::new();
// NOTE(review): `iter().flatten()` suggests `stats.map` is an optional
// collection of `(size_class, MapStats)` pairs -- confirm against lgalloc.
for (size_class, map_stat) in stats.map.iter().flatten() {
let accum: &mut MapStatsAccum = m_accums.entry(*size_class).or_default();
accum.add_assign(map_stat);
}
// Only size classes present in this snapshot are written; gauges of size
// classes that are absent keep their previously set value.
for (size_class, m_accum) in m_accums {
let sc_stats = self.get_size_class(size_class);
$(sc_stats.$m_metric.set(u64::cast_from(m_accum.$m_name));)*
}
Ok(())
}
// Non-Linux: nothing to collect, always succeeds without touching gauges.
#[cfg(not(target_os = "linux"))]
fn update(&mut self) -> std::io::Result<()> {
// On non-Linux systems, we do not have `numa_maps` stats.
Ok(())
}
}
}
};
}

fn id(value: u64, _stats: &SizeClassStats) -> u64 {
/// Scales `value` by `size_class`, returning their product.
///
/// `size_class` is widened to `u64` before the multiplication.
fn normalize_by_size_class(value: u64, size_class: usize) -> u64 {
    let factor = u64::cast_from(size_class);
    value * factor
}

/// Identity conversion: returns `value` unchanged and ignores `_size_class`.
///
/// Has the same `(u64, usize) -> u64` signature as `normalize_by_size_class`.
// NOTE(review): presumably passed as the conversion for metrics that need no
// scaling -- confirm at the `metrics_size_class!` invocation.
fn id(value: u64, _size_class: usize) -> u64 {
value
}

Expand Down Expand Up @@ -184,7 +250,13 @@ metrics_size_class! {
)
@file (
(file_size, file_size_bytes, "Sum of file sizes in size class"),
(allocated_size, file_allocated_size_bytes, "Sum of allocated sizes in size class"),
(allocated_size, file_allocated_size_bytes, "Sum of allocated sizes in size class")
)
}

map_metrics! {
mz_metrics_lgalloc
@mem (
(mapped, vm_mapped_bytes, "Sum of mapped sizes in size class"),
(active, vm_active_bytes, "Sum of active sizes in size class"),
(dirty, vm_dirty_bytes, "Sum of dirty sizes in size class")
Expand All @@ -196,10 +268,23 @@ pub(crate) fn register_metrics_into(metrics_registry: &MetricsRegistry) -> LgMet
LgMetrics::new(metrics_registry)
}

/// Registers the mapping-related lgalloc metrics in `metrics_registry` and
/// returns the [`LgMapMetrics`] object used to update them.
///
/// This only constructs the metrics object; it does not refresh any values.
// NOTE(review): periodic refreshing appears to be driven by the caller that
// wraps the returned value in a metrics task -- confirm in `lib.rs`.
pub(crate) fn register_map_metrics_into(metrics_registry: &MetricsRegistry) -> LgMapMetrics {
LgMapMetrics::new(metrics_registry)
}

// Exposes `LgMetrics` to the metrics update machinery under the name "lgalloc".
impl MetricsUpdate for LgMetrics {
type Error = Error;
const NAME: &'static str = "lgalloc";
fn update(&mut self) -> Result<(), Self::Error> {
// NOTE(review): this is expected to resolve to the inherent
// `LgMetrics::update` (inherent methods take precedence over trait methods
// in method-call syntax), so it delegates rather than recursing -- confirm
// the inherent method exists in the macro-generated impl.
self.update()
}
}

// Exposes `LgMapMetrics` to the metrics update machinery under the name
// "lgalloc_map". Errors are plain `std::io::Error`s, matching the return type
// of the inherent `LgMapMetrics::update`.
impl MetricsUpdate for LgMapMetrics {
type Error = std::io::Error;
const NAME: &'static str = "lgalloc_map";
fn update(&mut self) -> Result<(), Self::Error> {
// Delegates to the inherent `LgMapMetrics::update` generated by
// `map_metrics!`; inherent methods take precedence over trait methods in
// method-call syntax, so this does not recurse.
self.update()
}
}
9 changes: 9 additions & 0 deletions src/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod rusage;
pub struct Metrics {
/// Configuration set that dyncfg updates are applied to and that the tasks
/// below are notified with.
config_set: ConfigSet,
/// Task for the lgalloc metrics.
lgalloc: MetricsTask,
/// Task for the mapping-related lgalloc metrics, which has its own refresh
/// interval config.
lgalloc_map: MetricsTask,
/// Task for the rusage metrics.
rusage: MetricsTask,
}

Expand Down Expand Up @@ -61,6 +62,12 @@ pub async fn register_metrics_into(metrics_registry: &MetricsRegistry, config_se
dyncfgs::MZ_METRICS_LGALLOC_REFRESH_INTERVAL,
&update_duration_metric,
);
let lgalloc_map = Metrics::new_metrics_task(
metrics_registry,
lgalloc::register_map_metrics_into,
dyncfgs::MZ_METRICS_LGALLOC_MAP_REFRESH_INTERVAL,
&update_duration_metric,
);
let rusage = Metrics::new_metrics_task(
metrics_registry,
rusage::register_metrics_into,
Expand All @@ -70,6 +77,7 @@ pub async fn register_metrics_into(metrics_registry: &MetricsRegistry, config_se

*METRICS.lock().expect("lock poisoned") = Some(Metrics {
lgalloc,
lgalloc_map,
rusage,
config_set,
});
Expand All @@ -89,6 +97,7 @@ impl Metrics {
config_updates.apply(&self.config_set);
// Notify tasks about updated configuration.
self.lgalloc.update_dyncfg(&self.config_set);
self.lgalloc_map.update_dyncfg(&self.config_set);
self.rusage.update_dyncfg(&self.config_set);
}

Expand Down
2 changes: 1 addition & 1 deletion src/ore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ either = "1.15.0"
futures = { version = "0.3.31", optional = true }
hibitset = { version = "0.6.4", optional = true }
itertools = "0.14.0"
lgalloc = { version = "0.5", optional = true }
lgalloc = { version = "0.6", optional = true }
libc = { version = "0.2.172", optional = true }
mz-ore-proc = { path = "../ore-proc", default-features = false }
num = "0.4.3"
Expand Down
2 changes: 1 addition & 1 deletion src/timely-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ columnation = "0.1.0"
differential-dataflow = "0.15.2"
either = "1"
futures-util = "0.3.31"
lgalloc = "0.5"
lgalloc = "0.6"
mz-ore = { path = "../ore", features = ["async", "process", "tracing", "test", "num-traits", "region", "differential-dataflow", "overflowing"] }
num-traits = "0.2"
proptest = { version = "1.6.0", default-features = false, features = ["std"] }
Expand Down