Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/spur-cli/src/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ async fn cmd_import_podman(image: &str) -> Result<()> {
/// The tar contains a manifest.json listing layer tarballs.
fn extract_docker_save_tar(tar_data: &[u8], rootfs: &str) -> Result<()> {
use flate2::read::GzDecoder;
use std::io::Read;

let dest = std::path::Path::new(rootfs);
let mut archive = tar::Archive::new(tar_data);
Expand Down
3 changes: 3 additions & 0 deletions crates/spur-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ fn main() -> anyhow::Result<()> {
"sattach" => return runtime.block_on(sattach::main()),
"scrontab" => return runtime.block_on(scrontab::main()),
"smd" => return runtime.block_on(smd::main()),
"net" => return runtime.block_on(net::main()),
"image" => return runtime.block_on(image::main()),
"exec" => return runtime.block_on(exec::main()),
_ => {}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/spur-core/src/dependency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ mod tests {
#[test]
fn test_afterok_satisfied() {
let dep_job = make_job(100, JobState::Completed);
let mut job = Job::new(
let job = Job::new(
1,
JobSpec {
name: "test".into(),
Expand Down
5 changes: 3 additions & 2 deletions crates/spur-core/src/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//!
//! Checks per-QOS limits before allowing a job to be scheduled.

use crate::accounting::{Qos, QosLimits, TresRecord, TresType};
use crate::job::{Job, JobState, PendingReason};
use crate::accounting::{Qos, TresRecord, TresType};
use crate::job::{Job, PendingReason};

/// Result of QOS limit check.
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -86,6 +86,7 @@ pub fn qos_adjusted_priority(base_priority: u32, qos: &Qos) -> u32 {
#[cfg(test)]
mod tests {
use super::*;
use crate::accounting::QosLimits;
use crate::job::JobSpec;

fn make_qos(max_jobs: Option<u32>, max_wall: Option<u32>) -> Qos {
Expand Down
1 change: 0 additions & 1 deletion crates/spur-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ mod types;

use std::ffi::{CStr, CString};
use std::os::raw::{c_char, c_int, c_uint};
use std::ptr;
use std::sync::OnceLock;

use types::*;
Expand Down
11 changes: 5 additions & 6 deletions crates/spur-k8s/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,9 @@ fn is_nvidia_gpu(gpu_type: &str) -> bool {

/// Build a gres string from GPU count and type.
///
/// Always emits the three-field form `gpu:<type>:<count>`; when the type is
/// absent or empty, the placeholder `"any"` is used so consumers can rely on
/// a fixed field count (e.g. `gpu:any:4` rather than the ambiguous `gpu:4`).
pub fn gpu_request_to_gres(count: u32, gpu_type: Option<&str>) -> String {
    // Treat both None and an empty string as "no specific type requested".
    let t = gpu_type.unwrap_or("any");
    let t = if t.is_empty() { "any" } else { t };
    format!("gpu:{}:{}", t, count)
}

#[cfg(test)]
Expand All @@ -797,8 +796,8 @@ mod tests {
#[test]
fn test_gpu_request_to_gres() {
    // Explicit type is passed through verbatim.
    assert_eq!(gpu_request_to_gres(8, Some("mi300x")), "gpu:mi300x:8");
    // Missing or empty type falls back to the "any" placeholder so the
    // gres string always has three colon-separated fields.
    assert_eq!(gpu_request_to_gres(4, None), "gpu:any:4");
    assert_eq!(gpu_request_to_gres(2, Some("")), "gpu:any:2");
}

#[test]
Expand Down
6 changes: 4 additions & 2 deletions crates/spur-k8s/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ pub struct SpurJobStatus {
pub fn to_core_job_spec(spec: &SpurJobSpec, user: &str) -> spur_core::job::JobSpec {
let mut gres = Vec::new();
if spec.gpus.count > 0 {
let gpu_type = spec.gpus.gpu_type.as_deref().unwrap_or("any");
gres.push(format!("gpu:{}:{}", gpu_type, spec.gpus.count));
gres.push(crate::agent::gpu_request_to_gres(
spec.gpus.count,
spec.gpus.gpu_type.as_deref(),
));
}

let memory_per_node_mb = spec
Expand Down
2 changes: 0 additions & 2 deletions crates/spur-net/src/oci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ async fn pull_and_extract(image_ref: &ImageRef, rootfs_dir: &Path) -> anyhow::Re
for (i, layer) in manifest.layers.iter().enumerate() {
let digest = layer.digest.clone();
let size = layer.size;
let media_type = layer.media_type.clone();
let cache_path = cache_dir.join(digest.replace(':', "_"));

// Check layer cache
Expand Down Expand Up @@ -442,7 +441,6 @@ fn load_docker_config_auth(registry: &str) -> Option<RegistryCredentials> {
for key in keys_to_try {
if let Some(entry) = auths.get(key) {
if let Some(auth_b64) = entry.get("auth").and_then(|a| a.as_str()) {
use std::io::Read;
let decoded = base64_decode(auth_b64)?;
let (user, pass) = decoded.split_once(':')?;
return Some(RegistryCredentials {
Expand Down
15 changes: 2 additions & 13 deletions crates/spur-sched/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ impl BackfillScheduler {
&self,
job: &Job,
nodes: &[Node],
partitions: &[spur_core::partition::Partition],
reservations: &[Reservation],
) -> Vec<usize> {
let partition_name = job.spec.partition.as_deref();
Expand Down Expand Up @@ -208,12 +207,7 @@ impl Scheduler for BackfillScheduler {
}
let all_have_nodes = indices.iter().all(|&idx| {
let job = &pending[idx];
let suitable = self.find_suitable_nodes(
job,
cluster.nodes,
cluster.partitions,
cluster.reservations,
);
let suitable = self.find_suitable_nodes(job, cluster.nodes, cluster.reservations);
suitable.len() >= job.spec.num_nodes as usize
});
if !all_have_nodes {
Expand All @@ -231,12 +225,7 @@ impl Scheduler for BackfillScheduler {
if skip_indices.contains(&job_idx) {
continue;
}
let suitable = self.find_suitable_nodes(
job,
cluster.nodes,
cluster.partitions,
cluster.reservations,
);
let suitable = self.find_suitable_nodes(job, cluster.nodes, cluster.reservations);
if suitable.is_empty() {
continue;
}
Expand Down
8 changes: 5 additions & 3 deletions crates/spur-spank/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
//! the spank_* callback API (11 hooks, ~12 API functions).

use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::ffi::CStr;
use std::os::raw::{c_char, c_int};
use std::path::{Path, PathBuf};

use tracing::{debug, error, info, warn};
use tracing::{debug, info, warn};

/// SPANK callback hook points (matches Slurm's spank.h).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -158,11 +158,12 @@ impl SpankHost {

match func {
Ok(f) => {
debug!(plugin = %plugin.name, hook = symbol, "invoking SPANK hook");
debug!(plugin = %plugin.name, path = %plugin.path.display(), hook = symbol, "invoking SPANK hook");
let rc = unsafe { f(handle_ptr, 0, std::ptr::null_mut()) };
if rc != 0 {
warn!(
plugin = %plugin.name,
path = %plugin.path.display(),
hook = symbol,
rc,
"SPANK hook returned error"
Expand All @@ -178,6 +179,7 @@ impl SpankHost {
// Plugin doesn't implement this hook — that's fine
debug!(
plugin = %plugin.name,
path = %plugin.path.display(),
hook = symbol,
"SPANK hook not found, skipping"
);
Expand Down
1 change: 0 additions & 1 deletion crates/spurdbd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ path = "src/main.rs"
[dependencies]
spur-proto = { path = "../spur-proto" }
spur-core = { path = "../spur-core" }
spur-sched = { path = "../spur-sched" }
tokio = { workspace = true }
tonic = { workspace = true }
prost-types = { workspace = true }
Expand Down
15 changes: 0 additions & 15 deletions crates/spurdbd/src/fairshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,6 @@ pub fn compute_fairshare(
factors
}

/// Compute effective priority for a job given fair-share data.
pub fn effective_priority(
base_priority: u32,
fair_share_factor: f64,
age_minutes: i64,
partition_tier: u32,
) -> u32 {
spur_sched::priority::effective_priority(
base_priority,
fair_share_factor,
age_minutes,
partition_tier,
)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
27 changes: 26 additions & 1 deletion crates/spurrestd/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,17 @@ async fn get_jobs(
.await
.map_err(|e| error_response(&format!("connection failed: {}", e)))?;

let states: Vec<i32> = query
.state
.iter()
.flat_map(|s| s.split(','))
.filter_map(|s| parse_job_state(s.trim()))
.map(|s| s as i32)
.collect();

let resp = client
.get_jobs(spur_proto::proto::GetJobsRequest {
states: Vec::new(),
states,
user: query.user.unwrap_or_default(),
partition: query.partition.unwrap_or_default(),
account: query.account.unwrap_or_default(),
Expand Down Expand Up @@ -456,3 +464,20 @@ fn node_state_name(state: i32) -> &'static str {
_ => "unknown",
}
}

/// Translate a textual job-state filter (short code such as `"PD"` or full
/// name such as `"PENDING"`, case-insensitive) into the protobuf `JobState`
/// enum. Unrecognized input yields `None`.
fn parse_job_state(s: &str) -> Option<spur_proto::proto::JobState> {
    use spur_proto::proto::JobState;
    let normalized = s.to_uppercase();
    let state = match normalized.as_str() {
        "PD" | "PENDING" => JobState::JobPending,
        "R" | "RUNNING" => JobState::JobRunning,
        "CG" | "COMPLETING" => JobState::JobCompleting,
        "CD" | "COMPLETED" => JobState::JobCompleted,
        "F" | "FAILED" => JobState::JobFailed,
        "CA" | "CANCELLED" => JobState::JobCancelled,
        "TO" | "TIMEOUT" => JobState::JobTimeout,
        "NF" | "NODE_FAIL" => JobState::JobNodeFail,
        "PR" | "PREEMPTED" => JobState::JobPreempted,
        "S" | "SUSPENDED" => JobState::JobSuspended,
        _ => return None,
    };
    Some(state)
}
34 changes: 30 additions & 4 deletions deploy/bare-metal/cluster_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,23 @@ wait_job() {
echo "(timeout after ${timeout}s)"
}

# Dump diagnostics for a failed test: the job's key scontrol fields plus a
# snapshot of cluster state and recent daemon logs.
#   $1 = test name (label for the output), $2 = job id.
# Every probe is best-effort (`|| true`, stderr suppressed) so diagnostics
# never abort the test run even if a daemon or tool is unavailable.
debug_fail() {
local name="$1" job_id="$2"
echo " DEBUG ${name} (job ${job_id}):"
# Show only the fields that matter for triage, indented under the header.
"${SPUR}/scontrol" show job "$job_id" 2>/dev/null \
| grep -E 'JobState|ExitCode|NodeList|Reason|Partition|Priority' \
| sed 's/^/ /' || true
echo " CLUSTER STATE:"
echo " Nodes:"
"${SPUR}/sinfo" 2>/dev/null | head -20 | sed 's/^/ /' || true
echo " Queue:"
# -t all: include completed/failed jobs, not just pending/running.
"${SPUR}/squeue" -t all 2>/dev/null | head -20 | sed 's/^/ /' || true
echo " Controller logs (last 15 lines):"
journalctl -u spurctld --no-pager -n 15 2>/dev/null | sed 's/^/ /' || true
echo " Agent logs (last 10 lines):"
journalctl -u spurd --no-pager -n 10 2>/dev/null | sed 's/^/ /' || true
}

job_state() {
local job_id="$1"
# squeue data: when whitespace-collapsed, fields are:
Expand Down Expand Up @@ -106,6 +123,17 @@ echo " Spur MI300X Cluster Integration Tests"
echo "============================================"
echo ""

# Record a baseline snapshot of the cluster before any test runs, so later
# failures can be compared against the starting state. Each probe is
# best-effort and prints a placeholder on failure instead of aborting.
echo "--- Cluster Baseline ---"
echo " Date: $(date -u '+%Y-%m-%d %H:%M:%S UTC')"
echo " Controller: ${SPUR_CONTROLLER_ADDR:-http://localhost:6817}"
echo " Nodes:"
"${SPUR}/sinfo" 2>/dev/null | sed 's/^/ /' || echo " (sinfo failed)"
echo " Queue:"
# -t all: include terminal-state jobs left over from previous runs.
"${SPUR}/squeue" -t all 2>/dev/null | sed 's/^/ /' || echo " (squeue failed)"
echo " Controller PID: $(pgrep -x spurctld 2>/dev/null || echo 'not found')"
echo " Agent PID: $(pgrep -x spurd 2>/dev/null || echo 'not found')"
echo ""
# --- Test 1: Cluster health ---
echo "--- Cluster Health ---"
run_test "sinfo returns output" ${SPUR}/sinfo
Expand Down Expand Up @@ -959,11 +987,9 @@ ctest_exit_code() {
"${SPUR}/scontrol" show job "$1" 2>/dev/null | grep -oP 'ExitCode=\K[0-9-]+'
}

# Dump debug info for a failed container job (delegates to general helper).
#   $1 = test name, $2 = job id.
ctest_debug() {
    debug_fail "$1" "$2"
}

if [ "$CTEST_READY" = "1" ]; then
Expand Down
Loading