diff --git a/crates/spurd/src/agent_server.rs b/crates/spurd/src/agent_server.rs index 7e64ed8..413a352 100644 --- a/crates/spurd/src/agent_server.rs +++ b/crates/spurd/src/agent_server.rs @@ -10,7 +10,7 @@ use tracing::{error, info, warn}; use tokio_stream::wrappers::ReceiverStream; -use spur_proto::proto::slurm_agent_server::{SlurmAgent, SlurmAgentServer}; +use spur_proto::proto::slurm_agent_server::SlurmAgent; use spur_proto::proto::*; use spur_sched::cons_tres::{AllocationResult, NodeAllocation}; @@ -116,6 +116,7 @@ impl AgentService { i32, crate::container::RootfsMode, Option, + Option, )> = Vec::new(); for (job_id, tracked) in jobs.iter_mut() { @@ -127,6 +128,7 @@ impl AgentService { exit_code, tracked.rootfs_mode.clone(), tracked.allocation.take(), + tracked.job.take_cgroup(), )); } Ok(None) => {} @@ -136,9 +138,12 @@ impl AgentService { } } - for (job_id, _exit_code, mode, alloc) in &completed { + for (job_id, _exit_code, mode, alloc, cgroup) in &completed { jobs.remove(job_id); crate::container::cleanup_rootfs(*job_id, mode); + if let Some(cgroup) = cgroup { + crate::executor::cleanup_cgroup(cgroup); + } // Release GPU/CPU allocation if let Some(alloc) = alloc { allocation.lock().await.release(alloc); @@ -156,7 +161,7 @@ impl AgentService { // Invoke SPANK TaskExit and JobEpilog hooks for completed jobs if let Some(ref spank_host) = *spank { - for (job_id, _exit_code, _mode, _alloc) in &completed { + for (job_id, _exit_code, _mode, _alloc, _cgroup) in &completed { if let Err(e) = spank_host.invoke_hook(SpankHook::TaskExit) { warn!(job_id, error = %e, "SPANK TaskExit hook failed"); } @@ -166,7 +171,7 @@ impl AgentService { } } - for (job_id, exit_code, _mode, _alloc) in &completed { + for (job_id, exit_code, _mode, _alloc, _cgroup) in &completed { report_completion(&controller_addr, *job_id, *exit_code).await; } } @@ -781,7 +786,6 @@ impl SlurmAgent for AgentService { // Drop privilege if requested (and we're root). Mirrors the privilege // drop in launch_job's non-namespace path. if req.uid > 0 && nix::unistd::geteuid().is_root() { - use std::os::unix::process::CommandExt; let target_uid = req.uid; let target_gid = req.gid; unsafe { @@ -1120,11 +1124,6 @@ impl AgentService { } } -pub fn create_server(reporter: Arc) -> SlurmAgentServer { - let service = AgentService::new(reporter); - SlurmAgentServer::new(service) -} - #[cfg(test)] impl TrackedJob { fn dummy(_pid: u32) -> Self { @@ -1134,7 +1133,6 @@ impl TrackedJob { .expect("failed to spawn dummy process"); Self { job: executor::RunningJob::Managed { - job_id: 0, child, cgroup_path: None, }, diff --git a/crates/spurd/src/container.rs b/crates/spurd/src/container.rs index f5d59ec..51c6b1a 100644 --- a/crates/spurd/src/container.rs +++ b/crates/spurd/src/container.rs @@ -1228,17 +1228,6 @@ fn sanitize_name(name: &str) -> String { .replace(':', "+") } -/// Check if a binary is on PATH. -fn which(name: &str) -> bool { - std::process::Command::new("which") - .arg(name) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .status() - .map(|s| s.success()) - .unwrap_or(false) -} - #[cfg(test)] mod tests { use super::*; @@ -1759,18 +1748,6 @@ mod tests { cleanup_rootfs(999998, &RootfsMode::Overlay); } - // --- Which --- - - #[test] - fn test_which_finds_bash() { - assert!(which("bash")); - } - - #[test] - fn test_which_not_found() { - assert!(!which("nonexistent-binary-that-doesnt-exist-xyz")); - } - // --- run_hooks --- #[test] diff --git a/crates/spurd/src/executor.rs b/crates/spurd/src/executor.rs index b8a8c81..abfecfb 100644 --- a/crates/spurd/src/executor.rs +++ b/crates/spurd/src/executor.rs @@ -4,67 +4,21 @@ use std::os::fd::{FromRawFd, OwnedFd, RawFd}; use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; use std::process::Stdio; -use std::sync::Arc; use anyhow::{bail, Context}; use nix::sys::signal::{self, Signal}; use nix::unistd::Pid; use tokio::process::Command; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; use spur_core::job::JobId; -use spur_proto::proto::slurm_controller_client::SlurmControllerClient; use spur_spank::SpankHost; use crate::container::ContainerConfig; -use crate::reporter::NodeReporter; /// Cgroup root for slurmd-managed jobs. const CGROUP_ROOT: &str = "/sys/fs/cgroup/spur"; -/// Job execution loop: polls controller for assigned jobs and runs them. -pub async fn job_execution_loop(reporter: Arc) { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); - let mut running: HashMap = HashMap::new(); - - loop { - interval.tick().await; - - // Check status of running jobs - let mut completed = Vec::new(); - for (job_id, rj) in running.iter_mut() { - if let Ok(Some(status)) = rj.try_wait() { - info!(job_id, exit_code = status, "job completed"); - completed.push((*job_id, status)); - } - } - - // Report completions to controller - for (job_id, exit_code) in &completed { - running.remove(job_id); - if let Err(e) = report_completion(&reporter.controller_addr, *job_id, *exit_code).await - { - warn!(job_id, error = %e, "failed to report job completion"); - } - } - - // Poll for new jobs (via GetJobs filtered to RUNNING state for our node) - // In a full implementation, the controller pushes jobs via streaming. - // For now, we rely on the scheduler_loop in slurmctld to track assignments. - } -} - -/// Report job completion back to the controller. -async fn report_completion( - controller_addr: &str, - job_id: JobId, - exit_code: i32, -) -> anyhow::Result<()> { - // Stub: completion is reported via ReportJobStatus RPC from the agent server. - debug!(job_id, exit_code, "would report completion to controller"); - Ok(()) -} - pub struct ContainerLaunchConfig { pub config: ContainerConfig, pub rootfs: PathBuf, @@ -74,13 +28,11 @@ pub struct ContainerLaunchConfig { pub enum RunningJob { /// Non-container jobs managed by tokio::process::Child. Managed { - job_id: JobId, child: tokio::process::Child, cgroup_path: Option, }, /// Container jobs: raw fork with optional pidfd for PID-recycling safety. Forked { - job_id: JobId, pid: i32, /// Holds a kernel reference preventing PID recycling. None on kernels < 5.3. _pidfd: Option, @@ -105,13 +57,6 @@ impl RunningJob { } } - pub fn job_id(&self) -> JobId { - match self { - RunningJob::Managed { job_id, .. } => *job_id, - RunningJob::Forked { job_id, .. } => *job_id, - } - } - /// Non-blocking check for process exit. Returns exit code if done. pub fn try_wait(&mut self) -> anyhow::Result> { match self { @@ -423,7 +368,6 @@ pub async fn launch_job( // unshare(2) to fail with EPERM since the unprivileged user lacks // CAP_SYS_ADMIN. if uid > 0 && nix::unistd::geteuid().is_root() && !use_namespaces { - use std::os::unix::process::CommandExt; let target_uid = uid; let target_gid = gid; unsafe { @@ -454,7 +398,6 @@ pub async fn launch_job( .map(|v| v == "1" || v == "true") .unwrap_or(false); if enable_seccomp { - use std::os::unix::process::CommandExt; unsafe { cmd.pre_exec(|| { if let Err(e) = crate::seccomp::apply_seccomp_filter() { @@ -471,7 +414,6 @@ pub async fn launch_job( .map(|v| v == "1" || v == "true") .unwrap_or(false); if enable_landlock { - use std::os::unix::process::CommandExt; unsafe { cmd.pre_exec(move || { if let Err(e) = crate::landlock::apply_landlock_rules(&work_dir_for_landlock) { @@ -498,11 +440,7 @@ pub async fn launch_job( "job process spawned" ); - Ok(RunningJob::Managed { - job_id, - child, - cgroup_path, - }) + Ok(RunningJob::Managed { child, cgroup_path }) } /// Set up a cgroups v2 hierarchy for a job. @@ -596,7 +534,7 @@ fn move_to_cgroup(cgroup_path: &Path, pid: u32) -> bool { } /// Clean up a job's cgroup. -fn cleanup_cgroup(cgroup_path: &Path) { +pub fn cleanup_cgroup(cgroup_path: &Path) { // Kill any remaining processes if let Ok(pids) = std::fs::read_to_string(cgroup_path.join("cgroup.procs")) { for pid_str in pids.lines() { @@ -608,15 +546,10 @@ fn cleanup_cgroup(cgroup_path: &Path) { // Remove cgroup directory if let Err(e) = std::fs::remove_dir(cgroup_path) { - debug!(error = %e, path = %cgroup_path.display(), "failed to remove cgroup"); + warn!(error = %e, path = %cgroup_path.display(), "failed to remove cgroup"); } } -/// Send a signal to a running job. -pub fn signal_job(job: &RunningJob, sig: Signal) -> anyhow::Result<()> { - job.kill_signal(sig) -} - /// Recursively signal a process and all its descendants (children first). fn kill_process_tree(pid: i32, sig: Signal) { let children = get_child_pids(pid); @@ -666,17 +599,6 @@ async fn run_hook( Ok(()) } -/// Run epilog after job completion (best-effort). -pub async fn run_epilog(job_id: JobId, work_dir: &str) { - if let Ok(epilog) = std::env::var("SPUR_EPILOG") { - if !epilog.is_empty() { - if let Err(e) = run_hook(&epilog, job_id, work_dir, "epilog").await { - warn!(job_id, error = %e, "epilog failed"); - } - } - } -} - /// Resolve output path patterns (%j → job_id, etc.) fn resolve_output_path(pattern: &str, job_id: JobId, work_dir: &str) -> String { let resolved = if pattern.is_empty() { @@ -716,7 +638,7 @@ async fn launch_container_job( cpus: u32, memory_mb: u64, cpu_ids: &[u32], - work_dir: &str, + _work_dir: &str, ) -> anyhow::Result { let cgroup_path = setup_cgroup(job_id, cpus, memory_mb, cpu_ids)?; @@ -883,7 +805,6 @@ async fn launch_container_job( ); Ok(RunningJob::Forked { - job_id, pid: child_pid, _pidfd: pidfd, cgroup_path, diff --git a/crates/spurd/src/gpu.rs b/crates/spurd/src/gpu.rs index 72bc090..a84b21e 100644 --- a/crates/spurd/src/gpu.rs +++ b/crates/spurd/src/gpu.rs @@ -1,6 +1,6 @@ use spur_core::resource::{GpuLinkType, GpuResource}; use std::path::Path; -use tracing::{debug, warn}; +use tracing::debug; /// Discover GPUs from sysfs (works for both AMD and NVIDIA). pub fn discover_gpus() -> Vec { @@ -158,7 +158,7 @@ fn read_amd_vram_mb(device_path: &Path) -> u64 { 0 } -fn detect_amd_topology(device_path: &Path, device_id: u32) -> (Vec, GpuLinkType) { +fn detect_amd_topology(device_path: &Path, _device_id: u32) -> (Vec, GpuLinkType) { // Check for xGMI links via /sys/class/drm/cardN/device/amdgpu_xgmi_* let xgmi_hive = device_path.join("xgmi_hive_info"); if xgmi_hive.exists() { diff --git a/crates/spurd/src/landlock.rs b/crates/spurd/src/landlock.rs index d28382d..de3ec81 100644 --- a/crates/spurd/src/landlock.rs +++ b/crates/spurd/src/landlock.rs @@ -12,7 +12,7 @@ //! restriction for those. use std::path::Path; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; // Landlock ABI constants (from linux/landlock.h) const LANDLOCK_CREATE_RULESET: i64 = 444; diff --git a/crates/spurd/src/reporter.rs b/crates/spurd/src/reporter.rs index 3bdbc69..2046a13 100644 --- a/crates/spurd/src/reporter.rs +++ b/crates/spurd/src/reporter.rs @@ -1,8 +1,7 @@ -use std::path::Path; use std::sync::atomic::{AtomicU64, Ordering}; use anyhow::Context; -use spur_core::resource::{GpuLinkType, GpuResource, ResourceSet}; +use spur_core::resource::{GpuLinkType, ResourceSet}; use spur_proto::proto::slurm_controller_client::SlurmControllerClient; use spur_proto::proto::{RegisterAgentRequest, ResourceSet as ProtoResourceSet}; use tracing::{debug, info, warn};