20 changes: 9 additions & 11 deletions crates/spurd/src/agent_server.rs
@@ -10,7 +10,7 @@ use tracing::{error, info, warn};

use tokio_stream::wrappers::ReceiverStream;

use spur_proto::proto::slurm_agent_server::{SlurmAgent, SlurmAgentServer};
use spur_proto::proto::slurm_agent_server::SlurmAgent;
use spur_proto::proto::*;

use spur_sched::cons_tres::{AllocationResult, NodeAllocation};
@@ -116,6 +116,7 @@ impl AgentService {
i32,
crate::container::RootfsMode,
Option<AllocationResult>,
Option<std::path::PathBuf>,
)> = Vec::new();

for (job_id, tracked) in jobs.iter_mut() {
@@ -127,6 +128,7 @@ impl AgentService {
exit_code,
tracked.rootfs_mode.clone(),
tracked.allocation.take(),
tracked.job.take_cgroup(),
));
}
Ok(None) => {}
@@ -136,9 +138,12 @@ impl AgentService {
}
}

for (job_id, _exit_code, mode, alloc) in &completed {
for (job_id, _exit_code, mode, alloc, cgroup) in &completed {
jobs.remove(job_id);
crate::container::cleanup_rootfs(*job_id, mode);
if let Some(cgroup) = cgroup {
crate::executor::cleanup_cgroup(cgroup);
}
// Release GPU/CPU allocation
if let Some(alloc) = alloc {
allocation.lock().await.release(alloc);
@@ -156,7 +161,7 @@ impl AgentService {

// Invoke SPANK TaskExit and JobEpilog hooks for completed jobs
if let Some(ref spank_host) = *spank {
for (job_id, _exit_code, _mode, _alloc) in &completed {
for (job_id, _exit_code, _mode, _alloc, _cgroup) in &completed {
if let Err(e) = spank_host.invoke_hook(SpankHook::TaskExit) {
warn!(job_id, error = %e, "SPANK TaskExit hook failed");
}
@@ -166,7 +171,7 @@ impl AgentService {
}
}

for (job_id, exit_code, _mode, _alloc) in &completed {
for (job_id, exit_code, _mode, _alloc, _cgroup) in &completed {
report_completion(&controller_addr, *job_id, *exit_code).await;
}
}
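Each completed job's cgroup path now travels with the completion tuple via `tracked.job.take_cgroup()`, so teardown happens exactly once, outside the per-job polling. The accessor's body is not in this diff; a plausible sketch, assuming both `RunningJob` variants keep `cgroup_path: Option<PathBuf>`:

```rust
use std::path::PathBuf;

impl RunningJob {
    /// Moves the cgroup path out of the job; a second call returns None,
    /// so cleanup_cgroup can never run twice for the same job.
    pub fn take_cgroup(&mut self) -> Option<PathBuf> {
        match self {
            RunningJob::Managed { cgroup_path, .. } => cgroup_path.take(),
            RunningJob::Forked { cgroup_path, .. } => cgroup_path.take(),
        }
    }
}
```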
@@ -781,7 +786,6 @@ impl SlurmAgent for AgentService {
// Drop privilege if requested (and we're root). Mirrors the privilege
// drop in launch_job's non-namespace path.
if req.uid > 0 && nix::unistd::geteuid().is_root() {
use std::os::unix::process::CommandExt;
let target_uid = req.uid;
let target_gid = req.gid;
unsafe {
@@ -1120,11 +1124,6 @@ impl AgentService {
}
}

pub fn create_server(reporter: Arc<NodeReporter>) -> SlurmAgentServer<AgentService> {
let service = AgentService::new(reporter);
SlurmAgentServer::new(service)
}

#[cfg(test)]
impl TrackedJob {
fn dummy(_pid: u32) -> Self {
@@ -1134,7 +1133,6 @@ impl TrackedJob {
.expect("failed to spawn dummy process");
Self {
job: executor::RunningJob::Managed {
job_id: 0,
child,
cgroup_path: None,
},
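For context on the `report_completion` calls above: the stub deleted from executor.rs below notes that completion travels over the ReportJobStatus RPC. A hedged sketch of that call; the request type, its fields, and the job-id type are assumptions about the proto schema, not this PR's code:

```rust
use spur_proto::proto::slurm_controller_client::SlurmControllerClient;
use tracing::warn;

/// Hedged sketch of the completion report awaited above. The RPC name
/// comes from the removed stub's comment ("ReportJobStatus"); the
/// request shape is assumed.
async fn report_completion_sketch(addr: &str, job_id: i64, exit_code: i32) {
    match SlurmControllerClient::connect(addr.to_string()).await {
        Ok(mut client) => {
            let req = spur_proto::proto::ReportJobStatusRequest {
                job_id,
                exit_code,
                ..Default::default() // prost messages derive Default
            };
            if let Err(e) = client.report_job_status(req).await {
                warn!(job_id, error = %e, "failed to report completion");
            }
        }
        Err(e) => warn!(error = %e, "controller unreachable"),
    }
}
```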
23 changes: 0 additions & 23 deletions crates/spurd/src/container.rs
@@ -1228,17 +1228,6 @@ fn sanitize_name(name: &str) -> String {
.replace(':', "+")
}

/// Check if a binary is on PATH.
fn which(name: &str) -> bool {
std::process::Command::new("which")
.arg(name)
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false)
}

#[cfg(test)]
mod tests {
use super::*;
@@ -1759,18 +1748,6 @@ mod tests {
cleanup_rootfs(999998, &RootfsMode::Overlay);
}

// --- Which ---

#[test]
fn test_which_finds_bash() {
assert!(which("bash"));
}

#[test]
fn test_which_not_found() {
assert!(!which("nonexistent-binary-that-doesnt-exist-xyz"));
}

// --- run_hooks ---

#[test]
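The deleted `which` helper and its tests shelled out to `which(1)`. If a PATH probe is ever needed again, a spawn-free sketch (ordinary Unix PATH semantics assumed; unlike `which(1)`, this does not check the executable bit):

```rust
/// True if `name` exists as a file in some $PATH directory.
fn on_path(name: &str) -> bool {
    std::env::var_os("PATH")
        .map(|paths| std::env::split_paths(&paths).any(|dir| dir.join(name).is_file()))
        .unwrap_or(false)
}
```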
89 changes: 5 additions & 84 deletions crates/spurd/src/executor.rs
@@ -4,67 +4,21 @@ use std::os::fd::{FromRawFd, OwnedFd, RawFd};
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;

use anyhow::{bail, Context};
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
use tokio::process::Command;
use tracing::{debug, error, info, warn};
use tracing::{debug, info, warn};

use spur_core::job::JobId;
use spur_proto::proto::slurm_controller_client::SlurmControllerClient;
use spur_spank::SpankHost;

use crate::container::ContainerConfig;
use crate::reporter::NodeReporter;

/// Cgroup root for slurmd-managed jobs.
const CGROUP_ROOT: &str = "/sys/fs/cgroup/spur";

/// Job execution loop: polls controller for assigned jobs and runs them.
pub async fn job_execution_loop(reporter: Arc<NodeReporter>) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
let mut running: HashMap<JobId, RunningJob> = HashMap::new();

loop {
interval.tick().await;

// Check status of running jobs
let mut completed = Vec::new();
for (job_id, rj) in running.iter_mut() {
if let Ok(Some(status)) = rj.try_wait() {
info!(job_id, exit_code = status, "job completed");
completed.push((*job_id, status));
}
}

// Report completions to controller
for (job_id, exit_code) in &completed {
running.remove(job_id);
if let Err(e) = report_completion(&reporter.controller_addr, *job_id, *exit_code).await
{
warn!(job_id, error = %e, "failed to report job completion");
}
}

// Poll for new jobs (via GetJobs filtered to RUNNING state for our node)
// In a full implementation, the controller pushes jobs via streaming.
// For now, we rely on the scheduler_loop in slurmctld to track assignments.
}
}

/// Report job completion back to the controller.
async fn report_completion(
controller_addr: &str,
job_id: JobId,
exit_code: i32,
) -> anyhow::Result<()> {
// Stub: completion is reported via ReportJobStatus RPC from the agent server.
debug!(job_id, exit_code, "would report completion to controller");
Ok(())
}

pub struct ContainerLaunchConfig {
pub config: ContainerConfig,
pub rootfs: PathBuf,
@@ -74,13 +28,11 @@ pub struct ContainerLaunchConfig {
pub enum RunningJob {
/// Non-container jobs managed by tokio::process::Child.
Managed {
job_id: JobId,
child: tokio::process::Child,
cgroup_path: Option<PathBuf>,
},
/// Container jobs: raw fork with optional pidfd for PID-recycling safety.
Forked {
job_id: JobId,
pid: i32,
/// Holds a kernel reference preventing PID recycling. None on kernels < 5.3.
_pidfd: Option<OwnedFd>,
Expand All @@ -105,13 +57,6 @@ impl RunningJob {
}
}

pub fn job_id(&self) -> JobId {
match self {
RunningJob::Managed { job_id, .. } => *job_id,
RunningJob::Forked { job_id, .. } => *job_id,
}
}

/// Non-blocking check for process exit. Returns exit code if done.
pub fn try_wait(&mut self) -> anyhow::Result<Option<i32>> {
match self {
@@ -423,7 +368,6 @@ pub async fn launch_job(
// unshare(2) to fail with EPERM since the unprivileged user lacks
// CAP_SYS_ADMIN.
if uid > 0 && nix::unistd::geteuid().is_root() && !use_namespaces {
use std::os::unix::process::CommandExt;
let target_uid = uid;
let target_gid = gid;
unsafe {
Expand Down Expand Up @@ -454,7 +398,6 @@ pub async fn launch_job(
.map(|v| v == "1" || v == "true")
.unwrap_or(false);
if enable_seccomp {
use std::os::unix::process::CommandExt;
unsafe {
cmd.pre_exec(|| {
if let Err(e) = crate::seccomp::apply_seccomp_filter() {
@@ -471,7 +414,6 @@ pub async fn launch_job(
.map(|v| v == "1" || v == "true")
.unwrap_or(false);
if enable_landlock {
use std::os::unix::process::CommandExt;
unsafe {
cmd.pre_exec(move || {
if let Err(e) = crate::landlock::apply_landlock_rules(&work_dir_for_landlock) {
@@ -498,11 +440,7 @@ pub async fn launch_job(
"job process spawned"
);

Ok(RunningJob::Managed {
job_id,
child,
cgroup_path,
})
Ok(RunningJob::Managed { child, cgroup_path })
}
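The privilege-drop, seccomp, and Landlock paths in `launch_job` all hook `pre_exec`, which runs in the child between fork and exec. A minimal sketch of the drop itself, shown on `std::process::Command` (where `pre_exec` comes from `CommandExt`):

```rust
use std::os::unix::process::CommandExt;
use std::process::Command;

/// Registers a pre_exec hook that drops root to uid/gid in the child.
/// Order matters: setgid(2) must come before setuid(2), because after
/// setuid a non-root child can no longer change its gid.
fn drop_privileges(cmd: &mut Command, uid: libc::uid_t, gid: libc::gid_t) {
    // SAFETY: the closure only calls async-signal-safe libc functions.
    unsafe {
        cmd.pre_exec(move || {
            if libc::setgid(gid) != 0 || libc::setuid(uid) != 0 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
}
```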

/// Set up a cgroups v2 hierarchy for a job.
Expand Down Expand Up @@ -596,7 +534,7 @@ fn move_to_cgroup(cgroup_path: &Path, pid: u32) -> bool {
}
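`setup_cgroup` and most of `move_to_cgroup` are folded above; for orientation, a hedged sketch of a cgroups v2 job hierarchy of this shape — the control-file names are the kernel's, but the directory layout and limits are assumptions, not this crate's exact code:

```rust
use std::path::{Path, PathBuf};

/// Hedged sketch of a per-job cgroups v2 directory with CPU and memory caps.
fn sketch_setup_cgroup(job_id: u64, cpus: u32, memory_mb: u64) -> std::io::Result<PathBuf> {
    let path = PathBuf::from(format!("/sys/fs/cgroup/spur/job_{job_id}")); // layout assumed
    std::fs::create_dir_all(&path)?;
    // cpu.max is "<quota> <period>" in µs; quota = cpus * period caps the
    // job at `cpus` full CPUs.
    std::fs::write(path.join("cpu.max"), format!("{} 100000", cpus as u64 * 100_000))?;
    std::fs::write(path.join("memory.max"), (memory_mb * 1024 * 1024).to_string())?;
    Ok(path)
}

/// Joining is a single write: the kernel moves `pid` into the cgroup when
/// it is written to cgroup.procs (what move_to_cgroup does).
fn sketch_join(path: &Path, pid: u32) -> std::io::Result<()> {
    std::fs::write(path.join("cgroup.procs"), pid.to_string())
}
```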

/// Clean up a job's cgroup.
fn cleanup_cgroup(cgroup_path: &Path) {
pub fn cleanup_cgroup(cgroup_path: &Path) {
// Kill any remaining processes
if let Ok(pids) = std::fs::read_to_string(cgroup_path.join("cgroup.procs")) {
for pid_str in pids.lines() {
@@ -608,15 +546,10 @@ fn cleanup_cgroup(cgroup_path: &Path) {

// Remove cgroup directory
if let Err(e) = std::fs::remove_dir(cgroup_path) {
debug!(error = %e, path = %cgroup_path.display(), "failed to remove cgroup");
warn!(error = %e, path = %cgroup_path.display(), "failed to remove cgroup");
}
}

/// Send a signal to a running job.
pub fn signal_job(job: &RunningJob, sig: Signal) -> anyhow::Result<()> {
job.kill_signal(sig)
}

/// Recursively signal a process and all its descendants (children first).
fn kill_process_tree(pid: i32, sig: Signal) {
let children = get_child_pids(pid);
@@ -666,17 +599,6 @@ async fn run_hook(
Ok(())
}
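`get_child_pids`, used by `kill_process_tree` above, is folded out of the diff. One common implementation reads the kernel's per-task children list (requires CONFIG_PROC_CHILDREN, enabled on mainstream kernels); a sketch:

```rust
/// Direct children of `pid`, read from /proc/<pid>/task/<pid>/children.
/// Returns an empty Vec if the process is gone or the file is absent.
fn get_child_pids_sketch(pid: i32) -> Vec<i32> {
    std::fs::read_to_string(format!("/proc/{pid}/task/{pid}/children"))
        .map(|s| s.split_whitespace().filter_map(|t| t.parse().ok()).collect())
        .unwrap_or_default()
}
```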

/// Run epilog after job completion (best-effort).
pub async fn run_epilog(job_id: JobId, work_dir: &str) {
if let Ok(epilog) = std::env::var("SPUR_EPILOG") {
if !epilog.is_empty() {
if let Err(e) = run_hook(&epilog, job_id, work_dir, "epilog").await {
warn!(job_id, error = %e, "epilog failed");
}
}
}
}

/// Resolve output path patterns (%j → job_id, etc.)
fn resolve_output_path(pattern: &str, job_id: JobId, work_dir: &str) -> String {
let resolved = if pattern.is_empty() {
@@ -716,7 +638,7 @@ async fn launch_container_job(
cpus: u32,
memory_mb: u64,
cpu_ids: &[u32],
work_dir: &str,
_work_dir: &str,
) -> anyhow::Result<RunningJob> {
let cgroup_path = setup_cgroup(job_id, cpus, memory_mb, cpu_ids)?;

@@ -883,7 +805,6 @@ async fn launch_container_job(
);

Ok(RunningJob::Forked {
job_id,
pid: child_pid,
_pidfd: pidfd,
cgroup_path,
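`resolve_output_path`'s body is largely folded above; per its doc comment, %j expands to the job id. A minimal sketch of that substitution — the empty-pattern default shown here is an assumption modeled on Slurm's `slurm-<jobid>.out` convention, and the job-id type is assumed:

```rust
/// Sketch of %-token expansion; only %j is handled here.
fn resolve_output_path_sketch(pattern: &str, job_id: u64, work_dir: &str) -> String {
    if pattern.is_empty() {
        return format!("{work_dir}/slurm-{job_id}.out"); // assumed default
    }
    pattern.replace("%j", &job_id.to_string())
}
```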
4 changes: 2 additions & 2 deletions crates/spurd/src/gpu.rs
@@ -1,6 +1,6 @@
use spur_core::resource::{GpuLinkType, GpuResource};
use std::path::Path;
use tracing::{debug, warn};
use tracing::debug;

/// Discover GPUs from sysfs (works for both AMD and NVIDIA).
pub fn discover_gpus() -> Vec<GpuResource> {
@@ -158,7 +158,7 @@ fn read_amd_vram_mb(device_path: &Path) -> u64 {
0
}

fn detect_amd_topology(device_path: &Path, device_id: u32) -> (Vec<u32>, GpuLinkType) {
fn detect_amd_topology(device_path: &Path, _device_id: u32) -> (Vec<u32>, GpuLinkType) {
// Check for xGMI links via /sys/class/drm/cardN/device/amdgpu_xgmi_*
let xgmi_hive = device_path.join("xgmi_hive_info");
if xgmi_hive.exists() {
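`discover_gpus` walks sysfs for both vendors, but the walk itself is folded out of the diff. A sketch of the usual /sys/class/drm scan — the PCI vendor IDs are standard (AMD 0x1002, NVIDIA 0x10de); the name filtering is an assumption about this crate's approach:

```rust
use std::path::PathBuf;

/// Sketch: every /sys/class/drm/cardN entry exposes device/vendor,
/// whose PCI vendor ID distinguishes AMD (0x1002) from NVIDIA (0x10de).
fn scan_drm_cards() -> Vec<(u32, PathBuf, &'static str)> {
    let mut cards = Vec::new();
    let Ok(entries) = std::fs::read_dir("/sys/class/drm") else {
        return cards;
    };
    for entry in entries.flatten() {
        let name = entry.file_name().to_string_lossy().into_owned();
        // Keep bare cardN; skip render nodes and connectors like card0-DP-1.
        let Some(index) = name.strip_prefix("card").and_then(|n| n.parse::<u32>().ok()) else {
            continue;
        };
        let device = entry.path().join("device");
        let vendor = std::fs::read_to_string(device.join("vendor")).unwrap_or_default();
        let kind = match vendor.trim() {
            "0x1002" => "amd",
            "0x10de" => "nvidia",
            _ => continue,
        };
        cards.push((index, device, kind));
    }
    cards
}
```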
2 changes: 1 addition & 1 deletion crates/spurd/src/landlock.rs
@@ -12,7 +12,7 @@
//! restriction for those.

use std::path::Path;
use tracing::{debug, info, warn};
use tracing::{debug, info};

// Landlock ABI constants (from linux/landlock.h)
const LANDLOCK_CREATE_RULESET: i64 = 444;
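`LANDLOCK_CREATE_RULESET` above is the landlock_create_ruleset(2) syscall number. A sketch of the standard feature probe built on it — a null attr plus the VERSION flag makes the syscall return the kernel's highest supported Landlock ABI instead of creating a ruleset:

```rust
const LANDLOCK_CREATE_RULESET_VERSION: u32 = 1;

/// None means Landlock is unavailable (ENOSYS or disabled at boot).
fn landlock_abi_version() -> Option<i64> {
    let ret = unsafe {
        libc::syscall(
            LANDLOCK_CREATE_RULESET, // 444, defined above
            std::ptr::null::<libc::c_void>(),
            0usize,
            LANDLOCK_CREATE_RULESET_VERSION,
        )
    };
    if ret < 0 { None } else { Some(ret) }
}
```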
3 changes: 1 addition & 2 deletions crates/spurd/src/reporter.rs
@@ -1,8 +1,7 @@
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};

use anyhow::Context;
use spur_core::resource::{GpuLinkType, GpuResource, ResourceSet};
use spur_core::resource::{GpuLinkType, ResourceSet};
use spur_proto::proto::slurm_controller_client::SlurmControllerClient;
use spur_proto::proto::{RegisterAgentRequest, ResourceSet as ProtoResourceSet};
use tracing::{debug, info, warn};