diff --git a/Cargo.lock b/Cargo.lock index 56a9d35af03e6..bda8bc7bd23f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4328,9 +4328,9 @@ dependencies = [ [[package]] name = "lgalloc" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76a3c66aab975ca357cdecbc25eb804168ff50a9242a3d03e6087c91f3824dd0" +checksum = "ed7f869aaa981ee942e37600f7c5749ae0af82b64c1a6794795ed7267d1b8f1b" dependencies = [ "crossbeam-deque", "libc", diff --git a/src/cluster/Cargo.toml b/src/cluster/Cargo.toml index 1467b509f03a6..abd31731d8866 100644 --- a/src/cluster/Cargo.toml +++ b/src/cluster/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.88" crossbeam-channel = "0.5.15" differential-dataflow = "0.15.2" futures = "0.3.31" -lgalloc = "0.5.0" +lgalloc = "0.6.0" mz-cluster-client = { path = "../cluster-client" } mz-ore = { path = "../ore", features = ["async", "process", "tracing"] } mz-service = { path = "../service" } diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 30de3b5866a67..96614240ab4d6 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -20,7 +20,7 @@ differential-dataflow = "0.15.2" differential-dogs3 = "0.1.13" futures = "0.3.31" itertools = "0.14.0" -lgalloc = "0.5" +lgalloc = "0.6" mz-cluster = { path = "../cluster" } mz-compute-client = { path = "../compute-client" } mz-compute-types = { path = "../compute-types" } diff --git a/src/metrics/Cargo.toml b/src/metrics/Cargo.toml index 1793449c44984..db89422a26a38 100644 --- a/src/metrics/Cargo.toml +++ b/src/metrics/Cargo.toml @@ -10,7 +10,7 @@ publish = false workspace = true [dependencies] -lgalloc = "0.5" +lgalloc = "0.6" libc = "0.2.172" mz-dyncfg = { path = "../dyncfg" } mz-ore = { path = "../ore", features = ["metrics"] } diff --git a/src/metrics/src/dyncfgs.rs b/src/metrics/src/dyncfgs.rs index cb53cbb1c2acc..97c73febae245 100644 --- a/src/metrics/src/dyncfgs.rs +++ b/src/metrics/src/dyncfgs.rs @@ -13,6 +13,13 @@ use 
std::time::Duration; use mz_dyncfg::{Config, ConfigSet}; +/// How frequently to refresh lgalloc map stats. +pub(crate) const MZ_METRICS_LGALLOC_MAP_REFRESH_INTERVAL: Config = Config::new( + "mz_metrics_lgalloc_map_refresh_interval", + Duration::from_secs(60), + "How frequently to refresh lgalloc map stats. A zero duration disables refreshing.", +); + /// How frequently to refresh lgalloc stats. pub(crate) const MZ_METRICS_LGALLOC_REFRESH_INTERVAL: Config = Config::new( "mz_metrics_lgalloc_refresh_interval", @@ -30,6 +37,7 @@ pub(crate) const MZ_METRICS_RUSAGE_REFRESH_INTERVAL: Config = Config:: /// Adds the full set of all storage `Config`s. pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs + .add(&MZ_METRICS_LGALLOC_MAP_REFRESH_INTERVAL) .add(&MZ_METRICS_LGALLOC_REFRESH_INTERVAL) .add(&MZ_METRICS_RUSAGE_REFRESH_INTERVAL) } diff --git a/src/metrics/src/lgalloc.rs b/src/metrics/src/lgalloc.rs index 7277cbacdcf9e..771a3e5df4e06 100644 --- a/src/metrics/src/lgalloc.rs +++ b/src/metrics/src/lgalloc.rs @@ -8,7 +8,7 @@ use std::collections::BTreeMap; use std::ops::AddAssign; -use lgalloc::{FileStats, SizeClassStats}; +use lgalloc::{FileStats, MapStats}; use mz_ore::cast::CastFrom; use mz_ore::metrics::{MetricsRegistry, raw}; use paste::paste; @@ -48,6 +48,11 @@ struct FileStatsAccum { pub file_size: usize, /// Size of the file on disk in bytes. pub allocated_size: usize, +} + +/// An accumulator for [`MapStats`]. +#[derive(Default)] +struct MapStatsAccum { /// Number of mapped bytes, if different from `dirty`. Consult `man 7 numa` for details. pub mapped: usize, /// Number of active bytes. Consult `man 7 numa` for details. 
@@ -60,6 +65,11 @@ impl AddAssign<&FileStats> for FileStatsAccum { fn add_assign(&mut self, rhs: &FileStats) { self.file_size += rhs.file_size; self.allocated_size += rhs.allocated_size; + } +} + +impl AddAssign<&MapStats> for MapStatsAccum { + fn add_assign(&mut self, rhs: &MapStats) { self.mapped += rhs.mapped; self.active += rhs.active; self.dirty += rhs.dirty; @@ -118,32 +128,25 @@ macro_rules! metrics_size_class { } fn update(&mut self) -> Result<(), Error> { let stats = lgalloc::lgalloc_stats(); - for sc in &stats.size_class { - let sc_stats = self.get_size_class(sc.size_class); - $(sc_stats.$metric.set(($conv)(u64::cast_from(sc.$name), sc));)* + for (size_class, sc) in &stats.size_class { + let sc_stats = self.get_size_class(*size_class); + $(sc_stats.$metric.set(($conv)(u64::cast_from(sc.$name), *size_class));)* } - let mut accums = BTreeMap::new(); - match &stats.file_stats { - Ok(file_stats) => { - for file_stat in file_stats { - let accum: &mut FileStatsAccum = accums.entry(file_stat.size_class).or_default(); + let mut f_accums = BTreeMap::new(); + for (size_class, file_stat) in &stats.file { + let accum: &mut FileStatsAccum = f_accums.entry(*size_class).or_default(); + match file_stat { + Ok(file_stat) => { accum.add_assign(file_stat); } - } - #[cfg(target_os = "linux")] - Err(err) => { - return Err(Error::new(ErrorKind::FileStatsFailed(err.to_string()))); - } - #[cfg(not(target_os = "linux"))] - Err(err) => { - if err.kind() != std::io::ErrorKind::NotFound { - return Err(Error::new(ErrorKind::FileStatsFailed(err.to_string()))); + Err(err) => { + return Err(ErrorKind::FileStatsFailed(err.to_string()).into()); } } } - for (size_class, accum) in accums { + for (size_class, f_accum) in f_accums { let sc_stats = self.get_size_class(size_class); - $(sc_stats.$f_metric.set(u64::cast_from(accum.$f_name));)* + $(sc_stats.$f_metric.set(u64::cast_from(f_accum.$f_name));)* } Ok(()) } @@ -152,11 +155,74 @@ macro_rules! 
metrics_size_class { }; } -fn normalize_by_size_class(value: u64, stats: &SizeClassStats) -> u64 { - value * u64::cast_from(stats.size_class) +macro_rules! map_metrics { + ($namespace:ident + @mem ($(($m_name:ident, $m_metric:ident, $m_desc:expr)),*) + ) => { + map_metrics! { + @define $namespace + @mem $(($m_name, $m_metric, $m_desc)),* + } + }; + (@define $namespace:ident + @mem $(($m_name:ident, $m_metric:ident, $m_desc:expr)),* + ) => { + paste! { + pub(crate) struct LgMapMetrics { + size_class: BTreeMap, + $($m_metric: raw::UIntGaugeVec,)* + } + struct LgMapMetricsSC { + $($m_metric: GenericGauge,)* + } + impl LgMapMetrics { + fn new(registry: &MetricsRegistry) -> Self { + Self { + size_class: BTreeMap::default(), + $($m_metric: registry.register(mz_ore::metric!( + name: concat!(stringify!($namespace), "_", stringify!($m_metric)), + help: $m_desc, + var_labels: ["size_class"], + )),)* + } + } + fn get_size_class(&mut self, size_class: usize) -> &LgMapMetricsSC { + self.size_class.entry(size_class).or_insert_with(|| { + let labels: &[&str] = &[&size_class.to_string()]; + LgMapMetricsSC { + $($m_metric: self.$m_metric.with_label_values(labels),)* + } + }) + } + #[cfg(target_os = "linux")] + fn update(&mut self) -> std::io::Result<()> { + let stats = lgalloc::lgalloc_stats_with_mapping()?; + let mut m_accums = BTreeMap::new(); + for (size_class, map_stat) in stats.map.iter().flatten() { + let accum: &mut MapStatsAccum = m_accums.entry(*size_class).or_default(); + accum.add_assign(map_stat); + } + for (size_class, m_accum) in m_accums { + let sc_stats = self.get_size_class(size_class); + $(sc_stats.$m_metric.set(u64::cast_from(m_accum.$m_name));)* + } + Ok(()) + } + #[cfg(not(target_os = "linux"))] + fn update(&mut self) -> std::io::Result<()> { + // On non-Linux systems, we do not have `numa_maps` stats. 
+ Ok(()) + } + } + } + }; } -fn id(value: u64, _stats: &SizeClassStats) -> u64 { +fn normalize_by_size_class(value: u64, size_class: usize) -> u64 { + value * u64::cast_from(size_class) +} + +fn id(value: u64, _size_class: usize) -> u64 { value } @@ -184,7 +250,13 @@ metrics_size_class! { ) @file ( (file_size, file_size_bytes, "Sum of file sizes in size class"), - (allocated_size, file_allocated_size_bytes, "Sum of allocated sizes in size class"), + (allocated_size, file_allocated_size_bytes, "Sum of allocated sizes in size class") + ) +} + +map_metrics! { + mz_metrics_lgalloc + @mem ( (mapped, vm_mapped_bytes, "Sum of mapped sizes in size class"), (active, vm_active_bytes, "Sum of active sizes in size class"), (dirty, vm_dirty_bytes, "Sum of dirty sizes in size class") @@ -196,6 +268,11 @@ pub(crate) fn register_metrics_into(metrics_registry: &MetricsRegistry) -> LgMet LgMetrics::new(metrics_registry) } +/// Register a task to read mapping-related lgalloc stats. +pub(crate) fn register_map_metrics_into(metrics_registry: &MetricsRegistry) -> LgMapMetrics { + LgMapMetrics::new(metrics_registry) +} + impl MetricsUpdate for LgMetrics { type Error = Error; const NAME: &'static str = "lgalloc"; @@ -203,3 +280,11 @@ impl MetricsUpdate for LgMetrics { self.update() } } + +impl MetricsUpdate for LgMapMetrics { + type Error = std::io::Error; + const NAME: &'static str = "lgalloc_map"; + fn update(&mut self) -> Result<(), Self::Error> { + self.update() + } +} diff --git a/src/metrics/src/lib.rs b/src/metrics/src/lib.rs index 48023d8244527..9d66aa526d083 100644 --- a/src/metrics/src/lib.rs +++ b/src/metrics/src/lib.rs @@ -34,6 +34,7 @@ pub mod rusage; pub struct Metrics { config_set: ConfigSet, lgalloc: MetricsTask, + lgalloc_map: MetricsTask, rusage: MetricsTask, } @@ -61,6 +62,12 @@ pub async fn register_metrics_into(metrics_registry: &MetricsRegistry, config_se dyncfgs::MZ_METRICS_LGALLOC_REFRESH_INTERVAL, &update_duration_metric, ); + let lgalloc_map = 
Metrics::new_metrics_task( + metrics_registry, + lgalloc::register_map_metrics_into, + dyncfgs::MZ_METRICS_LGALLOC_MAP_REFRESH_INTERVAL, + &update_duration_metric, + ); let rusage = Metrics::new_metrics_task( metrics_registry, rusage::register_metrics_into, @@ -70,6 +77,7 @@ pub async fn register_metrics_into(metrics_registry: &MetricsRegistry, config_se *METRICS.lock().expect("lock poisoned") = Some(Metrics { lgalloc, + lgalloc_map, rusage, config_set, }); @@ -89,6 +97,7 @@ impl Metrics { config_updates.apply(&self.config_set); // Notify tasks about updated configuration. self.lgalloc.update_dyncfg(&self.config_set); + self.lgalloc_map.update_dyncfg(&self.config_set); self.rusage.update_dyncfg(&self.config_set); } diff --git a/src/ore/Cargo.toml b/src/ore/Cargo.toml index b483a397d29fc..c011463837650 100644 --- a/src/ore/Cargo.toml +++ b/src/ore/Cargo.toml @@ -35,7 +35,7 @@ either = "1.15.0" futures = { version = "0.3.31", optional = true } hibitset = { version = "0.6.4", optional = true } itertools = "0.14.0" -lgalloc = { version = "0.5", optional = true } +lgalloc = { version = "0.6", optional = true } libc = { version = "0.2.172", optional = true } mz-ore-proc = { path = "../ore-proc", default-features = false } num = "0.4.3" diff --git a/src/timely-util/Cargo.toml b/src/timely-util/Cargo.toml index 04617d4fa5d49..7b075b31ae951 100644 --- a/src/timely-util/Cargo.toml +++ b/src/timely-util/Cargo.toml @@ -18,7 +18,7 @@ columnation = "0.1.0" differential-dataflow = "0.15.2" either = "1" futures-util = "0.3.31" -lgalloc = "0.5" +lgalloc = "0.6" mz-ore = { path = "../ore", features = ["async", "process", "tracing", "test", "num-traits", "region", "differential-dataflow", "overflowing"] } num-traits = "0.2" proptest = { version = "1.6.0", default-features = false, features = ["std"] }