diff --git a/crates/spur-cli/src/sreport.rs b/crates/spur-cli/src/sreport.rs index 1512cba..4eb825a 100644 --- a/crates/spur-cli/src/sreport.rs +++ b/crates/spur-cli/src/sreport.rs @@ -151,12 +151,27 @@ async fn report_account_utilization_by_user( } } + // Build lookup maps (server guarantees one entry per user+account) + let mut acct_agg: std::collections::HashMap<&str, (f64, f64, u64)> = + std::collections::HashMap::new(); + let mut user_agg: std::collections::HashMap<(&str, &str), (f64, f64, u64)> = + std::collections::HashMap::new(); + for e in &usage.entries { + let a = acct_agg.entry(&e.account).or_default(); + a.0 += e.cpu_hours; + a.1 += e.gpu_hours; + a.2 += e.job_count; + user_agg.insert( + (&e.user, &e.account), + (e.cpu_hours, e.gpu_hours, e.job_count), + ); + } + for account in &accounts { - let acct_cpu = usage.cpu_hours.get(&account.name).copied().unwrap_or(0.0); - let acct_gpu = usage.gpu_hours.get(&account.name).copied().unwrap_or(0.0); - let acct_jobs = usage.job_count.get(&account.name).copied().unwrap_or(0); + let &(acct_cpu, acct_gpu, acct_jobs) = acct_agg + .get(account.name.as_str()) + .unwrap_or(&(0.0, 0.0, 0)); - // Account summary row println!( "{:<20}{}{:<15}{}{:>12.1}{}{:>12.1}{}{:>10}", account.name, @@ -170,12 +185,11 @@ async fn report_account_utilization_by_user( acct_jobs ); - // Per-user rows under this account let account_users: Vec<_> = users.iter().filter(|u| u.account == account.name).collect(); for user in &account_users { - let user_cpu = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0); - let user_gpu = usage.gpu_hours.get(&user.name).copied().unwrap_or(0.0); - let user_jobs = usage.job_count.get(&user.name).copied().unwrap_or(0); + let &(user_cpu, user_gpu, user_jobs) = user_agg + .get(&(user.name.as_str(), account.name.as_str())) + .unwrap_or(&(0.0, 0.0, 0)); println!( " {:<19}{}{:<15}{}{:>12.1}{}{:>12.1}{}{:>10}", @@ -238,10 +252,19 @@ async fn report_user_utilization_by_account( } } + let mut user_agg: 
std::collections::HashMap<(&str, &str), (f64, f64, u64)> = + std::collections::HashMap::new(); + for e in &usage.entries { + user_agg.insert( + (&e.user, &e.account), + (e.cpu_hours, e.gpu_hours, e.job_count), + ); + } + for user in &users { - let cpu = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0); - let gpu = usage.gpu_hours.get(&user.name).copied().unwrap_or(0.0); - let jobs = usage.job_count.get(&user.name).copied().unwrap_or(0); + let &(cpu, gpu, jobs) = user_agg + .get(&(user.name.as_str(), user.account.as_str())) + .unwrap_or(&(0.0, 0.0, 0)); println!( "{:<15}{}{:<20}{}{:>12.1}{}{:>12.1}{}{:>10}", @@ -273,8 +296,15 @@ async fn report_job_sizes_by_account( .context("failed to get usage")?; let usage = usage_resp.into_inner(); - let total_jobs: u64 = usage.job_count.values().sum(); - let total_cpu: f64 = usage.cpu_hours.values().sum(); + let mut acct_agg: std::collections::HashMap<&str, (f64, u64)> = + std::collections::HashMap::new(); + for e in &usage.entries { + let a = acct_agg.entry(&e.account).or_default(); + a.0 += e.cpu_hours; + a.1 += e.job_count; + } + let total_cpu: f64 = acct_agg.values().map(|v| v.0).sum(); + let total_jobs: u64 = acct_agg.values().map(|v| v.1).sum(); let delimiter = if args.parsable { "|" } else { " " }; @@ -289,8 +319,7 @@ async fn report_job_sizes_by_account( } for account in &accounts { - let jobs = usage.job_count.get(&account.name).copied().unwrap_or(0); - let cpu = usage.cpu_hours.get(&account.name).copied().unwrap_or(0.0); + let &(cpu, jobs) = acct_agg.get(account.name.as_str()).unwrap_or(&(0.0, 0)); let pct = if total_cpu > 0.0 { (cpu / total_cpu) * 100.0 } else { @@ -337,8 +366,15 @@ async fn report_job_sizes_by_user( .context("failed to get usage")?; let usage = usage_resp.into_inner(); - let total_jobs: u64 = usage.job_count.values().sum(); - let total_cpu: f64 = usage.cpu_hours.values().sum(); + let mut user_agg: std::collections::HashMap<(&str, &str), (f64, u64)> = + std::collections::HashMap::new(); + for 
e in &usage.entries { + let u = user_agg.entry((&e.user, &e.account)).or_default(); + u.0 += e.cpu_hours; + u.1 += e.job_count; + } + let total_cpu: f64 = user_agg.values().map(|v| v.0).sum(); + let total_jobs: u64 = user_agg.values().map(|v| v.1).sum(); let delimiter = if args.parsable { "|" } else { " " }; @@ -361,8 +397,9 @@ async fn report_job_sizes_by_user( } for user in &users { - let jobs = usage.job_count.get(&user.name).copied().unwrap_or(0); - let cpu = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0); + let &(cpu, jobs) = user_agg + .get(&(user.name.as_str(), user.account.as_str())) + .unwrap_or(&(0.0, 0)); let pct = if total_cpu > 0.0 { (cpu / total_cpu) * 100.0 } else { diff --git a/crates/spur-cli/src/sshare.rs b/crates/spur-cli/src/sshare.rs index 6fb0058..ab58b87 100644 --- a/crates/spur-cli/src/sshare.rs +++ b/crates/spur-cli/src/sshare.rs @@ -78,8 +78,18 @@ pub async fn main_with_args(args: Vec) -> Result<()> { total_shares }; + // Build lookup maps from entries (server guarantees one entry per user+account) + let mut account_cpu_hours: std::collections::HashMap<&str, f64> = + std::collections::HashMap::new(); + let mut user_account_cpu_hours: std::collections::HashMap<(&str, &str), f64> = + std::collections::HashMap::new(); + for entry in &usage.entries { + *account_cpu_hours.entry(&entry.account).or_default() += entry.cpu_hours; + user_account_cpu_hours.insert((&entry.user, &entry.account), entry.cpu_hours); + } + // Compute total usage for normalization - let total_cpu_usage: f64 = usage.cpu_hours.values().sum(); + let total_cpu_usage: f64 = account_cpu_hours.values().sum(); let total_cpu_usage = if total_cpu_usage <= 0.0 { 1.0 } else { @@ -119,7 +129,10 @@ pub async fn main_with_args(args: Vec) -> Result<()> { let raw_shares = account.fairshare_weight; let norm_shares = raw_shares / total_shares; - let raw_usage = usage.cpu_hours.get(&account.name).copied().unwrap_or(0.0); + let raw_usage = account_cpu_hours + 
.get(account.name.as_str()) + .copied() + .unwrap_or(0.0); let norm_usage = raw_usage / total_cpu_usage; let fair_share = if norm_usage > 0.001 { norm_shares / norm_usage @@ -159,7 +172,10 @@ pub async fn main_with_args(args: Vec) -> Result<()> { } } - let user_usage = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0); + let user_usage = user_account_cpu_hours + .get(&(user.name.as_str(), account.name.as_str())) + .copied() + .unwrap_or(0.0); let user_norm_usage = user_usage / total_cpu_usage; // Each user within an account gets an equal sub-share let user_count = account_users.len().max(1) as f64; diff --git a/crates/spur-core/src/config.rs b/crates/spur-core/src/config.rs index 52c199e..c0872ab 100644 --- a/crates/spur-core/src/config.rs +++ b/crates/spur-core/src/config.rs @@ -199,6 +199,9 @@ pub struct AccountingConfig { /// How long to keep completed job records. #[serde(default = "default_purge_days")] pub purge_after_days: u32, + /// How often to refresh fairshare factors from the accounting daemon. + #[serde(default = "default_fairshare_refresh_secs")] + pub fairshare_refresh_secs: u32, } fn default_accounting_host() -> String { @@ -210,6 +213,9 @@ fn default_database_url() -> String { fn default_purge_days() -> u32 { 365 } +fn default_fairshare_refresh_secs() -> u32 { + 300 +} impl Default for AccountingConfig { fn default() -> Self { @@ -217,6 +223,7 @@ impl Default for AccountingConfig { host: "localhost:6819".into(), database_url: "postgresql://spur:spur@localhost/spur".into(), purge_after_days: 365, + fairshare_refresh_secs: 300, } } } diff --git a/crates/spur-sched/src/priority.rs b/crates/spur-sched/src/priority.rs index ef06330..b7df43a 100644 --- a/crates/spur-sched/src/priority.rs +++ b/crates/spur-sched/src/priority.rs @@ -1,31 +1,3 @@ -use chrono::{DateTime, Duration, Utc}; - -/// Calculate fair-share factor for a user/account. 
-/// -/// fair_share_factor = target_share / max(decayed_actual_usage, epsilon) -/// -/// The decay uses a half-life model where older usage matters less. -pub fn fair_share_factor( - target_share: f64, - usage_records: &[(DateTime, f64)], // (timestamp, cpu_hours) - halflife_days: u32, - now: DateTime, -) -> f64 { - let halflife = Duration::days(halflife_days as i64); - let decay_rate = 2.0_f64.ln() / halflife.num_seconds() as f64; - - let decayed_usage: f64 = usage_records - .iter() - .map(|(time, usage)| { - let age = (now - *time).num_seconds().max(0) as f64; - usage * (-decay_rate * age).exp() - }) - .sum(); - - let epsilon = 0.001; // Avoid division by zero - target_share / decayed_usage.max(epsilon) -} - /// Calculate effective job priority. /// /// effective_priority = base_priority * fair_share_factor * age_factor * partition_tier @@ -49,34 +21,21 @@ mod tests { use super::*; #[test] - fn test_fair_share_no_usage() { - let factor = fair_share_factor(1.0, &[], 14, Utc::now()); - // With no usage, factor should be very high (share / epsilon) - assert!(factor > 100.0); - } - - #[test] - fn test_fair_share_heavy_usage() { - let now = Utc::now(); - let records: Vec<(DateTime, f64)> = - (0..14).map(|d| (now - Duration::days(d), 100.0)).collect(); - - let factor = fair_share_factor(1.0, &records, 14, now); - // Heavy recent usage → low factor - assert!(factor < 1.0); - } - - #[test] - fn test_effective_priority() { - let p = effective_priority(1000, 1.0, 0, 1); - assert_eq!(p, 1000); - - // With age bonus - let p = effective_priority(1000, 1.0, 10080, 1); - assert_eq!(p, 2000); - - // With partition tier - let p = effective_priority(1000, 1.0, 0, 2); - assert_eq!(p, 2000); + fn effective_priority_cases() { + let cases = [ + ((1000, 1.0, 0, 1), 1000), + ((1000, 1.0, 10080, 1), 2000), + ((1000, 1.0, 0, 2), 2000), + ((1000, 100.0, 0, 1), 10_000), + ((0, 0.001, 0, 1), 1), + ]; + + for ((base, fs, age, tier), expected) in cases { + let got = effective_priority(base, 
fs, age, tier); + assert_eq!( + got, expected, + "effective_priority({base}, {fs}, {age}, {tier})" + ); + } } } diff --git a/crates/spur-tests/src/t24_priority.rs b/crates/spur-tests/src/t24_priority.rs index 91f7134..6b6e7ba 100644 --- a/crates/spur-tests/src/t24_priority.rs +++ b/crates/spur-tests/src/t24_priority.rs @@ -4,48 +4,8 @@ #[cfg(test)] mod tests { - use chrono::{Duration, Utc}; use spur_sched::priority; - // ── T24.1: Fair-share with no usage ────────────────────────── - - #[test] - fn t24_1_fair_share_no_usage() { - let factor = priority::fair_share_factor(1.0, &[], 14, Utc::now()); - assert!(factor > 100.0, "no usage → very high factor"); - } - - // ── T24.2: Fair-share with heavy recent usage ──────────────── - - #[test] - fn t24_2_fair_share_heavy_usage() { - let now = Utc::now(); - let records: Vec<_> = (0..14).map(|d| (now - Duration::days(d), 100.0)).collect(); - let factor = priority::fair_share_factor(1.0, &records, 14, now); - assert!(factor < 1.0, "heavy usage → low factor"); - } - - // ── T24.3: Usage decay ─────────────────────────────────────── - - #[test] - fn t24_3_old_usage_decays() { - let now = Utc::now(); - - // Recent usage - let recent = vec![(now - Duration::hours(1), 100.0)]; - let factor_recent = priority::fair_share_factor(1.0, &recent, 14, now); - - // Old usage (same amount but 30 days ago) - let old = vec![(now - Duration::days(30), 100.0)]; - let factor_old = priority::fair_share_factor(1.0, &old, 14, now); - - // Old usage should decay → higher factor - assert!( - factor_old > factor_recent, - "old usage should result in higher factor than recent" - ); - } - // ── T24.4: Effective priority ──────────────────────────────── #[test] diff --git a/crates/spurctld/src/cluster.rs b/crates/spurctld/src/cluster.rs index 02769d8..57b697a 100644 --- a/crates/spurctld/src/cluster.rs +++ b/crates/spurctld/src/cluster.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::path::Path; use std::sync::atomic::{AtomicU32, Ordering}; 
+use std::sync::Arc; use chrono::{DateTime, Utc}; use parking_lot::RwLock; @@ -18,6 +19,7 @@ use spur_core::step::{JobStep, StepState, STEP_BATCH}; use spur_core::wal::WalOperation; use crate::accounting::AccountingNotifier; +use crate::fairshare_cache::FairshareCache; use crate::raft::{SpurRaft, StateMachineApply}; /// Central cluster state manager. @@ -36,12 +38,14 @@ pub struct ClusterManager { hostname_aliases: RwLock<HashMap<String, String>>, raft: RwLock<Option<SpurRaft>>, accounting: RwLock<Option<AccountingNotifier>>, + fairshare_cache: Arc<FairshareCache>, } impl ClusterManager { pub fn new(config: SlurmConfig, _state_dir: &Path) -> anyhow::Result<Self> { let partitions = config.build_partitions(); let license_pool = config.licenses.clone(); + let fairshare_cache = Arc::new(FairshareCache::new()); let cm = Self { config, @@ -55,6 +59,7 @@ impl ClusterManager { hostname_aliases: RwLock::new(HashMap::new()), raft: RwLock::new(None), accounting: RwLock::new(None), + fairshare_cache, }; info!("cluster manager initialized (state will be recovered via Raft)"); @@ -953,10 +958,12 @@ impl ClusterManager { .and_then(|pname| partitions.iter().find(|p| p.name == *pname)) .map(|p| p.priority_tier) .unwrap_or(1); - // fair_share = 1.0 (neutral) until spurdbd integration + let fair_share = self + .fairshare_cache + .get(&job.spec.user, job.spec.account.as_deref().unwrap_or("")); job.priority = spur_sched::priority::effective_priority( job.priority, - 1.0, + fair_share, age_minutes, partition_tier, ); @@ -1248,6 +1255,10 @@ impl ClusterManager { *self.accounting.write() = Some(notifier); } + pub fn fairshare_cache(&self) -> &Arc<FairshareCache> { + &self.fairshare_cache + } + /// Persist a mutation via Raft consensus. The apply callback /// (`StateMachineApply`) handles in-memory state on all nodes.
fn propose(&self, op: WalOperation) -> anyhow::Result<()> { diff --git a/crates/spurctld/src/fairshare_cache.rs b/crates/spurctld/src/fairshare_cache.rs new file mode 100644 index 0000000..1f468b2 --- /dev/null +++ b/crates/spurctld/src/fairshare_cache.rs @@ -0,0 +1,107 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use parking_lot::RwLock; +use tonic::transport::Channel; +use tracing::{debug, info, warn}; + +use spur_proto::proto::slurm_accounting_client::SlurmAccountingClient; +use spur_proto::proto::GetFairshareFactorsRequest; + +pub struct FairshareCache { + factors: RwLock<HashMap<(String, String), f64>>, +} + +impl FairshareCache { + pub fn new() -> Self { + Self { + factors: RwLock::new(HashMap::new()), + } + } + + pub fn get(&self, user: &str, account: &str) -> f64 { + let key = (user.to_owned(), account.to_owned()); + match self.factors.read().get(&key) { + Some(&factor) => factor, + None => { + debug!( + user, + account, "fairshare factor not found, defaulting to neutral" + ); + 1.0 + } + } + } + + fn replace(&self, new_factors: HashMap<(String, String), f64>) { + *self.factors.write() = new_factors; + } + + pub fn spawn_refresh_loop( + self: &Arc<Self>, + host: String, + halflife_days: u32, + refresh_interval_secs: u64, + ) { + let cache = Arc::clone(self); + let interval = Duration::from_secs(refresh_interval_secs.max(10)); + + tokio::spawn(async move { + let uri = if host.starts_with("http://") || host.starts_with("https://") { + host.clone() + } else { + format!("http://{}", host) + }; + + match tokio::time::timeout(Duration::from_secs(5), Self::fetch(&uri, halflife_days)) + .await + { + Ok(Ok(factors)) => { + info!(count = factors.len(), "fairshare cache initialized"); + cache.replace(factors); + } + Ok(Err(e)) => { + warn!(error = %e, "initial fairshare fetch failed, will retry in background"); + } + Err(_) => { + warn!("initial fairshare fetch timed out, will retry in background"); + } + } + + loop { + tokio::time::sleep(interval).await; + + match 
tokio::time::timeout( + Duration::from_secs(10), + Self::fetch(&uri, halflife_days), + ) + .await + { + Ok(Ok(factors)) => cache.replace(factors), + Ok(Err(e)) => { + warn!(error = %e, "fairshare refresh failed, retaining stale data") + } + Err(_) => warn!("fairshare refresh timed out, retaining stale data"), + } + } + }); + } + + async fn fetch( + uri: &str, + halflife_days: u32, + ) -> anyhow::Result<HashMap<(String, String), f64>> { + let mut client: SlurmAccountingClient<Channel> = + SlurmAccountingClient::connect(uri.to_owned()).await?; + let req = GetFairshareFactorsRequest { halflife_days }; + let resp = client.get_fairshare_factors(req).await?; + let factors = resp + .into_inner() + .entries + .into_iter() + .map(|e| ((e.user, e.account), e.factor)) + .collect(); + Ok(factors) + } +} diff --git a/crates/spurctld/src/main.rs b/crates/spurctld/src/main.rs index ce5db44..2377712 100644 --- a/crates/spurctld/src/main.rs +++ b/crates/spurctld/src/main.rs @@ -1,5 +1,6 @@ mod accounting; mod cluster; +mod fairshare_cache; mod raft; mod raft_server; mod scheduler_loop; @@ -147,6 +148,13 @@ async fn main() -> anyhow::Result<()> { } } + // Start fairshare factor refresh loop + cluster.fairshare_cache().spawn_refresh_loop( + config.accounting.host.clone(), + config.scheduler.fairshare_halflife_days, + config.accounting.fairshare_refresh_secs as u64, + ); + // Start scheduler loop (only schedules when this node is Raft leader) let sched_cluster = cluster.clone(); let sched_raft = raft_handle.clone(); diff --git a/crates/spurdbd/src/db.rs b/crates/spurdbd/src/db.rs index 7fdc320..479a56a 100644 --- a/crates/spurdbd/src/db.rs +++ b/crates/spurdbd/src/db.rs @@ -361,8 +361,10 @@ pub async fn get_usage( ) -> anyhow::Result<Vec<UsageRecord>> { let rows = sqlx::query( r#" - SELECT user_name, account, SUM(cpu_seconds) as total_cpu_seconds, - SUM(gpu_seconds) as total_gpu_seconds, SUM(job_count) as total_jobs, + SELECT user_name, account, + SUM(cpu_seconds)::BIGINT as total_cpu_seconds, + SUM(gpu_seconds)::BIGINT as 
total_gpu_seconds, + SUM(job_count)::BIGINT as total_jobs, period_start FROM usage WHERE period_start >= $1 diff --git a/crates/spurdbd/src/fairshare.rs b/crates/spurdbd/src/fairshare.rs index 60288b6..fd44b01 100644 --- a/crates/spurdbd/src/fairshare.rs +++ b/crates/spurdbd/src/fairshare.rs @@ -11,13 +11,17 @@ pub fn compute_fairshare( halflife_days: u32, now: DateTime, ) -> std::collections::HashMap<(String, String), f64> { + let total_weight: f64 = account_weights.values().sum(); + if total_weight <= 0.0 { + return std::collections::HashMap::new(); + } + let halflife = Duration::days(halflife_days as i64); let decay_rate = 2.0_f64.ln() / halflife.num_seconds() as f64; let mut user_usage: std::collections::HashMap<(String, String), f64> = std::collections::HashMap::new(); - // Sum decayed usage per user+account for record in usage { let age = (now - record.period_start).num_seconds().max(0) as f64; let decay = (-decay_rate * age).exp(); @@ -28,22 +32,17 @@ pub fn compute_fairshare( .or_insert(0.0) += weighted_usage; } - // Compute total usage across all users let total_usage: f64 = user_usage.values().sum(); let epsilon = 0.001; - // Compute fair-share factors let mut factors = std::collections::HashMap::new(); for ((user, account), usage) in &user_usage { - let target_share = account_weights.get(account).copied().unwrap_or(1.0); - - // Normalize: actual_share = user_usage / total_usage + let target_share = account_weights.get(account).copied().unwrap_or(1.0) / total_weight; let actual_share = usage / total_usage.max(epsilon); - // fair_share = target_share / actual_share - // High value = underusing allocation → higher priority - // Low value = overusing allocation → lower priority + // factor > 1.0 = underusing allocation (boosted priority) + // factor < 1.0 = overusing allocation (penalized) let factor = target_share / actual_share.max(epsilon); factors.insert((user.clone(), account.clone()), factor.min(100.0)); @@ -71,7 +70,7 @@ mod tests { }, UsageRecord { 
user_name: "bob".into(), - account: "research".into(), + account: "engineering".into(), cpu_seconds: 10_000, gpu_seconds: 0, job_count: 2, @@ -81,12 +80,16 @@ mod tests { let mut weights = HashMap::new(); weights.insert("research".into(), 1.0); + weights.insert("engineering".into(), 1.0); let factors = compute_fairshare(&usage, &weights, 14, now); - // Bob should have higher factor (used less) let alice_factor = factors.get(&("alice".into(), "research".into())).unwrap(); - let bob_factor = factors.get(&("bob".into(), "research".into())).unwrap(); + let bob_factor = factors.get(&("bob".into(), "engineering".into())).unwrap(); + // Bob used less than his share → boosted above 1.0 + assert!(*bob_factor > 1.0); + // Alice used more than her share → penalized below 1.0 + assert!(*alice_factor < 1.0); assert!(bob_factor > alice_factor); } } diff --git a/crates/spurdbd/src/server.rs b/crates/spurdbd/src/server.rs index 7ea117a..2e3a26c 100644 --- a/crates/spurdbd/src/server.rs +++ b/crates/spurdbd/src/server.rs @@ -9,7 +9,7 @@ use spur_proto::proto::*; #[allow(unused_imports)] use tracing::info; -use crate::db; +use crate::{db, fairshare}; pub struct AccountingService { pool: PgPool, @@ -207,22 +207,29 @@ impl SlurmAccounting for AccountingService { .await .map_err(|e| Status::internal(e.to_string()))?; - let mut cpu_hours = std::collections::HashMap::new(); - let mut gpu_hours = std::collections::HashMap::new(); - let mut job_count = std::collections::HashMap::new(); - + let mut agg: std::collections::HashMap<(String, String), (f64, f64, u64)> = + std::collections::HashMap::new(); for r in &records { - let key = format!("{}:{}", r.user_name, r.account); - *cpu_hours.entry(key.clone()).or_insert(0.0) += r.cpu_seconds as f64 / 3600.0; - *gpu_hours.entry(key.clone()).or_insert(0.0) += r.gpu_seconds as f64 / 3600.0; - *job_count.entry(key).or_insert(0u64) += r.job_count; + let e = agg + .entry((r.user_name.clone(), r.account.clone())) + .or_default(); + e.0 += r.cpu_seconds 
as f64 / 3600.0; + e.1 += r.gpu_seconds as f64 / 3600.0; + e.2 += r.job_count; } - Ok(Response::new(GetUsageResponse { - cpu_hours, - gpu_hours, - job_count, - })) + let entries = agg + .into_iter() + .map(|((user, account), (cpu, gpu, jobs))| UsageEntry { + user, + account, + cpu_hours: cpu, + gpu_hours: gpu, + job_count: jobs, + }) + .collect(); + + Ok(Response::new(GetUsageResponse { entries })) } // ============================================================ @@ -417,6 +424,52 @@ impl SlurmAccounting for AccountingService { Ok(Response::new(ListQosResponse { qos_list })) } + + // ============================================================ + // Fairshare + // ============================================================ + + async fn get_fairshare_factors( + &self, + request: Request<GetFairshareFactorsRequest>, + ) -> Result<Response<GetFairshareFactorsResponse>, Status> { + let req = request.into_inner(); + let halflife_days = if req.halflife_days == 0 { + 14 + } else { + req.halflife_days.clamp(1, 365) + }; + + let now = Utc::now(); + let since = now - chrono::Duration::days(halflife_days as i64 * 4); + + let usage = db::get_usage(&self.pool, None, None, since) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let accounts = db::list_accounts(&self.pool) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let account_weights: std::collections::HashMap<String, f64> = accounts + .into_iter() + .map(|a| (a.name, a.fairshare_weight as f64)) + .collect(); + + let raw_factors = + fairshare::compute_fairshare(&usage, &account_weights, halflife_days, now); + + let entries = raw_factors + .into_iter() + .map(|((user, account), factor)| FairshareEntry { + user, + account, + factor, + }) + .collect(); + + Ok(Response::new(GetFairshareFactorsResponse { entries })) + } } pub async fn serve(addr: SocketAddr, pool: PgPool) -> anyhow::Result<()> { diff --git a/deploy/bare-metal/cluster_test.sh b/deploy/bare-metal/cluster_test.sh index 2590af6..341cd78 100755 --- a/deploy/bare-metal/cluster_test.sh +++ 
b/deploy/bare-metal/cluster_test.sh @@ -77,18 +77,39 @@ wait_job() { debug_fail() { local name="$1" job_id="$2" echo " DEBUG ${name} (job ${job_id}):" - "${SPUR}/scontrol" show job "$job_id" 2>/dev/null \ - | grep -E 'JobState|ExitCode|NodeList|Reason|Partition|Priority' \ - | sed 's/^/ /' || true + "${SPUR}/scontrol" show job "$job_id" 2>/dev/null | sed 's/^/ /' || true echo " CLUSTER STATE:" echo " Nodes:" "${SPUR}/sinfo" 2>/dev/null | head -20 | sed 's/^/ /' || true echo " Queue:" "${SPUR}/squeue" -t all 2>/dev/null | head -20 | sed 's/^/ /' || true - echo " Controller logs (last 15 lines):" - journalctl -u spurctld --no-pager -n 15 2>/dev/null | sed 's/^/ /' || true - echo " Agent logs (last 10 lines):" - journalctl -u spurd --no-pager -n 10 2>/dev/null | sed 's/^/ /' || true + + # Job output file (if it exists locally or on remote node) + local out_path + out_path=$("${SPUR}/scontrol" show job "$job_id" 2>/dev/null | grep -oP 'StdOut=\K\S+' || true) + if [ -n "$out_path" ]; then + echo " Job stdout (${out_path}):" + if [ -f "$out_path" ]; then + tail -20 "$out_path" 2>/dev/null | sed 's/^/ /' || true + else + ssh mi300-2 "tail -20 '$out_path' 2>/dev/null" 2>/dev/null | sed 's/^/ (mi300-2) /' || echo " (file not found)" + fi + fi + + # Controller log (runs on this node) + echo " Controller logs (last 20 lines):" + tail -20 "${SPUR_HOME}/log/spurctld.log" 2>/dev/null | sed 's/^/ /' \ + || echo " (not found at ${SPUR_HOME}/log/spurctld.log)" + + # Local agent log + echo " Agent logs — $(hostname) (last 15 lines):" + tail -15 "${SPUR_HOME}/log/spurd.log" 2>/dev/null | sed 's/^/ /' \ + || echo " (not found at ${SPUR_HOME}/log/spurd.log)" + + # Remote agent log + echo " Agent logs — mi300-2 (last 15 lines):" + ssh mi300-2 "tail -15 ~/spur/log/spurd.log 2>/dev/null" 2>/dev/null | sed 's/^/ /' \ + || echo " (not reachable or log not found)" } job_state() { diff --git a/deploy/k8s/configmap.yaml b/deploy/k8s/configmap.yaml index 139f9fd..81ebcbf 100644 --- 
a/deploy/k8s/configmap.yaml +++ b/deploy/k8s/configmap.yaml @@ -22,8 +22,7 @@ data: plugin = "backfill" [accounting] - enabled = true - address = "spurdbd.spur.svc.cluster.local:6819" + host = "spurdbd.spur.svc.cluster.local:6819" [[partitions]] name = "default" diff --git a/proto/slurm.proto b/proto/slurm.proto index d16c05d..29b8705 100644 --- a/proto/slurm.proto +++ b/proto/slurm.proto @@ -310,6 +310,9 @@ service SlurmAccounting { rpc CreateQos(CreateQosRequest) returns (google.protobuf.Empty); rpc DeleteQos(DeleteQosRequest) returns (google.protobuf.Empty); rpc ListQos(ListQosRequest) returns (ListQosResponse); + + // Fairshare + rpc GetFairshareFactors(GetFairshareFactorsRequest) returns (GetFairshareFactorsResponse); } // ============================================================ @@ -554,10 +557,30 @@ message GetUsageRequest { google.protobuf.Timestamp since = 3; } +message UsageEntry { + string user = 1; + string account = 2; + double cpu_hours = 3; + double gpu_hours = 4; + uint64 job_count = 5; +} + message GetUsageResponse { - map<string, double> cpu_hours = 1; // per account - map<string, double> gpu_hours = 2; - map<string, uint64> job_count = 3; + repeated UsageEntry entries = 1; +} + +message GetFairshareFactorsRequest { + uint32 halflife_days = 1; +} + +message FairshareEntry { + string user = 1; + string account = 2; + double factor = 3; +} + +message GetFairshareFactorsResponse { + repeated FairshareEntry entries = 1; } // -- K8s Operator --