Skip to content

Commit

Permalink
First pass at tracking system metrics
Browse files Browse the repository at this point in the history
This spawns a background Tokio task to track common system metrics;
it currently tracks only a few, but we can add more as needed.

I evaluated a few different metrics crates:
 - [opentelemetry-system-metrics](https://crates.io/crates/opentelemetry-system-metrics)
 - [sys_metrics](https://crates.io/crates/sys_metrics)
 - [heim](https://github.com/heim-rs/heim?tab=readme-ov-file)

Of these, sys_metrics doesn't support Windows, but it is the most actively
maintained.

As a follow-up, it might be useful to track Tokio runtime metrics as
well:

https://docs.rs/tokio/latest/tokio/runtime/struct.RuntimeMetrics.html
  • Loading branch information
Quantumplation committed Jan 6, 2025
1 parent 55cc9f1 commit 2e56f9f
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 2 deletions.
50 changes: 50 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/amaru/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ opentelemetry-otlp = { version = "0.27.0", features = [
"reqwest-client",
] }
tracing-opentelemetry = { version = "0.28.0" }
sys_metrics = "0.2.7"

[dev-dependencies]
envpath = { version = "0.0.1-beta.3", features = ["rand"] }
Expand Down
12 changes: 11 additions & 1 deletion crates/amaru/src/bin/amaru/cmd/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::config::NetworkName;
use crate::metrics::track_system_metrics;
use amaru::{consensus::nonce, sync::Config};
use clap::{builder::TypedValueParser as _, Parser};
use miette::IntoDiagnostic;
use opentelemetry::metrics::Counter;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use pallas_network::facades::PeerClient;
use std::{
path::{Path, PathBuf},
Expand Down Expand Up @@ -33,9 +35,15 @@ pub struct Args {
data_dir: String,
}

pub async fn run(args: Args, counter: Counter<u64>) -> miette::Result<()> {
pub async fn run(
args: Args,
counter: Counter<u64>,
metrics: SdkMeterProvider,
) -> miette::Result<()> {
let config = parse_args(args, counter)?;

let metrics = track_system_metrics(metrics);

let client = Arc::new(Mutex::new(
PeerClient::connect(config.upstream_peer.clone(), config.network_magic as u64)
.await
Expand All @@ -48,6 +56,8 @@ pub async fn run(args: Args, counter: Counter<u64>) -> miette::Result<()> {

run_pipeline(gasket::daemon::Daemon::new(sync), exit.clone()).await;

metrics.abort();

Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion crates/amaru/src/bin/amaru/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::env;
mod cmd;
mod config;
mod exit;
mod metrics;
mod panic;

pub const SERVICE_NAME: &str = "amaru";
Expand Down Expand Up @@ -39,7 +40,7 @@ async fn main() -> miette::Result<()> {
let args = Cli::parse();

let result = match args.command {
Command::Daemon(args) => cmd::daemon::run(args, counter).await,
Command::Daemon(args) => cmd::daemon::run(args, counter, metrics.clone()).await,
Command::Import(args) => cmd::import::run(args).await,
};

Expand Down
123 changes: 123 additions & 0 deletions crates/amaru/src/bin/amaru/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::time::Duration;

use miette::IntoDiagnostic;
use opentelemetry::{
metrics::{Counter, Gauge, MeterProvider},
KeyValue,
};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use sys_metrics::{
cpu::{CpuTimes, LoadAvg},
memory::Memory,
};
use tokio::task::JoinHandle;
use tracing::warn;

/// Spawns a Tokio task that periodically samples system metrics and records
/// them through instruments created from `metrics`.
///
/// The task sleeps before every sample. After a failed read it logs a warning
/// and doubles the sleep interval (capped at 30 seconds); after a successful
/// read the interval returns to one second. The loop never exits on its own,
/// so the caller is expected to stop it through the returned [`JoinHandle`].
pub fn track_system_metrics(metrics: SdkMeterProvider) -> JoinHandle<()> {
    const BASE_INTERVAL: Duration = Duration::from_secs(1);
    const MAX_INTERVAL: Duration = Duration::from_secs(30);

    tokio::spawn(async move {
        let instruments = make_system_counters(metrics);
        let mut interval = BASE_INTERVAL;
        loop {
            // TODO(pi): configurable parameter?
            tokio::time::sleep(interval).await;

            match get_reading() {
                Ok(sample) => {
                    // A good read ends any backoff that was in progress.
                    interval = BASE_INTERVAL;
                    record_system_metrics(sample, &instruments);
                }
                Err(err) => {
                    warn!("failed to read system metrics: {}", err);
                    // Exponential backoff, clamped to the maximum interval.
                    interval = (interval * 2).min(MAX_INTERVAL);
                }
            }
        }
    })
}

/// The OpenTelemetry instruments that each system reading is recorded into.
///
/// Built once by `make_system_counters` and then reused for every sample.
struct SystemCounters {
/// Gauge registered as `memory.limit` (unit "MB"); fed with `memory.total`.
total_memory: Gauge<u64>,
/// Gauge registered as `memory.usage` (unit "MB"); fed with `memory.free`.
free_memory: Gauge<u64>,
/// Gauge registered as `cpu.utilization`; fed with the 1-minute load average.
cpu_load: Gauge<f64>,
/// Counter registered as `cpu.time` (unit "ms"); despite the field name it
/// receives both user and system CPU time, told apart by a `state` attribute.
user_time: Counter<u64>,
}

/// One sample of system state, as returned by `get_reading`.
#[derive(Debug)]
struct Reading {
/// Result of `sys_metrics::memory::get_memory`.
memory: Memory,
/// Result of `sys_metrics::cpu::get_cputimes`.
cpu: CpuTimes,
/// Result of `sys_metrics::cpu::get_loadavg`.
load: LoadAvg,
}

/// Creates every instrument used for system metrics on a meter named
/// `"system"` obtained from `metrics`.
///
/// NOTE(review): the `memory.usage` gauge is described (and fed) as *free*
/// memory, and `cpu.utilization` carries the 1-minute load average. The names
/// are kept as they are because they are part of the exported telemetry;
/// confirm whether they should be renamed.
fn make_system_counters(metrics: SdkMeterProvider) -> SystemCounters {
    // TODO: standardize with the Haskell node somehow?
    let system = metrics.meter("system");

    // Struct-literal fields are evaluated in the order written, so the
    // instruments are registered in the same sequence as before.
    SystemCounters {
        total_memory: system
            .u64_gauge("memory.limit")
            .with_description("The total system memory, updated once per second")
            .with_unit("MB")
            .build(),
        free_memory: system
            .u64_gauge("memory.usage")
            .with_description("The free system memory, measured once per second")
            .with_unit("MB")
            .build(),
        cpu_load: system
            .f64_gauge("cpu.utilization")
            .with_description("the 1m average load, measured once per second")
            .build(),
        user_time: system
            .u64_counter("cpu.time")
            .with_description("the total cpu time spent in user processes")
            .with_unit("ms")
            .build(),
    }
}

fn get_reading() -> miette::Result<Reading> {
use sys_metrics::*;
let memory = memory::get_memory().into_diagnostic()?;
let cpu = cpu::get_cputimes().into_diagnostic()?;
let load = cpu::get_loadavg().into_diagnostic()?;

Ok(Reading { memory, cpu, load })
}

/// Pushes one [`Reading`] into the instruments held by `counters`.
///
/// Memory totals and the 1-minute load average are recorded on gauges as-is.
/// CPU times go to the `cpu.time` counter, tagged with a `state` attribute of
/// `"user"` or `"system"`.
///
/// The CPU values in a reading are running totals (the counter itself is
/// described as "the total cpu time"), whereas `Counter::add` expects an
/// *increment*. Adding the raw total on every tick would inflate the counter
/// by the full total each second, so only the growth since the previous
/// reading is added. The very first call adds the whole total, which makes
/// the exported counter track the system's cumulative value.
fn record_system_metrics(reading: Reading, counters: &SystemCounters) {
    use std::sync::atomic::AtomicU64;

    // Last totals seen, shared process-wide because the underlying CPU times
    // are system-wide as well.
    static LAST_USER: AtomicU64 = AtomicU64::new(0);
    static LAST_SYSTEM: AtomicU64 = AtomicU64::new(0);

    counters.total_memory.record(reading.memory.total, &[]);
    counters.free_memory.record(reading.memory.free, &[]);
    counters.cpu_load.record(reading.load.one, &[]);

    counters.user_time.add(
        increment_since_last(&LAST_USER, reading.cpu.user),
        &[KeyValue::new("state", "user")],
    );
    counters.user_time.add(
        increment_since_last(&LAST_SYSTEM, reading.cpu.system),
        &[KeyValue::new("state", "system")],
    );
}

/// Stores `current` as the latest total in `last` and returns how much it grew
/// since the previously stored total (zero if it went backwards).
fn increment_since_last(last: &std::sync::atomic::AtomicU64, current: u64) -> u64 {
    let previous = last.swap(current, std::sync::atomic::Ordering::Relaxed);
    current.saturating_sub(previous)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: the host can be sampled and the values look plausible.
    #[test]
    fn can_read_system_metrics() {
        let reading = get_reading().expect("failed to read system metrics");
        assert!(reading.memory.free > 0, "failed to read free memory");
        assert!(reading.memory.total > 0, "failed to read total memory");
        assert!(reading.cpu.user > 0, "failed to read user cpu time");
        // The 1-minute load average can legitimately be exactly 0.0 on an idle
        // machine or container, so only require a sane, non-negative number.
        assert!(
            reading.load.one.is_finite() && reading.load.one >= 0.0,
            "failed to read cpu load average"
        );
    }
}

0 comments on commit 2e56f9f

Please sign in to comment.