From 8d1bcce90f4d8c02a69b4241e4f00d46dad173d9 Mon Sep 17 00:00:00 2001
From: Shiv Tyagi
Date: Thu, 7 May 2026 03:09:46 +0000
Subject: [PATCH 1/7] feat(spurdbd): wire fairshare integration between
 accounting and scheduler

Add a GetFairshareFactors RPC to the SlurmAccounting service so spurctld
can periodically fetch precomputed fair-share factors from spurdbd. The
controller caches these factors and uses them in the pending_jobs()
priority computation instead of the previous hardcoded neutral value.

- Add GetFairshareFactors RPC + messages to proto
- Implement the handler in spurdbd, wiring compute_fairshare to the new RPC
- Add FairshareCache in spurctld with an eager initial fetch and periodic
  background refresh (configurable via fairshare_refresh_secs, default 300s)
- Replace the hardcoded 1.0 in pending_jobs() with a live cache lookup
- Remove the now-redundant fair_share_factor() from spur-sched (spurdbd
  owns this computation)
- Simplify priority tests to table-driven style
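
Sketch of the resulting priority path (values are illustrative; the call
shapes are the ones introduced by this patch):

    // A user over-consuming their share arrives with a factor below 1.0;
    // a user absent from the cache falls back to a neutral 1.0.
    let fair_share = cluster.fairshare_cache().get("alice", "research");
    let p = spur_sched::priority::effective_priority(
        job.priority,   // base priority
        fair_share,     // live factor from spurdbd, 1.0 if not cached yet
        age_minutes,    // queue-age bonus input
        partition_tier, // partition priority tier
    );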
---
 crates/spur-core/src/config.rs         |  7 ++
 crates/spur-sched/src/priority.rs      | 73 +++++---------------
 crates/spur-tests/src/t24_priority.rs  | 40 -----------
 crates/spurctld/src/cluster.rs         | 15 +++-
 crates/spurctld/src/fairshare_cache.rs | 95 ++++++++++++++++++++++++++
 crates/spurctld/src/main.rs            |  8 +++
 crates/spurdbd/src/server.rs           | 44 +++++++++++-
 proto/slurm.proto                      | 11 +++
 8 files changed, 193 insertions(+), 100 deletions(-)
 create mode 100644 crates/spurctld/src/fairshare_cache.rs

diff --git a/crates/spur-core/src/config.rs b/crates/spur-core/src/config.rs
index 52c199e..c0872ab 100644
--- a/crates/spur-core/src/config.rs
+++ b/crates/spur-core/src/config.rs
@@ -199,6 +199,9 @@ pub struct AccountingConfig {
     /// How long to keep completed job records.
     #[serde(default = "default_purge_days")]
     pub purge_after_days: u32,
+    /// How often to refresh fairshare factors from the accounting daemon.
+    #[serde(default = "default_fairshare_refresh_secs")]
+    pub fairshare_refresh_secs: u32,
 }
 
 fn default_accounting_host() -> String {
@@ -210,6 +213,9 @@ fn default_database_url() -> String {
 fn default_purge_days() -> u32 {
     365
 }
+fn default_fairshare_refresh_secs() -> u32 {
+    300
+}
 
 impl Default for AccountingConfig {
     fn default() -> Self {
@@ -217,6 +223,7 @@ impl Default for AccountingConfig {
             host: "localhost:6819".into(),
             database_url: "postgresql://spur:spur@localhost/spur".into(),
             purge_after_days: 365,
+            fairshare_refresh_secs: 300,
         }
     }
 }
diff --git a/crates/spur-sched/src/priority.rs b/crates/spur-sched/src/priority.rs
index ef06330..b7df43a 100644
--- a/crates/spur-sched/src/priority.rs
+++ b/crates/spur-sched/src/priority.rs
@@ -1,31 +1,3 @@
-use chrono::{DateTime, Duration, Utc};
-
-/// Calculate fair-share factor for a user/account.
-///
-/// fair_share_factor = target_share / max(decayed_actual_usage, epsilon)
-///
-/// The decay uses a half-life model where older usage matters less.
-pub fn fair_share_factor(
-    target_share: f64,
-    usage_records: &[(DateTime<Utc>, f64)], // (timestamp, cpu_hours)
-    halflife_days: u32,
-    now: DateTime<Utc>,
-) -> f64 {
-    let halflife = Duration::days(halflife_days as i64);
-    let decay_rate = 2.0_f64.ln() / halflife.num_seconds() as f64;
-
-    let decayed_usage: f64 = usage_records
-        .iter()
-        .map(|(time, usage)| {
-            let age = (now - *time).num_seconds().max(0) as f64;
-            usage * (-decay_rate * age).exp()
-        })
-        .sum();
-
-    let epsilon = 0.001; // Avoid division by zero
-    target_share / decayed_usage.max(epsilon)
-}
-
 /// Calculate effective job priority.
 ///
 /// effective_priority = base_priority * fair_share_factor * age_factor * partition_tier
@@ -49,34 +21,21 @@ mod tests {
     use super::*;
 
     #[test]
-    fn test_fair_share_no_usage() {
-        let factor = fair_share_factor(1.0, &[], 14, Utc::now());
-        // With no usage, factor should be very high (share / epsilon)
-        assert!(factor > 100.0);
-    }
-
-    #[test]
-    fn test_fair_share_heavy_usage() {
-        let now = Utc::now();
-        let records: Vec<(DateTime<Utc>, f64)> =
-            (0..14).map(|d| (now - Duration::days(d), 100.0)).collect();
-
-        let factor = fair_share_factor(1.0, &records, 14, now);
-        // Heavy recent usage → low factor
-        assert!(factor < 1.0);
-    }
-
-    #[test]
-    fn test_effective_priority() {
-        let p = effective_priority(1000, 1.0, 0, 1);
-        assert_eq!(p, 1000);
-
-        // With age bonus
-        let p = effective_priority(1000, 1.0, 10080, 1);
-        assert_eq!(p, 2000);
-
-        // With partition tier
-        let p = effective_priority(1000, 1.0, 0, 2);
-        assert_eq!(p, 2000);
+    fn effective_priority_cases() {
+        let cases = [
+            ((1000, 1.0, 0, 1), 1000),
+            ((1000, 1.0, 10080, 1), 2000),
+            ((1000, 1.0, 0, 2), 2000),
+            ((1000, 100.0, 0, 1), 10_000),
+            ((0, 0.001, 0, 1), 1),
+        ];
+
+        for ((base, fs, age, tier), expected) in cases {
+            let got = effective_priority(base, fs, age, tier);
+            assert_eq!(
+                got, expected,
+                "effective_priority({base}, {fs}, {age}, {tier})"
+            );
+        }
     }
 }
diff --git a/crates/spur-tests/src/t24_priority.rs b/crates/spur-tests/src/t24_priority.rs
index 91f7134..6b6e7ba 100644
--- a/crates/spur-tests/src/t24_priority.rs
+++ b/crates/spur-tests/src/t24_priority.rs
@@ -4,48 +4,8 @@
 
 #[cfg(test)]
 mod tests {
-    use chrono::{Duration, Utc};
     use spur_sched::priority;
 
-    // ── T24.1: Fair-share with no usage ──────────────────────────
-
-    #[test]
-    fn t24_1_fair_share_no_usage() {
-        let factor = priority::fair_share_factor(1.0, &[], 14, Utc::now());
-        assert!(factor > 100.0, "no usage → very high factor");
-    }
-
-    // ── T24.2: Fair-share with heavy recent usage ────────────────
-
-    #[test]
-    fn t24_2_fair_share_heavy_usage() {
-        let now = Utc::now();
-        let records: Vec<_> = (0..14).map(|d| (now - Duration::days(d), 100.0)).collect();
-        let factor = priority::fair_share_factor(1.0, &records, 14, now);
-        assert!(factor < 1.0, "heavy usage → low factor");
-    }
-
-    // ── T24.3: Usage decay ───────────────────────────────────────
-
-    #[test]
-    fn t24_3_old_usage_decays() {
-        let now = Utc::now();
-
-        // Recent usage
-        let recent = vec![(now - Duration::hours(1), 100.0)];
-        let factor_recent = priority::fair_share_factor(1.0, &recent, 14, now);
-
-        // Old usage (same amount but 30 days ago)
-        let old = vec![(now - Duration::days(30), 100.0)];
-        let factor_old = priority::fair_share_factor(1.0, &old, 14, now);
-
-        // Old usage should decay → higher factor
-        assert!(
-            factor_old > factor_recent,
-            "old usage should result in higher factor than recent"
-        );
-    }
-
     // ── T24.4: Effective priority ────────────────────────────────
 
     #[test]
diff --git a/crates/spurctld/src/cluster.rs b/crates/spurctld/src/cluster.rs
index 02769d8..57b697a 100644
--- a/crates/spurctld/src/cluster.rs
+++ b/crates/spurctld/src/cluster.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::path::Path;
 use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;
 
 use chrono::{DateTime, Utc};
 use parking_lot::RwLock;
@@ -18,6 +19,7 @@ use spur_core::step::{JobStep, StepState, STEP_BATCH};
 use spur_core::wal::WalOperation;
 
 use crate::accounting::AccountingNotifier;
+use crate::fairshare_cache::FairshareCache;
 use crate::raft::{SpurRaft, StateMachineApply};
 
 /// Central cluster state manager.
@@ -36,12 +38,14 @@ pub struct ClusterManager {
     hostname_aliases: RwLock<HashMap<String, String>>,
     raft: RwLock<Option<SpurRaft>>,
     accounting: RwLock<Option<AccountingNotifier>>,
+    fairshare_cache: Arc<FairshareCache>,
 }
 
 impl ClusterManager {
     pub fn new(config: SlurmConfig, _state_dir: &Path) -> anyhow::Result<Self> {
         let partitions = config.build_partitions();
         let license_pool = config.licenses.clone();
+        let fairshare_cache = Arc::new(FairshareCache::new());
 
         let cm = Self {
             config,
@@ -55,6 +59,7 @@ impl ClusterManager {
             hostname_aliases: RwLock::new(HashMap::new()),
             raft: RwLock::new(None),
             accounting: RwLock::new(None),
+            fairshare_cache,
         };
 
         info!("cluster manager initialized (state will be recovered via Raft)");
@@ -953,10 +958,12 @@ impl ClusterManager {
             .and_then(|pname| partitions.iter().find(|p| p.name == *pname))
             .map(|p| p.priority_tier)
             .unwrap_or(1);
-        // fair_share = 1.0 (neutral) until spurdbd integration
+        let fair_share = self
+            .fairshare_cache
+            .get(&job.spec.user, job.spec.account.as_deref().unwrap_or(""));
         job.priority = spur_sched::priority::effective_priority(
             job.priority,
-            1.0,
+            fair_share,
             age_minutes,
             partition_tier,
         );
@@ -1248,6 +1255,10 @@ impl ClusterManager {
         *self.accounting.write() = Some(notifier);
     }
 
+    pub fn fairshare_cache(&self) -> &Arc<FairshareCache> {
+        &self.fairshare_cache
+    }
+
     /// Persist a mutation via Raft consensus. The apply callback
     /// (`StateMachineApply`) handles in-memory state on all nodes.
     fn propose(&self, op: WalOperation) -> anyhow::Result<()> {
diff --git a/crates/spurctld/src/fairshare_cache.rs b/crates/spurctld/src/fairshare_cache.rs
new file mode 100644
index 0000000..9e9474a
--- /dev/null
+++ b/crates/spurctld/src/fairshare_cache.rs
@@ -0,0 +1,95 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use parking_lot::RwLock;
+use tonic::transport::Channel;
+use tracing::{debug, info, warn};
+
+use spur_proto::proto::slurm_accounting_client::SlurmAccountingClient;
+use spur_proto::proto::GetFairshareFactorsRequest;
+
+pub struct FairshareCache {
+    factors: RwLock<HashMap<String, f64>>,
+}
+
+impl FairshareCache {
+    pub fn new() -> Self {
+        Self {
+            factors: RwLock::new(HashMap::new()),
+        }
+    }
+
+    pub fn get(&self, user: &str, account: &str) -> f64 {
+        let key = format!("{}:{}", user, account);
+        match self.factors.read().get(&key) {
+            Some(&factor) => factor,
+            None => {
+                debug!(
+                    user,
+                    account, "fairshare factor not found, defaulting to neutral"
+                );
+                1.0
+            }
+        }
+    }
+
+    fn replace(&self, new_factors: HashMap<String, f64>) {
+        *self.factors.write() = new_factors;
+    }
+
+    pub fn spawn_refresh_loop(
+        self: &Arc<Self>,
+        host: String,
+        halflife_days: u32,
+        refresh_interval_secs: u64,
+    ) {
+        let cache = Arc::clone(self);
+        let interval = Duration::from_secs(refresh_interval_secs);
+
+        tokio::spawn(async move {
+            let uri = if host.starts_with("http://") || host.starts_with("https://") {
+                host.clone()
+            } else {
+                format!("http://{}", host)
+            };
+
+            // Eager first fetch so the cache is populated before the scheduler runs
+            match tokio::time::timeout(Duration::from_secs(5), Self::fetch(&uri, halflife_days))
+                .await
+            {
+                Ok(Ok(factors)) => {
+                    info!(count = factors.len(), "fairshare cache initialized");
+                    cache.replace(factors);
+                }
+                Ok(Err(e)) => {
+                    warn!(error = %e, "initial fairshare fetch failed, will retry in background");
+                }
+                Err(_) => {
+                    warn!("initial fairshare fetch timed out, will retry in background");
+                }
+            }
+
+            loop {
+                tokio::time::sleep(interval).await;
+
+                match Self::fetch(&uri, halflife_days).await {
+                    Ok(factors) => {
+                        cache.replace(factors);
+                    }
+                    Err(e) => {
+                        warn!(error = %e, "fairshare refresh failed, retaining stale data");
+                    }
+                }
+            }
+        });
+    }
+
+    async fn fetch(uri: &str, halflife_days: u32) -> anyhow::Result<HashMap<String, f64>> {
+        let mut client: SlurmAccountingClient<Channel> =
+            SlurmAccountingClient::connect(uri.to_owned()).await?;
+        let req = GetFairshareFactorsRequest { halflife_days };
+        let resp = client.get_fairshare_factors(req).await?;
+        Ok(resp.into_inner().factors)
+    }
+}
diff --git a/crates/spurctld/src/main.rs b/crates/spurctld/src/main.rs
index ce5db44..2377712 100644
--- a/crates/spurctld/src/main.rs
+++ b/crates/spurctld/src/main.rs
@@ -1,5 +1,6 @@
 mod accounting;
 mod cluster;
+mod fairshare_cache;
 mod raft;
 mod raft_server;
 mod scheduler_loop;
@@ -147,6 +148,13 @@ async fn main() -> anyhow::Result<()> {
         }
     }
 
+    // Start fairshare factor refresh loop
+    cluster.fairshare_cache().spawn_refresh_loop(
+        config.accounting.host.clone(),
+        config.scheduler.fairshare_halflife_days,
+        config.accounting.fairshare_refresh_secs as u64,
+    );
+
     // Start scheduler loop (only schedules when this node is Raft leader)
     let sched_cluster = cluster.clone();
    let sched_raft = raft_handle.clone();
diff --git a/crates/spurdbd/src/server.rs b/crates/spurdbd/src/server.rs
index 7ea117a..1338ed0 100644
--- a/crates/spurdbd/src/server.rs
+++ b/crates/spurdbd/src/server.rs
@@ -9,7 +9,7 @@ use spur_proto::proto::*;
 #[allow(unused_imports)]
 use tracing::info;
 
-use crate::db;
+use crate::{db, fairshare};
 
 pub struct AccountingService {
     pool: PgPool,
@@ -417,6 +417,48 @@ impl SlurmAccounting for AccountingService {
 
         Ok(Response::new(ListQosResponse { qos_list }))
     }
+
+    // ============================================================
+    // Fairshare
+    // ============================================================
+
+    async fn get_fairshare_factors(
+        &self,
+        request: Request<GetFairshareFactorsRequest>,
+    ) -> Result<Response<GetFairshareFactorsResponse>, Status> {
+        let req = request.into_inner();
+        let halflife_days = if req.halflife_days == 0 {
+            14
+        } else {
+            req.halflife_days
+        };
+
+        let now = Utc::now();
+        let since = now - chrono::Duration::days(halflife_days as i64 * 4);
+
+        let usage = db::get_usage(&self.pool, None, None, since)
+            .await
+            .map_err(|e| Status::internal(e.to_string()))?;
+
+        let accounts = db::list_accounts(&self.pool)
+            .await
+            .map_err(|e| Status::internal(e.to_string()))?;
+
+        let account_weights: std::collections::HashMap<String, f64> = accounts
+            .into_iter()
+            .map(|a| (a.name, a.fairshare_weight as f64))
+            .collect();
+
+        let raw_factors =
+            fairshare::compute_fairshare(&usage, &account_weights, halflife_days, now);
+
+        let factors = raw_factors
+            .into_iter()
+            .map(|((user, account), factor)| (format!("{}:{}", user, account), factor))
+            .collect();
+
+        Ok(Response::new(GetFairshareFactorsResponse { factors }))
+    }
 }
 
 pub async fn serve(addr: SocketAddr, pool: PgPool) -> anyhow::Result<()> {
diff --git a/proto/slurm.proto b/proto/slurm.proto
index d16c05d..6db4bac 100644
--- a/proto/slurm.proto
+++ b/proto/slurm.proto
@@ -310,6 +310,9 @@ service SlurmAccounting {
   rpc CreateQos(CreateQosRequest) returns (google.protobuf.Empty);
   rpc DeleteQos(DeleteQosRequest) returns (google.protobuf.Empty);
   rpc ListQos(ListQosRequest) returns (ListQosResponse);
+
+  // Fairshare
+  rpc GetFairshareFactors(GetFairshareFactorsRequest) returns (GetFairshareFactorsResponse);
 }
 
 // ============================================================
@@ -560,6 +563,14 @@ message GetUsageResponse {
   map<string, uint64> job_count = 3;
 }
 
+message GetFairshareFactorsRequest {
+  uint32 halflife_days = 1;
+}
+
+message GetFairshareFactorsResponse {
+  map<string, double> factors = 1; // key = "user:account"
+}
+
 // -- K8s Operator --
 
 message ReportJobStatusRequest {

From 9546ec30785b39f581eabe548cfac62bb794617b Mon Sep 17 00:00:00 2001
From: Shiv Tyagi
Date: Fri, 8 May 2026 02:26:31 +0000
Subject: [PATCH 2/7] fix(spurdbd): cast SUM aggregates to BIGINT in get_usage
 query

PostgreSQL SUM() on BIGINT columns returns NUMERIC, causing a runtime
panic when sqlx tries to decode into i64.
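
Minimal repro of the failure mode (a sketch against the same usage table
the query below reads; COALESCE is added here so an empty table decodes
too):

    use sqlx::Row;

    async fn total_cpu(pool: &sqlx::PgPool) -> anyhow::Result<i64> {
        // Without ::BIGINT, SUM(cpu_seconds) comes back as NUMERIC and
        // try_get::<i64, _> fails with a column-decode error.
        let row = sqlx::query(
            "SELECT COALESCE(SUM(cpu_seconds), 0)::BIGINT AS total FROM usage",
        )
        .fetch_one(pool)
        .await?;
        Ok(row.try_get::<i64, _>("total")?)
    }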
"user:account" +} + // -- K8s Operator -- message ReportJobStatusRequest { From 9546ec30785b39f581eabe548cfac62bb794617b Mon Sep 17 00:00:00 2001 From: Shiv Tyagi Date: Fri, 8 May 2026 02:26:31 +0000 Subject: [PATCH 2/7] fix(spurdbd): cast SUM aggregates to BIGINT in get_usage query PostgreSQL SUM() on BIGINT columns returns NUMERIC, causing a runtime panic when sqlx tries to decode into i64. --- crates/spurdbd/src/db.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/spurdbd/src/db.rs b/crates/spurdbd/src/db.rs index 7fdc320..479a56a 100644 --- a/crates/spurdbd/src/db.rs +++ b/crates/spurdbd/src/db.rs @@ -361,8 +361,10 @@ pub async fn get_usage( ) -> anyhow::Result> { let rows = sqlx::query( r#" - SELECT user_name, account, SUM(cpu_seconds) as total_cpu_seconds, - SUM(gpu_seconds) as total_gpu_seconds, SUM(job_count) as total_jobs, + SELECT user_name, account, + SUM(cpu_seconds)::BIGINT as total_cpu_seconds, + SUM(gpu_seconds)::BIGINT as total_gpu_seconds, + SUM(job_count)::BIGINT as total_jobs, period_start FROM usage WHERE period_start >= $1 From 71f444c3a4d678c97e8d2a056b9283a80f9fc83b Mon Sep 17 00:00:00 2001 From: Shiv Tyagi Date: Fri, 8 May 2026 02:26:40 +0000 Subject: [PATCH 3/7] fix(spur-cli): parse composite keys in sshare usage display The get_usage RPC returns cpu_hours keyed as "user:account" but sshare looked up by bare account or user name, resulting in zero usage shown. Pre-aggregate the map by account and (user, account) before display. --- crates/spur-cli/src/sshare.rs | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/crates/spur-cli/src/sshare.rs b/crates/spur-cli/src/sshare.rs index 6fb0058..4ab4b30 100644 --- a/crates/spur-cli/src/sshare.rs +++ b/crates/spur-cli/src/sshare.rs @@ -78,8 +78,21 @@ pub async fn main_with_args(args: Vec) -> Result<()> { total_shares }; + // The server returns keys in "user:account" format. Pre-aggregate by account + // and by (user, account) for lookups below. 
---
 crates/spur-cli/src/sshare.rs | 25 ++++++++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/crates/spur-cli/src/sshare.rs b/crates/spur-cli/src/sshare.rs
index 6fb0058..4ab4b30 100644
--- a/crates/spur-cli/src/sshare.rs
+++ b/crates/spur-cli/src/sshare.rs
@@ -78,8 +78,21 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {
         total_shares
     };
 
+    // The server returns keys in "user:account" format. Pre-aggregate by account
+    // and by (user, account) for lookups below.
+    let mut account_cpu_hours: std::collections::HashMap<&str, f64> =
+        std::collections::HashMap::new();
+    let mut user_account_cpu_hours: std::collections::HashMap<(&str, &str), f64> =
+        std::collections::HashMap::new();
+    for (key, &hours) in &usage.cpu_hours {
+        if let Some((user, account)) = key.split_once(':') {
+            *account_cpu_hours.entry(account).or_default() += hours;
+            *user_account_cpu_hours.entry((user, account)).or_default() += hours;
+        }
+    }
+
     // Compute total usage for normalization
-    let total_cpu_usage: f64 = usage.cpu_hours.values().sum();
+    let total_cpu_usage: f64 = account_cpu_hours.values().sum();
     let total_cpu_usage = if total_cpu_usage <= 0.0 {
         1.0
     } else {
@@ -119,7 +132,10 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {
         let raw_shares = account.fairshare_weight;
         let norm_shares = raw_shares / total_shares;
 
-        let raw_usage = usage.cpu_hours.get(&account.name).copied().unwrap_or(0.0);
+        let raw_usage = account_cpu_hours
+            .get(account.name.as_str())
+            .copied()
+            .unwrap_or(0.0);
         let norm_usage = raw_usage / total_cpu_usage;
         let fair_share = if norm_usage > 0.001 {
             norm_shares / norm_usage
@@ -159,7 +175,10 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {
             }
         }
 
-        let user_usage = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0);
+        let user_usage = user_account_cpu_hours
+            .get(&(user.name.as_str(), account.name.as_str()))
+            .copied()
+            .unwrap_or(0.0);
         let user_norm_usage = user_usage / total_cpu_usage;
         // Each user within an account gets an equal sub-share
         let user_count = account_users.len().max(1) as f64;

From d8511a1a0b72a6d9c83f6ca7c5936579e44d285d Mon Sep 17 00:00:00 2001
From: Shiv Tyagi
Date: Fri, 8 May 2026 02:26:47 +0000
Subject: [PATCH 4/7] fix(deploy): use correct field name for accounting host
 in K8s configmap

The AccountingConfig struct expects `host`, but the configmap had
`address` and a non-existent `enabled` field. Both were silently ignored
by serde, so spurctld fell back to the default localhost:6819.
---
 deploy/k8s/configmap.yaml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/deploy/k8s/configmap.yaml b/deploy/k8s/configmap.yaml
index 139f9fd..81ebcbf 100644
--- a/deploy/k8s/configmap.yaml
+++ b/deploy/k8s/configmap.yaml
@@ -22,8 +22,7 @@ data:
     plugin = "backfill"
 
     [accounting]
-    enabled = true
-    address = "spurdbd.spur.svc.cluster.local:6819"
+    host = "spurdbd.spur.svc.cluster.local:6819"
 
     [[partitions]]
     name = "default"

From 25f22efb15ff95bde849e40b377a27265f75c659 Mon Sep 17 00:00:00 2001
From: Shiv Tyagi
Date: Fri, 8 May 2026 03:19:47 +0000
Subject: [PATCH 5/7] refactor(proto): replace composite-key maps with
 structured repeated messages

GetUsageResponse and GetFairshareFactorsResponse used string-keyed maps
("user:account" composite keys), forcing consumers to split strings and
aggregate manually. Replace both with repeated message types (UsageEntry,
FairshareEntry) carrying user, account, and metrics as distinct fields.
Updates spurdbd server, spurctld fairshare cache, sshare, and sreport.
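
Client-side sketch of the new shape (helper name is illustrative; the
UsageEntry fields are the ones added in this patch):

    use std::collections::HashMap;

    // No more split_once: index directly by the structured fields.
    fn cpu_hours_by_account(entries: &[UsageEntry]) -> HashMap<&str, f64> {
        let mut acc: HashMap<&str, f64> = HashMap::new();
        for e in entries {
            *acc.entry(e.account.as_str()).or_default() += e.cpu_hours;
        }
        acc
    }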
---
 crates/spur-cli/src/sreport.rs         | 75 +++++++++++++++++++-------
 crates/spur-cli/src/sshare.rs          | 13 +++--
 crates/spurctld/src/fairshare_cache.rs | 20 ++++---
 crates/spurdbd/src/server.rs           | 38 ++++++-------
 proto/slurm.proto                      | 20 +++++--
 5 files changed, 111 insertions(+), 55 deletions(-)

diff --git a/crates/spur-cli/src/sreport.rs b/crates/spur-cli/src/sreport.rs
index 1512cba..6a00a8d 100644
--- a/crates/spur-cli/src/sreport.rs
+++ b/crates/spur-cli/src/sreport.rs
@@ -151,12 +151,27 @@ async fn report_account_utilization_by_user(
         }
     }
 
+    // Aggregate entries by account and (user, account)
+    let mut acct_agg: std::collections::HashMap<&str, (f64, f64, u64)> =
+        std::collections::HashMap::new();
+    let mut user_agg: std::collections::HashMap<(&str, &str), (f64, f64, u64)> =
+        std::collections::HashMap::new();
+    for e in &usage.entries {
+        let a = acct_agg.entry(&e.account).or_default();
+        a.0 += e.cpu_hours;
+        a.1 += e.gpu_hours;
+        a.2 += e.job_count;
+        let u = user_agg.entry((&e.user, &e.account)).or_default();
+        u.0 += e.cpu_hours;
+        u.1 += e.gpu_hours;
+        u.2 += e.job_count;
+    }
+
     for account in &accounts {
-        let acct_cpu = usage.cpu_hours.get(&account.name).copied().unwrap_or(0.0);
-        let acct_gpu = usage.gpu_hours.get(&account.name).copied().unwrap_or(0.0);
-        let acct_jobs = usage.job_count.get(&account.name).copied().unwrap_or(0);
+        let &(acct_cpu, acct_gpu, acct_jobs) = acct_agg
+            .get(account.name.as_str())
+            .unwrap_or(&(0.0, 0.0, 0));
 
-        // Account summary row
         println!(
             "{:<20}{}{:<15}{}{:>12.1}{}{:>12.1}{}{:>10}",
             account.name,
@@ -170,12 +185,11 @@ async fn report_account_utilization_by_user(
             acct_jobs
         );
 
-        // Per-user rows under this account
         let account_users: Vec<_> = users.iter().filter(|u| u.account == account.name).collect();
         for user in &account_users {
-            let user_cpu = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0);
-            let user_gpu = usage.gpu_hours.get(&user.name).copied().unwrap_or(0.0);
-            let user_jobs = usage.job_count.get(&user.name).copied().unwrap_or(0);
+            let &(user_cpu, user_gpu, user_jobs) = user_agg
+                .get(&(user.name.as_str(), account.name.as_str()))
+                .unwrap_or(&(0.0, 0.0, 0));
 
             println!(
                 "  {:<19}{}{:<15}{}{:>12.1}{}{:>12.1}{}{:>10}",
@@ -238,10 +252,19 @@ async fn report_user_utilization_by_account(
         }
     }
 
+    let mut user_agg: std::collections::HashMap<(&str, &str), (f64, f64, u64)> =
+        std::collections::HashMap::new();
+    for e in &usage.entries {
+        let u = user_agg.entry((&e.user, &e.account)).or_default();
+        u.0 += e.cpu_hours;
+        u.1 += e.gpu_hours;
+        u.2 += e.job_count;
+    }
+
     for user in &users {
-        let cpu = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0);
-        let gpu = usage.gpu_hours.get(&user.name).copied().unwrap_or(0.0);
-        let jobs = usage.job_count.get(&user.name).copied().unwrap_or(0);
+        let &(cpu, gpu, jobs) = user_agg
+            .get(&(user.name.as_str(), user.account.as_str()))
+            .unwrap_or(&(0.0, 0.0, 0));
 
         println!(
             "{:<15}{}{:<20}{}{:>12.1}{}{:>12.1}{}{:>10}",
@@ -273,8 +296,15 @@ async fn report_job_sizes_by_account(
         .context("failed to get usage")?;
     let usage = usage_resp.into_inner();
 
-    let total_jobs: u64 = usage.job_count.values().sum();
-    let total_cpu: f64 = usage.cpu_hours.values().sum();
+    let mut acct_agg: std::collections::HashMap<&str, (f64, u64)> =
+        std::collections::HashMap::new();
+    for e in &usage.entries {
+        let a = acct_agg.entry(&e.account).or_default();
+        a.0 += e.cpu_hours;
+        a.1 += e.job_count;
+    }
+    let total_cpu: f64 = acct_agg.values().map(|v| v.0).sum();
+    let total_jobs: u64 = acct_agg.values().map(|v| v.1).sum();
 
     let delimiter = if args.parsable { "|" } else { " " };
 
@@ -289,8 +319,7 @@ async fn report_job_sizes_by_account(
     }
 
     for account in &accounts {
-        let jobs = usage.job_count.get(&account.name).copied().unwrap_or(0);
-        let cpu = usage.cpu_hours.get(&account.name).copied().unwrap_or(0.0);
+        let &(cpu, jobs) = acct_agg.get(account.name.as_str()).unwrap_or(&(0.0, 0));
         let pct = if total_cpu > 0.0 {
             (cpu / total_cpu) * 100.0
         } else {
@@ -337,8 +366,15 @@ async fn report_job_sizes_by_user(
         .context("failed to get usage")?;
     let usage = usage_resp.into_inner();
 
-    let total_jobs: u64 = usage.job_count.values().sum();
-    let total_cpu: f64 = usage.cpu_hours.values().sum();
+    let mut user_agg: std::collections::HashMap<(&str, &str), (f64, u64)> =
+        std::collections::HashMap::new();
+    for e in &usage.entries {
+        let u = user_agg.entry((&e.user, &e.account)).or_default();
+        u.0 += e.cpu_hours;
+        u.1 += e.job_count;
+    }
+    let total_cpu: f64 = user_agg.values().map(|v| v.0).sum();
+    let total_jobs: u64 = user_agg.values().map(|v| v.1).sum();
 
     let delimiter = if args.parsable { "|" } else { " " };
 
@@ -361,8 +397,9 @@ async fn report_job_sizes_by_user(
     }
 
     for user in &users {
-        let jobs = usage.job_count.get(&user.name).copied().unwrap_or(0);
-        let cpu = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0);
+        let &(cpu, jobs) = user_agg
+            .get(&(user.name.as_str(), user.account.as_str()))
+            .unwrap_or(&(0.0, 0));
         let pct = if total_cpu > 0.0 {
             (cpu / total_cpu) * 100.0
         } else {
diff --git a/crates/spur-cli/src/sshare.rs b/crates/spur-cli/src/sshare.rs
index 4ab4b30..c185280 100644
--- a/crates/spur-cli/src/sshare.rs
+++ b/crates/spur-cli/src/sshare.rs
@@ -78,17 +78,16 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {
         total_shares
     };
 
-    // The server returns keys in "user:account" format. Pre-aggregate by account
-    // and by (user, account) for lookups below.
+    // Aggregate usage entries by account and by (user, account)
     let mut account_cpu_hours: std::collections::HashMap<&str, f64> =
         std::collections::HashMap::new();
     let mut user_account_cpu_hours: std::collections::HashMap<(&str, &str), f64> =
         std::collections::HashMap::new();
-    for (key, &hours) in &usage.cpu_hours {
-        if let Some((user, account)) = key.split_once(':') {
-            *account_cpu_hours.entry(account).or_default() += hours;
-            *user_account_cpu_hours.entry((user, account)).or_default() += hours;
-        }
+    for entry in &usage.entries {
+        *account_cpu_hours.entry(&entry.account).or_default() += entry.cpu_hours;
+        *user_account_cpu_hours
+            .entry((&entry.user, &entry.account))
+            .or_default() += entry.cpu_hours;
     }
 
     // Compute total usage for normalization
diff --git a/crates/spurctld/src/fairshare_cache.rs b/crates/spurctld/src/fairshare_cache.rs
index 9e9474a..53ac1ad 100644
--- a/crates/spurctld/src/fairshare_cache.rs
+++ b/crates/spurctld/src/fairshare_cache.rs
@@ -10,7 +10,7 @@ use spur_proto::proto::slurm_accounting_client::SlurmAccountingClient;
 use spur_proto::proto::GetFairshareFactorsRequest;
 
 pub struct FairshareCache {
-    factors: RwLock<HashMap<String, f64>>,
+    factors: RwLock<HashMap<(String, String), f64>>,
 }
 
 impl FairshareCache {
@@ -21,7 +21,7 @@ impl FairshareCache {
     }
 
     pub fn get(&self, user: &str, account: &str) -> f64 {
-        let key = format!("{}:{}", user, account);
+        let key = (user.to_owned(), account.to_owned());
         match self.factors.read().get(&key) {
             Some(&factor) => factor,
             None => {
@@ -34,7 +34,7 @@ impl FairshareCache {
         }
     }
 
-    fn replace(&self, new_factors: HashMap<String, f64>) {
+    fn replace(&self, new_factors: HashMap<(String, String), f64>) {
         *self.factors.write() = new_factors;
     }
 
@@ -54,7 +54,6 @@ impl FairshareCache {
                 format!("http://{}", host)
             };
 
-            // Eager first fetch so the cache is populated before the scheduler runs
             match tokio::time::timeout(Duration::from_secs(5), Self::fetch(&uri, halflife_days))
                 .await
             {
@@ -85,11 +84,20 @@ impl FairshareCache {
         });
     }
 
-    async fn fetch(uri: &str, halflife_days: u32) -> anyhow::Result<HashMap<String, f64>> {
+    async fn fetch(
+        uri: &str,
+        halflife_days: u32,
+    ) -> anyhow::Result<HashMap<(String, String), f64>> {
         let mut client: SlurmAccountingClient<Channel> =
             SlurmAccountingClient::connect(uri.to_owned()).await?;
         let req = GetFairshareFactorsRequest { halflife_days };
         let resp = client.get_fairshare_factors(req).await?;
-        Ok(resp.into_inner().factors)
+        let factors = resp
+            .into_inner()
+            .entries
+            .into_iter()
+            .map(|e| ((e.user, e.account), e.factor))
+            .collect();
+        Ok(factors)
     }
 }
diff --git a/crates/spurdbd/src/server.rs b/crates/spurdbd/src/server.rs
index 1338ed0..7d09f8d 100644
--- a/crates/spurdbd/src/server.rs
+++ b/crates/spurdbd/src/server.rs
@@ -207,22 +207,18 @@ impl SlurmAccounting for AccountingService {
             .await
             .map_err(|e| Status::internal(e.to_string()))?;
 
-        let mut cpu_hours = std::collections::HashMap::new();
-        let mut gpu_hours = std::collections::HashMap::new();
-        let mut job_count = std::collections::HashMap::new();
-
-        for r in &records {
-            let key = format!("{}:{}", r.user_name, r.account);
-            *cpu_hours.entry(key.clone()).or_insert(0.0) += r.cpu_seconds as f64 / 3600.0;
-            *gpu_hours.entry(key.clone()).or_insert(0.0) += r.gpu_seconds as f64 / 3600.0;
-            *job_count.entry(key).or_insert(0u64) += r.job_count;
-        }
-
-        Ok(Response::new(GetUsageResponse {
-            cpu_hours,
-            gpu_hours,
-            job_count,
-        }))
+        let entries = records
+            .iter()
+            .map(|r| UsageEntry {
+                user: r.user_name.clone(),
+                account: r.account.clone(),
+                cpu_hours: r.cpu_seconds as f64 / 3600.0,
+                gpu_hours: r.gpu_seconds as f64 / 3600.0,
+                job_count: r.job_count,
+            })
+            .collect();
+
+        Ok(Response::new(GetUsageResponse { entries }))
     }
 
     // ============================================================
@@ -452,12 +448,16 @@ impl SlurmAccounting for AccountingService {
         let raw_factors =
             fairshare::compute_fairshare(&usage, &account_weights, halflife_days, now);
 
-        let factors = raw_factors
+        let entries = raw_factors
             .into_iter()
-            .map(|((user, account), factor)| (format!("{}:{}", user, account), factor))
+            .map(|((user, account), factor)| FairshareEntry {
+                user,
+                account,
+                factor,
+            })
             .collect();
 
-        Ok(Response::new(GetFairshareFactorsResponse { factors }))
+        Ok(Response::new(GetFairshareFactorsResponse { entries }))
     }
 }
 
diff --git a/proto/slurm.proto b/proto/slurm.proto
index 6db4bac..29b8705 100644
--- a/proto/slurm.proto
+++ b/proto/slurm.proto
@@ -557,18 +557,30 @@ message GetUsageRequest {
   google.protobuf.Timestamp since = 3;
 }
 
+message UsageEntry {
+  string user = 1;
+  string account = 2;
+  double cpu_hours = 3;
+  double gpu_hours = 4;
+  uint64 job_count = 5;
+}
+
 message GetUsageResponse {
-  map<string, double> cpu_hours = 1; // per account
-  map<string, double> gpu_hours = 2;
-  map<string, uint64> job_count = 3;
+  repeated UsageEntry entries = 1;
 }
 
 message GetFairshareFactorsRequest {
   uint32 halflife_days = 1;
 }
 
+message FairshareEntry {
+  string user = 1;
+  string account = 2;
+  double factor = 3;
+}
+
 message GetFairshareFactorsResponse {
-  map<string, double> factors = 1; // key = "user:account"
+  repeated FairshareEntry entries = 1;
 }
 
 // -- K8s Operator --

From 91753d11212fdc3fed1813972447e1a25ba904d0 Mon Sep 17 00:00:00 2001
From: Shiv Tyagi
Date: Fri, 8 May 2026 07:58:41 +0000
Subject: [PATCH 6/7] fix(spurdbd): normalize fairshare weights and harden
 refresh loop

- Normalize target_share by dividing by the total account weight sum, so a
  user consuming exactly their allocation gets a factor of ~1.0
- Aggregate usage entries server-side by (user, account) in the get_usage
  handler; simplify the sshare/sreport clients to use direct inserts
- Clamp halflife_days to 1..=365 in get_fairshare_factors
- Enforce a minimum 10s refresh interval in the fairshare cache
- Wrap the refresh-loop fetch in a 10s timeout to avoid hangs
- Fix the test to use two accounts for meaningful cross-account assertions
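
Worked example of the normalization (illustrative numbers, mirroring the
updated unit test): two accounts with weight 1.0 each give total_weight =
2.0, so each account's target share is 0.5.

    // alice: ~91% of decayed usage against a 0.5 target → penalized
    let alice = 0.5_f64 / (100_000.0 / 110_000.0); // ≈ 0.55
    // bob: ~9% of decayed usage against a 0.5 target → boosted
    let bob = 0.5_f64 / (10_000.0 / 110_000.0); // ≈ 5.5
    assert!(alice < 1.0 && bob > 1.0);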
---
 crates/spur-cli/src/sreport.rs         | 18 ++++++++--------
 crates/spur-cli/src/sshare.rs          |  6 ++----
 crates/spurctld/src/fairshare_cache.rs | 18 +++++++++-------
 crates/spurdbd/src/fairshare.rs        | 27 +++++++++++++-----------
 crates/spurdbd/src/server.rs           | 29 ++++++++++++++++++--------
 5 files changed, 57 insertions(+), 41 deletions(-)

diff --git a/crates/spur-cli/src/sreport.rs b/crates/spur-cli/src/sreport.rs
index 6a00a8d..4eb825a 100644
--- a/crates/spur-cli/src/sreport.rs
+++ b/crates/spur-cli/src/sreport.rs
@@ -151,7 +151,7 @@ async fn report_account_utilization_by_user(
         }
     }
 
-    // Aggregate entries by account and (user, account)
+    // Build lookup maps (server guarantees one entry per user+account)
     let mut acct_agg: std::collections::HashMap<&str, (f64, f64, u64)> =
         std::collections::HashMap::new();
     let mut user_agg: std::collections::HashMap<(&str, &str), (f64, f64, u64)> =
@@ -161,10 +161,10 @@ async fn report_account_utilization_by_user(
         a.0 += e.cpu_hours;
         a.1 += e.gpu_hours;
         a.2 += e.job_count;
-        let u = user_agg.entry((&e.user, &e.account)).or_default();
-        u.0 += e.cpu_hours;
-        u.1 += e.gpu_hours;
-        u.2 += e.job_count;
+        user_agg.insert(
+            (&e.user, &e.account),
+            (e.cpu_hours, e.gpu_hours, e.job_count),
+        );
     }
 
     for account in &accounts {
@@ -255,10 +255,10 @@ async fn report_user_utilization_by_account(
     let mut user_agg: std::collections::HashMap<(&str, &str), (f64, f64, u64)> =
         std::collections::HashMap::new();
     for e in &usage.entries {
-        let u = user_agg.entry((&e.user, &e.account)).or_default();
-        u.0 += e.cpu_hours;
-        u.1 += e.gpu_hours;
-        u.2 += e.job_count;
+        user_agg.insert(
+            (&e.user, &e.account),
+            (e.cpu_hours, e.gpu_hours, e.job_count),
+        );
     }
 
     for user in &users {
diff --git a/crates/spur-cli/src/sshare.rs b/crates/spur-cli/src/sshare.rs
index c185280..ab58b87 100644
--- a/crates/spur-cli/src/sshare.rs
+++ b/crates/spur-cli/src/sshare.rs
@@ -78,16 +78,14 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {
         total_shares
     };
 
-    // Aggregate usage entries by account and by (user, account)
+    // Build lookup maps from entries (server guarantees one entry per user+account)
     let mut account_cpu_hours: std::collections::HashMap<&str, f64> =
         std::collections::HashMap::new();
     let mut user_account_cpu_hours: std::collections::HashMap<(&str, &str), f64> =
         std::collections::HashMap::new();
     for entry in &usage.entries {
         *account_cpu_hours.entry(&entry.account).or_default() += entry.cpu_hours;
-        *user_account_cpu_hours
-            .entry((&entry.user, &entry.account))
-            .or_default() += entry.cpu_hours;
+        user_account_cpu_hours.insert((&entry.user, &entry.account), entry.cpu_hours);
     }
 
     // Compute total usage for normalization
diff --git a/crates/spurctld/src/fairshare_cache.rs b/crates/spurctld/src/fairshare_cache.rs
index 53ac1ad..1f468b2 100644
--- a/crates/spurctld/src/fairshare_cache.rs
+++ b/crates/spurctld/src/fairshare_cache.rs
@@ -45,7 +45,7 @@ impl FairshareCache {
         refresh_interval_secs: u64,
     ) {
         let cache = Arc::clone(self);
-        let interval = Duration::from_secs(refresh_interval_secs);
+        let interval = Duration::from_secs(refresh_interval_secs.max(10));
 
         tokio::spawn(async move {
             let uri = if host.starts_with("http://") || host.starts_with("https://") {
@@ -72,13 +72,17 @@ impl FairshareCache {
             loop {
                 tokio::time::sleep(interval).await;
 
-                match Self::fetch(&uri, halflife_days).await {
-                    Ok(factors) => {
-                        cache.replace(factors);
-                    }
-                    Err(e) => {
-                        warn!(error = %e, "fairshare refresh failed, retaining stale data");
+                match tokio::time::timeout(
+                    Duration::from_secs(10),
+                    Self::fetch(&uri, halflife_days),
+                )
+                .await
+                {
+                    Ok(Ok(factors)) => cache.replace(factors),
+                    Ok(Err(e)) => {
+                        warn!(error = %e, "fairshare refresh failed, retaining stale data")
                     }
+                    Err(_) => warn!("fairshare refresh timed out, retaining stale data"),
                 }
             }
         });
diff --git a/crates/spurdbd/src/fairshare.rs b/crates/spurdbd/src/fairshare.rs
index 60288b6..fd44b01 100644
--- a/crates/spurdbd/src/fairshare.rs
+++ b/crates/spurdbd/src/fairshare.rs
@@ -11,13 +11,17 @@ pub fn compute_fairshare(
     halflife_days: u32,
     now: DateTime<Utc>,
 ) -> std::collections::HashMap<(String, String), f64> {
+    let total_weight: f64 = account_weights.values().sum();
+    if total_weight <= 0.0 {
+        return std::collections::HashMap::new();
+    }
+
     let halflife = Duration::days(halflife_days as i64);
     let decay_rate = 2.0_f64.ln() / halflife.num_seconds() as f64;
 
     let mut user_usage: std::collections::HashMap<(String, String), f64> =
         std::collections::HashMap::new();
 
-    // Sum decayed usage per user+account
     for record in usage {
         let age = (now - record.period_start).num_seconds().max(0) as f64;
         let decay = (-decay_rate * age).exp();
@@ -28,22 +32,17 @@ pub fn compute_fairshare(
             .or_insert(0.0) += weighted_usage;
     }
 
-    // Compute total usage across all users
     let total_usage: f64 = user_usage.values().sum();
     let epsilon = 0.001;
 
-    // Compute fair-share factors
     let mut factors = std::collections::HashMap::new();
 
     for ((user, account), usage) in &user_usage {
-        let target_share = account_weights.get(account).copied().unwrap_or(1.0);
-
-        // Normalize: actual_share = user_usage / total_usage
+        let target_share = account_weights.get(account).copied().unwrap_or(1.0) / total_weight;
         let actual_share = usage / total_usage.max(epsilon);
 
-        // fair_share = target_share / actual_share
-        // High value = underusing allocation → higher priority
-        // Low value = overusing allocation → lower priority
+        // factor > 1.0 = underusing allocation (boosted priority)
+        // factor < 1.0 = overusing allocation (penalized)
         let factor = target_share / actual_share.max(epsilon);
 
         factors.insert((user.clone(), account.clone()), factor.min(100.0));
@@ -71,7 +70,7 @@ mod tests {
             },
             UsageRecord {
                 user_name: "bob".into(),
-                account: "research".into(),
+                account: "engineering".into(),
                 cpu_seconds: 10_000,
                 gpu_seconds: 0,
                 job_count: 2,
@@ -81,12 +80,16 @@ mod tests {
 
         let mut weights = HashMap::new();
         weights.insert("research".into(), 1.0);
+        weights.insert("engineering".into(), 1.0);
 
         let factors = compute_fairshare(&usage, &weights, 14, now);
 
-        // Bob should have higher factor (used less)
         let alice_factor = factors.get(&("alice".into(), "research".into())).unwrap();
-        let bob_factor = factors.get(&("bob".into(), "research".into())).unwrap();
+        let bob_factor = factors.get(&("bob".into(), "engineering".into())).unwrap();
+        // Bob used less than his share → boosted above 1.0
+        assert!(*bob_factor > 1.0);
+        // Alice used more than her share → penalized below 1.0
+        assert!(*alice_factor < 1.0);
         assert!(bob_factor > alice_factor);
     }
 }
diff --git a/crates/spurdbd/src/server.rs b/crates/spurdbd/src/server.rs
index 7d09f8d..2e3a26c 100644
--- a/crates/spurdbd/src/server.rs
+++ b/crates/spurdbd/src/server.rs
@@ -207,14 +207,25 @@ impl SlurmAccounting for AccountingService {
             .await
             .map_err(|e| Status::internal(e.to_string()))?;
 
-        let entries = records
-            .iter()
-            .map(|r| UsageEntry {
-                user: r.user_name.clone(),
-                account: r.account.clone(),
-                cpu_hours: r.cpu_seconds as f64 / 3600.0,
-                gpu_hours: r.gpu_seconds as f64 / 3600.0,
-                job_count: r.job_count,
+        let mut agg: std::collections::HashMap<(String, String), (f64, f64, u64)> =
+            std::collections::HashMap::new();
+        for r in &records {
+            let e = agg
+                .entry((r.user_name.clone(), r.account.clone()))
+                .or_default();
+            e.0 += r.cpu_seconds as f64 / 3600.0;
+            e.1 += r.gpu_seconds as f64 / 3600.0;
+            e.2 += r.job_count;
+        }
+
+        let entries = agg
+            .into_iter()
+            .map(|((user, account), (cpu, gpu, jobs))| UsageEntry {
+                user,
+                account,
+                cpu_hours: cpu,
+                gpu_hours: gpu,
+                job_count: jobs,
             })
             .collect();
 
@@ -426,7 +437,7 @@ impl SlurmAccounting for AccountingService {
         let halflife_days = if req.halflife_days == 0 {
             14
         } else {
-            req.halflife_days
+            req.halflife_days.clamp(1, 365)
         };
 
         let now = Utc::now();

From 0e83613bba7dce4b2960d0ff8fec6bf60e07b637 Mon Sep 17 00:00:00 2001
From: Shiv Tyagi
Date: Fri, 8 May 2026 08:39:45 +0000
Subject: [PATCH 7/7] fix(deploy): read log files instead of journalctl in
 cluster_test debug

The debug_fail helper used journalctl, which only works for systemd
units; the CI cluster runs its processes via nohup with logs under
~/spur/log/. Also show the full scontrol output, the job's stdout
content, and the remote agent logs from mi300-2.
--- deploy/bare-metal/cluster_test.sh | 35 ++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/deploy/bare-metal/cluster_test.sh b/deploy/bare-metal/cluster_test.sh index 2590af6..341cd78 100755 --- a/deploy/bare-metal/cluster_test.sh +++ b/deploy/bare-metal/cluster_test.sh @@ -77,18 +77,39 @@ wait_job() { debug_fail() { local name="$1" job_id="$2" echo " DEBUG ${name} (job ${job_id}):" - "${SPUR}/scontrol" show job "$job_id" 2>/dev/null \ - | grep -E 'JobState|ExitCode|NodeList|Reason|Partition|Priority' \ - | sed 's/^/ /' || true + "${SPUR}/scontrol" show job "$job_id" 2>/dev/null | sed 's/^/ /' || true echo " CLUSTER STATE:" echo " Nodes:" "${SPUR}/sinfo" 2>/dev/null | head -20 | sed 's/^/ /' || true echo " Queue:" "${SPUR}/squeue" -t all 2>/dev/null | head -20 | sed 's/^/ /' || true - echo " Controller logs (last 15 lines):" - journalctl -u spurctld --no-pager -n 15 2>/dev/null | sed 's/^/ /' || true - echo " Agent logs (last 10 lines):" - journalctl -u spurd --no-pager -n 10 2>/dev/null | sed 's/^/ /' || true + + # Job output file (if it exists locally or on remote node) + local out_path + out_path=$("${SPUR}/scontrol" show job "$job_id" 2>/dev/null | grep -oP 'StdOut=\K\S+' || true) + if [ -n "$out_path" ]; then + echo " Job stdout (${out_path}):" + if [ -f "$out_path" ]; then + tail -20 "$out_path" 2>/dev/null | sed 's/^/ /' || true + else + ssh mi300-2 "tail -20 '$out_path' 2>/dev/null" 2>/dev/null | sed 's/^/ (mi300-2) /' || echo " (file not found)" + fi + fi + + # Controller log (runs on this node) + echo " Controller logs (last 20 lines):" + tail -20 "${SPUR_HOME}/log/spurctld.log" 2>/dev/null | sed 's/^/ /' \ + || echo " (not found at ${SPUR_HOME}/log/spurctld.log)" + + # Local agent log + echo " Agent logs — $(hostname) (last 15 lines):" + tail -15 "${SPUR_HOME}/log/spurd.log" 2>/dev/null | sed 's/^/ /' \ + || echo " (not found at ${SPUR_HOME}/log/spurd.log)" + + # Remote agent log + echo " Agent logs — mi300-2 (last 15 lines):" + ssh mi300-2 "tail -15 ~/spur/log/spurd.log 2>/dev/null" 2>/dev/null | sed 's/^/ /' \ + || echo " (not reachable or log not found)" } job_state() {