diff --git a/Cargo.lock b/Cargo.lock index e853f7e..e95de37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3448,7 +3448,6 @@ dependencies = [ "serde_json", "spur-core", "spur-proto", - "spur-sched", "sqlx", "tokio", "tonic", diff --git a/crates/spur-cli/src/image.rs b/crates/spur-cli/src/image.rs index 3cfc584..a885ce0 100644 --- a/crates/spur-cli/src/image.rs +++ b/crates/spur-cli/src/image.rs @@ -200,7 +200,6 @@ async fn cmd_import_podman(image: &str) -> Result<()> { /// The tar contains a manifest.json listing layer tarballs. fn extract_docker_save_tar(tar_data: &[u8], rootfs: &str) -> Result<()> { use flate2::read::GzDecoder; - use std::io::Read; let dest = std::path::Path::new(rootfs); let mut archive = tar::Archive::new(tar_data); diff --git a/crates/spur-cli/src/main.rs b/crates/spur-cli/src/main.rs index 1f392f0..cb026fb 100644 --- a/crates/spur-cli/src/main.rs +++ b/crates/spur-cli/src/main.rs @@ -94,6 +94,9 @@ fn main() -> anyhow::Result<()> { "sattach" => return runtime.block_on(sattach::main()), "scrontab" => return runtime.block_on(scrontab::main()), "smd" => return runtime.block_on(smd::main()), + "net" => return runtime.block_on(net::main()), + "image" => return runtime.block_on(image::main()), + "exec" => return runtime.block_on(exec::main()), _ => {} } diff --git a/crates/spur-core/src/dependency.rs b/crates/spur-core/src/dependency.rs index 8eb4886..217ed89 100644 --- a/crates/spur-core/src/dependency.rs +++ b/crates/spur-core/src/dependency.rs @@ -186,7 +186,7 @@ mod tests { #[test] fn test_afterok_satisfied() { let dep_job = make_job(100, JobState::Completed); - let mut job = Job::new( + let job = Job::new( 1, JobSpec { name: "test".into(), diff --git a/crates/spur-core/src/qos.rs b/crates/spur-core/src/qos.rs index 0bdaf13..c17f867 100644 --- a/crates/spur-core/src/qos.rs +++ b/crates/spur-core/src/qos.rs @@ -2,8 +2,8 @@ //! //! Checks per-QOS limits before allowing a job to be scheduled. -use crate::accounting::{Qos, QosLimits, TresRecord, TresType}; -use crate::job::{Job, JobState, PendingReason}; +use crate::accounting::{Qos, TresRecord, TresType}; +use crate::job::{Job, PendingReason}; /// Result of QOS limit check. #[derive(Debug, Clone, PartialEq, Eq)] @@ -86,6 +86,7 @@ pub fn qos_adjusted_priority(base_priority: u32, qos: &Qos) -> u32 { #[cfg(test)] mod tests { use super::*; + use crate::accounting::QosLimits; use crate::job::JobSpec; fn make_qos(max_jobs: Option, max_wall: Option) -> Qos { diff --git a/crates/spur-ffi/src/lib.rs b/crates/spur-ffi/src/lib.rs index 2bb9141..108ce18 100644 --- a/crates/spur-ffi/src/lib.rs +++ b/crates/spur-ffi/src/lib.rs @@ -7,7 +7,6 @@ mod types; use std::ffi::{CStr, CString}; use std::os::raw::{c_char, c_int, c_uint}; -use std::ptr; use std::sync::OnceLock; use types::*; diff --git a/crates/spur-k8s/src/agent.rs b/crates/spur-k8s/src/agent.rs index 61bfd8a..8275575 100644 --- a/crates/spur-k8s/src/agent.rs +++ b/crates/spur-k8s/src/agent.rs @@ -782,10 +782,9 @@ fn is_nvidia_gpu(gpu_type: &str) -> bool { /// Build a gres string from GPU count and type. pub fn gpu_request_to_gres(count: u32, gpu_type: Option<&str>) -> String { - match gpu_type { - Some(t) if !t.is_empty() => format!("gpu:{}:{}", t, count), - _ => format!("gpu:{}", count), - } + let t = gpu_type.unwrap_or("any"); + let t = if t.is_empty() { "any" } else { t }; + format!("gpu:{}:{}", t, count) } #[cfg(test)] @@ -797,8 +796,8 @@ mod tests { #[test] fn test_gpu_request_to_gres() { assert_eq!(gpu_request_to_gres(8, Some("mi300x")), "gpu:mi300x:8"); - assert_eq!(gpu_request_to_gres(4, None), "gpu:4"); - assert_eq!(gpu_request_to_gres(2, Some("")), "gpu:2"); + assert_eq!(gpu_request_to_gres(4, None), "gpu:any:4"); + assert_eq!(gpu_request_to_gres(2, Some("")), "gpu:any:2"); } #[test] diff --git a/crates/spur-k8s/src/crd.rs b/crates/spur-k8s/src/crd.rs index 2c64120..0a94b91 100644 --- a/crates/spur-k8s/src/crd.rs +++ b/crates/spur-k8s/src/crd.rs @@ -181,8 +181,10 @@ pub struct SpurJobStatus { pub fn to_core_job_spec(spec: &SpurJobSpec, user: &str) -> spur_core::job::JobSpec { let mut gres = Vec::new(); if spec.gpus.count > 0 { - let gpu_type = spec.gpus.gpu_type.as_deref().unwrap_or("any"); - gres.push(format!("gpu:{}:{}", gpu_type, spec.gpus.count)); + gres.push(crate::agent::gpu_request_to_gres( + spec.gpus.count, + spec.gpus.gpu_type.as_deref(), + )); } let memory_per_node_mb = spec diff --git a/crates/spur-net/src/oci.rs b/crates/spur-net/src/oci.rs index 2a9d580..9a4f1f3 100644 --- a/crates/spur-net/src/oci.rs +++ b/crates/spur-net/src/oci.rs @@ -258,7 +258,6 @@ async fn pull_and_extract(image_ref: &ImageRef, rootfs_dir: &Path) -> anyhow::Re for (i, layer) in manifest.layers.iter().enumerate() { let digest = layer.digest.clone(); let size = layer.size; - let media_type = layer.media_type.clone(); let cache_path = cache_dir.join(digest.replace(':', "_")); // Check layer cache @@ -442,7 +441,6 @@ fn load_docker_config_auth(registry: &str) -> Option { for key in keys_to_try { if let Some(entry) = auths.get(key) { if let Some(auth_b64) = entry.get("auth").and_then(|a| a.as_str()) { - use std::io::Read; let decoded = base64_decode(auth_b64)?; let (user, pass) = decoded.split_once(':')?; return Some(RegistryCredentials { diff --git a/crates/spur-sched/src/backfill.rs b/crates/spur-sched/src/backfill.rs index 99f45c3..3a866a2 100644 --- a/crates/spur-sched/src/backfill.rs +++ b/crates/spur-sched/src/backfill.rs @@ -47,7 +47,6 @@ impl BackfillScheduler { &self, job: &Job, nodes: &[Node], - partitions: &[spur_core::partition::Partition], reservations: &[Reservation], ) -> Vec { let partition_name = job.spec.partition.as_deref(); @@ -208,12 +207,7 @@ impl Scheduler for BackfillScheduler { } let all_have_nodes = indices.iter().all(|&idx| { let job = &pending[idx]; - let suitable = self.find_suitable_nodes( - job, - cluster.nodes, - cluster.partitions, - cluster.reservations, - ); + let suitable = self.find_suitable_nodes(job, cluster.nodes, cluster.reservations); suitable.len() >= job.spec.num_nodes as usize }); if !all_have_nodes { @@ -231,12 +225,7 @@ impl Scheduler for BackfillScheduler { if skip_indices.contains(&job_idx) { continue; } - let suitable = self.find_suitable_nodes( - job, - cluster.nodes, - cluster.partitions, - cluster.reservations, - ); + let suitable = self.find_suitable_nodes(job, cluster.nodes, cluster.reservations); if suitable.is_empty() { continue; } diff --git a/crates/spur-spank/src/lib.rs b/crates/spur-spank/src/lib.rs index 0060b0b..f2738c2 100644 --- a/crates/spur-spank/src/lib.rs +++ b/crates/spur-spank/src/lib.rs @@ -4,11 +4,11 @@ //! the spank_* callback API (11 hooks, ~12 API functions). use std::collections::HashMap; -use std::ffi::{CStr, CString}; +use std::ffi::CStr; use std::os::raw::{c_char, c_int}; use std::path::{Path, PathBuf}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; /// SPANK callback hook points (matches Slurm's spank.h). #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -158,11 +158,12 @@ impl SpankHost { match func { Ok(f) => { - debug!(plugin = %plugin.name, hook = symbol, "invoking SPANK hook"); + debug!(plugin = %plugin.name, path = %plugin.path.display(), hook = symbol, "invoking SPANK hook"); let rc = unsafe { f(handle_ptr, 0, std::ptr::null_mut()) }; if rc != 0 { warn!( plugin = %plugin.name, + path = %plugin.path.display(), hook = symbol, rc, "SPANK hook returned error" @@ -178,6 +179,7 @@ impl SpankHost { // Plugin doesn't implement this hook — that's fine debug!( plugin = %plugin.name, + path = %plugin.path.display(), hook = symbol, "SPANK hook not found, skipping" ); diff --git a/crates/spurdbd/Cargo.toml b/crates/spurdbd/Cargo.toml index 3129bad..b0cdb14 100644 --- a/crates/spurdbd/Cargo.toml +++ b/crates/spurdbd/Cargo.toml @@ -10,7 +10,6 @@ path = "src/main.rs" [dependencies] spur-proto = { path = "../spur-proto" } spur-core = { path = "../spur-core" } -spur-sched = { path = "../spur-sched" } tokio = { workspace = true } tonic = { workspace = true } prost-types = { workspace = true } diff --git a/crates/spurdbd/src/fairshare.rs b/crates/spurdbd/src/fairshare.rs index ef4afd4..60288b6 100644 --- a/crates/spurdbd/src/fairshare.rs +++ b/crates/spurdbd/src/fairshare.rs @@ -52,21 +52,6 @@ pub fn compute_fairshare( factors } -/// Compute effective priority for a job given fair-share data. -pub fn effective_priority( - base_priority: u32, - fair_share_factor: f64, - age_minutes: i64, - partition_tier: u32, -) -> u32 { - spur_sched::priority::effective_priority( - base_priority, - fair_share_factor, - age_minutes, - partition_tier, - ) -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/spurrestd/src/routes.rs b/crates/spurrestd/src/routes.rs index 809a8d3..9b3e5e3 100644 --- a/crates/spurrestd/src/routes.rs +++ b/crates/spurrestd/src/routes.rs @@ -158,9 +158,17 @@ async fn get_jobs( .await .map_err(|e| error_response(&format!("connection failed: {}", e)))?; + let states: Vec = query + .state + .iter() + .flat_map(|s| s.split(',')) + .filter_map(|s| parse_job_state(s.trim())) + .map(|s| s as i32) + .collect(); + let resp = client .get_jobs(spur_proto::proto::GetJobsRequest { - states: Vec::new(), + states, user: query.user.unwrap_or_default(), partition: query.partition.unwrap_or_default(), account: query.account.unwrap_or_default(), @@ -456,3 +464,20 @@ fn node_state_name(state: i32) -> &'static str { _ => "unknown", } } + +fn parse_job_state(s: &str) -> Option { + use spur_proto::proto::JobState; + match s.to_uppercase().as_str() { + "PD" | "PENDING" => Some(JobState::JobPending), + "R" | "RUNNING" => Some(JobState::JobRunning), + "CG" | "COMPLETING" => Some(JobState::JobCompleting), + "CD" | "COMPLETED" => Some(JobState::JobCompleted), + "F" | "FAILED" => Some(JobState::JobFailed), + "CA" | "CANCELLED" => Some(JobState::JobCancelled), + "TO" | "TIMEOUT" => Some(JobState::JobTimeout), + "NF" | "NODE_FAIL" => Some(JobState::JobNodeFail), + "PR" | "PREEMPTED" => Some(JobState::JobPreempted), + "S" | "SUSPENDED" => Some(JobState::JobSuspended), + _ => None, + } +} diff --git a/deploy/bare-metal/cluster_test.sh b/deploy/bare-metal/cluster_test.sh index ba1a577..2590af6 100755 --- a/deploy/bare-metal/cluster_test.sh +++ b/deploy/bare-metal/cluster_test.sh @@ -74,6 +74,23 @@ wait_job() { echo "(timeout after ${timeout}s)" } +debug_fail() { + local name="$1" job_id="$2" + echo " DEBUG ${name} (job ${job_id}):" + "${SPUR}/scontrol" show job "$job_id" 2>/dev/null \ + | grep -E 'JobState|ExitCode|NodeList|Reason|Partition|Priority' \ + | sed 's/^/ /' || true + echo " CLUSTER STATE:" + echo " Nodes:" + "${SPUR}/sinfo" 2>/dev/null | head -20 | sed 's/^/ /' || true + echo " Queue:" + "${SPUR}/squeue" -t all 2>/dev/null | head -20 | sed 's/^/ /' || true + echo " Controller logs (last 15 lines):" + journalctl -u spurctld --no-pager -n 15 2>/dev/null | sed 's/^/ /' || true + echo " Agent logs (last 10 lines):" + journalctl -u spurd --no-pager -n 10 2>/dev/null | sed 's/^/ /' || true +} + job_state() { local job_id="$1" # squeue data: when whitespace-collapsed, fields are: @@ -106,6 +123,17 @@ echo " Spur MI300X Cluster Integration Tests" echo "============================================" echo "" +echo "--- Cluster Baseline ---" +echo " Date: $(date -u '+%Y-%m-%d %H:%M:%S UTC')" +echo " Controller: ${SPUR_CONTROLLER_ADDR:-http://localhost:6817}" +echo " Nodes:" +"${SPUR}/sinfo" 2>/dev/null | sed 's/^/ /' || echo " (sinfo failed)" +echo " Queue:" +"${SPUR}/squeue" -t all 2>/dev/null | sed 's/^/ /' || echo " (squeue failed)" +echo " Controller PID: $(pgrep -x spurctld 2>/dev/null || echo 'not found')" +echo " Agent PID: $(pgrep -x spurd 2>/dev/null || echo 'not found')" +echo "" + # --- Test 1: Cluster health --- echo "--- Cluster Health ---" run_test "sinfo returns output" ${SPUR}/sinfo @@ -959,11 +987,9 @@ ctest_exit_code() { "${SPUR}/scontrol" show job "$1" 2>/dev/null | grep -oP 'ExitCode=\K[0-9-]+' } -# Dump debug info for a failed job +# Dump debug info for a failed container job (delegates to general helper) ctest_debug() { - local name="$1" job_id="$2" - echo " DEBUG ${name}:" - "${SPUR}/scontrol" show job "$job_id" 2>/dev/null | grep -E 'JobState|ExitCode|NodeList|Reason' || true + debug_fail "$1" "$2" } if [ "$CTEST_READY" = "1" ]; then