Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 56 additions & 19 deletions crates/spur-cli/src/sreport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,27 @@ async fn report_account_utilization_by_user(
}
}

// Build lookup maps (server guarantees one entry per user+account)
let mut acct_agg: std::collections::HashMap<&str, (f64, f64, u64)> =
std::collections::HashMap::new();
let mut user_agg: std::collections::HashMap<(&str, &str), (f64, f64, u64)> =
std::collections::HashMap::new();
for e in &usage.entries {
let a = acct_agg.entry(&e.account).or_default();
a.0 += e.cpu_hours;
a.1 += e.gpu_hours;
a.2 += e.job_count;
user_agg.insert(
(&e.user, &e.account),
(e.cpu_hours, e.gpu_hours, e.job_count),
);
}

for account in &accounts {
let acct_cpu = usage.cpu_hours.get(&account.name).copied().unwrap_or(0.0);
let acct_gpu = usage.gpu_hours.get(&account.name).copied().unwrap_or(0.0);
let acct_jobs = usage.job_count.get(&account.name).copied().unwrap_or(0);
let &(acct_cpu, acct_gpu, acct_jobs) = acct_agg
.get(account.name.as_str())
.unwrap_or(&(0.0, 0.0, 0));

// Account summary row
println!(
"{:<20}{}{:<15}{}{:>12.1}{}{:>12.1}{}{:>10}",
account.name,
Expand All @@ -170,12 +185,11 @@ async fn report_account_utilization_by_user(
acct_jobs
);

// Per-user rows under this account
let account_users: Vec<_> = users.iter().filter(|u| u.account == account.name).collect();
for user in &account_users {
let user_cpu = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0);
let user_gpu = usage.gpu_hours.get(&user.name).copied().unwrap_or(0.0);
let user_jobs = usage.job_count.get(&user.name).copied().unwrap_or(0);
let &(user_cpu, user_gpu, user_jobs) = user_agg
.get(&(user.name.as_str(), account.name.as_str()))
.unwrap_or(&(0.0, 0.0, 0));

println!(
" {:<19}{}{:<15}{}{:>12.1}{}{:>12.1}{}{:>10}",
Expand Down Expand Up @@ -238,10 +252,19 @@ async fn report_user_utilization_by_account(
}
}

let mut user_agg: std::collections::HashMap<(&str, &str), (f64, f64, u64)> =
std::collections::HashMap::new();
for e in &usage.entries {
user_agg.insert(
(&e.user, &e.account),
(e.cpu_hours, e.gpu_hours, e.job_count),
);
}

for user in &users {
let cpu = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0);
let gpu = usage.gpu_hours.get(&user.name).copied().unwrap_or(0.0);
let jobs = usage.job_count.get(&user.name).copied().unwrap_or(0);
let &(cpu, gpu, jobs) = user_agg
.get(&(user.name.as_str(), user.account.as_str()))
.unwrap_or(&(0.0, 0.0, 0));

println!(
"{:<15}{}{:<20}{}{:>12.1}{}{:>12.1}{}{:>10}",
Expand Down Expand Up @@ -273,8 +296,15 @@ async fn report_job_sizes_by_account(
.context("failed to get usage")?;
let usage = usage_resp.into_inner();

let total_jobs: u64 = usage.job_count.values().sum();
let total_cpu: f64 = usage.cpu_hours.values().sum();
let mut acct_agg: std::collections::HashMap<&str, (f64, u64)> =
std::collections::HashMap::new();
for e in &usage.entries {
let a = acct_agg.entry(&e.account).or_default();
a.0 += e.cpu_hours;
a.1 += e.job_count;
}
let total_cpu: f64 = acct_agg.values().map(|v| v.0).sum();
let total_jobs: u64 = acct_agg.values().map(|v| v.1).sum();

let delimiter = if args.parsable { "|" } else { " " };

Expand All @@ -289,8 +319,7 @@ async fn report_job_sizes_by_account(
}

for account in &accounts {
let jobs = usage.job_count.get(&account.name).copied().unwrap_or(0);
let cpu = usage.cpu_hours.get(&account.name).copied().unwrap_or(0.0);
let &(cpu, jobs) = acct_agg.get(account.name.as_str()).unwrap_or(&(0.0, 0));
let pct = if total_cpu > 0.0 {
(cpu / total_cpu) * 100.0
} else {
Expand Down Expand Up @@ -337,8 +366,15 @@ async fn report_job_sizes_by_user(
.context("failed to get usage")?;
let usage = usage_resp.into_inner();

let total_jobs: u64 = usage.job_count.values().sum();
let total_cpu: f64 = usage.cpu_hours.values().sum();
let mut user_agg: std::collections::HashMap<(&str, &str), (f64, u64)> =
std::collections::HashMap::new();
for e in &usage.entries {
let u = user_agg.entry((&e.user, &e.account)).or_default();
u.0 += e.cpu_hours;
u.1 += e.job_count;
}
let total_cpu: f64 = user_agg.values().map(|v| v.0).sum();
let total_jobs: u64 = user_agg.values().map(|v| v.1).sum();

let delimiter = if args.parsable { "|" } else { " " };

Expand All @@ -361,8 +397,9 @@ async fn report_job_sizes_by_user(
}

for user in &users {
let jobs = usage.job_count.get(&user.name).copied().unwrap_or(0);
let cpu = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0);
let &(cpu, jobs) = user_agg
.get(&(user.name.as_str(), user.account.as_str()))
.unwrap_or(&(0.0, 0));
let pct = if total_cpu > 0.0 {
(cpu / total_cpu) * 100.0
} else {
Expand Down
22 changes: 19 additions & 3 deletions crates/spur-cli/src/sshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,18 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {
total_shares
};

// Build lookup maps from entries (server guarantees one entry per user+account)
let mut account_cpu_hours: std::collections::HashMap<&str, f64> =
std::collections::HashMap::new();
let mut user_account_cpu_hours: std::collections::HashMap<(&str, &str), f64> =
std::collections::HashMap::new();
for entry in &usage.entries {
*account_cpu_hours.entry(&entry.account).or_default() += entry.cpu_hours;
user_account_cpu_hours.insert((&entry.user, &entry.account), entry.cpu_hours);
}

// Compute total usage for normalization
let total_cpu_usage: f64 = usage.cpu_hours.values().sum();
let total_cpu_usage: f64 = account_cpu_hours.values().sum();
let total_cpu_usage = if total_cpu_usage <= 0.0 {
1.0
} else {
Expand Down Expand Up @@ -119,7 +129,10 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {

let raw_shares = account.fairshare_weight;
let norm_shares = raw_shares / total_shares;
let raw_usage = usage.cpu_hours.get(&account.name).copied().unwrap_or(0.0);
let raw_usage = account_cpu_hours
.get(account.name.as_str())
.copied()
.unwrap_or(0.0);
let norm_usage = raw_usage / total_cpu_usage;
let fair_share = if norm_usage > 0.001 {
norm_shares / norm_usage
Expand Down Expand Up @@ -159,7 +172,10 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {
}
}

let user_usage = usage.cpu_hours.get(&user.name).copied().unwrap_or(0.0);
let user_usage = user_account_cpu_hours
.get(&(user.name.as_str(), account.name.as_str()))
.copied()
.unwrap_or(0.0);
let user_norm_usage = user_usage / total_cpu_usage;
// Each user within an account gets an equal sub-share
let user_count = account_users.len().max(1) as f64;
Expand Down
7 changes: 7 additions & 0 deletions crates/spur-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ pub struct AccountingConfig {
/// How long to keep completed job records.
#[serde(default = "default_purge_days")]
pub purge_after_days: u32,
/// How often to refresh fairshare factors from the accounting daemon.
#[serde(default = "default_fairshare_refresh_secs")]
pub fairshare_refresh_secs: u32,
}

fn default_accounting_host() -> String {
Expand All @@ -210,13 +213,17 @@ fn default_database_url() -> String {
/// Serde default for `AccountingConfig::purge_after_days`.
fn default_purge_days() -> u32 {
    // Keep one year of completed job history by default.
    const ONE_YEAR_DAYS: u32 = 365;
    ONE_YEAR_DAYS
}
/// Serde default for `AccountingConfig::fairshare_refresh_secs`.
fn default_fairshare_refresh_secs() -> u32 {
    // Refresh fairshare factors every five minutes by default.
    const FIVE_MINUTES_SECS: u32 = 5 * 60;
    FIVE_MINUTES_SECS
}

impl Default for AccountingConfig {
fn default() -> Self {
Self {
host: "localhost:6819".into(),
database_url: "postgresql://spur:spur@localhost/spur".into(),
purge_after_days: 365,
fairshare_refresh_secs: 300,
}
}
}
Expand Down
73 changes: 16 additions & 57 deletions crates/spur-sched/src/priority.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,3 @@
use chrono::{DateTime, Duration, Utc};

/// Calculate fair-share factor for a user/account.
///
/// fair_share_factor = target_share / max(decayed_actual_usage, epsilon)
///
/// Usage is discounted with an exponential half-life model: a record
/// exactly `halflife_days` old contributes half of its original weight.
pub fn fair_share_factor(
    target_share: f64,
    usage_records: &[(DateTime<Utc>, f64)], // (timestamp, cpu_hours)
    halflife_days: u32,
    now: DateTime<Utc>,
) -> f64 {
    // Continuous decay constant λ chosen so that e^(-λ·halflife) = 1/2.
    let halflife_secs = Duration::days(halflife_days as i64).num_seconds() as f64;
    let lambda = 2.0_f64.ln() / halflife_secs;

    // Accumulate each record's usage, discounted by its age in seconds
    // (ages are clamped at zero so future-dated records are not amplified).
    let mut decayed_usage = 0.0_f64;
    for (recorded_at, cpu_hours) in usage_records {
        let age_secs = (now - *recorded_at).num_seconds().max(0) as f64;
        decayed_usage += cpu_hours * (-lambda * age_secs).exp();
    }

    // Floor the denominator to avoid division by zero with no usage.
    const EPSILON: f64 = 0.001;
    target_share / decayed_usage.max(EPSILON)
}

/// Calculate effective job priority.
///
/// effective_priority = base_priority * fair_share_factor * age_factor * partition_tier
Expand All @@ -49,34 +21,21 @@ mod tests {
use super::*;

#[test]
fn test_fair_share_no_usage() {
let factor = fair_share_factor(1.0, &[], 14, Utc::now());
// With no usage, factor should be very high (share / epsilon)
assert!(factor > 100.0);
}

#[test]
fn test_fair_share_heavy_usage() {
let now = Utc::now();
let records: Vec<(DateTime<Utc>, f64)> =
(0..14).map(|d| (now - Duration::days(d), 100.0)).collect();

let factor = fair_share_factor(1.0, &records, 14, now);
// Heavy recent usage → low factor
assert!(factor < 1.0);
}

#[test]
fn test_effective_priority() {
let p = effective_priority(1000, 1.0, 0, 1);
assert_eq!(p, 1000);

// With age bonus
let p = effective_priority(1000, 1.0, 10080, 1);
assert_eq!(p, 2000);

// With partition tier
let p = effective_priority(1000, 1.0, 0, 2);
assert_eq!(p, 2000);
fn effective_priority_cases() {
    // Checks one (base, fair_share, age_minutes, tier) -> expected case,
    // reporting the inputs on failure.
    let check = |base, fs, age, tier, expected| {
        let got = effective_priority(base, fs, age, tier);
        assert_eq!(
            got, expected,
            "effective_priority({base}, {fs}, {age}, {tier})"
        );
    };

    check(1000, 1.0, 0, 1, 1000); // neutral factors pass base through
    check(1000, 1.0, 10080, 1, 2000); // age bonus
    check(1000, 1.0, 0, 2, 2000); // partition tier multiplier
    check(1000, 100.0, 0, 1, 10_000); // large fair-share factor
    check(0, 0.001, 0, 1, 1); // minimal inputs (per existing expectation)
}
}
40 changes: 0 additions & 40 deletions crates/spur-tests/src/t24_priority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,8 @@

#[cfg(test)]
mod tests {
use chrono::{Duration, Utc};
use spur_sched::priority;

// ── T24.1: Fair-share with no usage ──────────────────────────

#[test]
fn t24_1_fair_share_no_usage() {
let factor = priority::fair_share_factor(1.0, &[], 14, Utc::now());
assert!(factor > 100.0, "no usage → very high factor");
}

// ── T24.2: Fair-share with heavy recent usage ────────────────

#[test]
fn t24_2_fair_share_heavy_usage() {
let now = Utc::now();
let records: Vec<_> = (0..14).map(|d| (now - Duration::days(d), 100.0)).collect();
let factor = priority::fair_share_factor(1.0, &records, 14, now);
assert!(factor < 1.0, "heavy usage → low factor");
}

// ── T24.3: Usage decay ───────────────────────────────────────

#[test]
fn t24_3_old_usage_decays() {
let now = Utc::now();

// Recent usage
let recent = vec![(now - Duration::hours(1), 100.0)];
let factor_recent = priority::fair_share_factor(1.0, &recent, 14, now);

// Old usage (same amount but 30 days ago)
let old = vec![(now - Duration::days(30), 100.0)];
let factor_old = priority::fair_share_factor(1.0, &old, 14, now);

// Old usage should decay → higher factor
assert!(
factor_old > factor_recent,
"old usage should result in higher factor than recent"
);
}

// ── T24.4: Effective priority ────────────────────────────────

#[test]
Expand Down
15 changes: 13 additions & 2 deletions crates/spurctld/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use chrono::{DateTime, Utc};
use parking_lot::RwLock;
Expand All @@ -18,6 +19,7 @@ use spur_core::step::{JobStep, StepState, STEP_BATCH};
use spur_core::wal::WalOperation;

use crate::accounting::AccountingNotifier;
use crate::fairshare_cache::FairshareCache;
use crate::raft::{SpurRaft, StateMachineApply};

/// Central cluster state manager.
Expand All @@ -36,12 +38,14 @@ pub struct ClusterManager {
hostname_aliases: RwLock<HashMap<String, String>>,
raft: RwLock<Option<SpurRaft>>,
accounting: RwLock<Option<AccountingNotifier>>,
fairshare_cache: Arc<FairshareCache>,
}

impl ClusterManager {
pub fn new(config: SlurmConfig, _state_dir: &Path) -> anyhow::Result<Self> {
let partitions = config.build_partitions();
let license_pool = config.licenses.clone();
let fairshare_cache = Arc::new(FairshareCache::new());

let cm = Self {
config,
Expand All @@ -55,6 +59,7 @@ impl ClusterManager {
hostname_aliases: RwLock::new(HashMap::new()),
raft: RwLock::new(None),
accounting: RwLock::new(None),
fairshare_cache,
};

info!("cluster manager initialized (state will be recovered via Raft)");
Expand Down Expand Up @@ -953,10 +958,12 @@ impl ClusterManager {
.and_then(|pname| partitions.iter().find(|p| p.name == *pname))
.map(|p| p.priority_tier)
.unwrap_or(1);
// fair_share = 1.0 (neutral) until spurdbd integration
let fair_share = self
.fairshare_cache
.get(&job.spec.user, job.spec.account.as_deref().unwrap_or(""));
job.priority = spur_sched::priority::effective_priority(
job.priority,
1.0,
fair_share,
age_minutes,
partition_tier,
);
Expand Down Expand Up @@ -1248,6 +1255,10 @@ impl ClusterManager {
*self.accounting.write() = Some(notifier);
}

/// Shared handle to the fairshare cache; consulted when computing
/// effective job priority and exposed so other components (e.g. the
/// accounting integration) can refresh it.
pub fn fairshare_cache(&self) -> &Arc<FairshareCache> {
&self.fairshare_cache
}

/// Persist a mutation via Raft consensus. The apply callback
/// (`StateMachineApply`) handles in-memory state on all nodes.
fn propose(&self, op: WalOperation) -> anyhow::Result<()> {
Expand Down
Loading
Loading