diff --git a/crates/spurctld/src/cluster.rs b/crates/spurctld/src/cluster.rs index af495c3..02769d8 100644 --- a/crates/spurctld/src/cluster.rs +++ b/crates/spurctld/src/cluster.rs @@ -201,7 +201,7 @@ impl ClusterManager { /// Cancel a job. pub fn cancel_job(&self, job_id: JobId, _user: &str) -> anyhow::Result<()> { - let old_state = { + { let jobs = self.jobs.read(); let job = jobs .get(&job_id) @@ -209,8 +209,7 @@ impl ClusterManager { if job.state.is_terminal() { anyhow::bail!("job {} is already {:?}", job_id, job.state); } - job.state - }; + } // Use JobComplete (not JobStateChange) so that resource deallocation // fires for any allocated nodes. For pending jobs, allocated_nodes is empty @@ -782,20 +781,6 @@ impl ClusterManager { debug!(job_id, step_id, "step created"); } - /// Complete a job step. - pub fn complete_step(&self, job_id: JobId, step_id: u32, exit_code: i32) { - if let Some(step) = self.steps.write().get_mut(&(job_id, step_id)) { - step.state = if exit_code == 0 { - StepState::Completed - } else { - StepState::Failed - }; - step.exit_code = Some(exit_code); - step.end_time = Some(Utc::now()); - debug!(job_id, step_id, exit_code, "step completed"); - } - } - /// Get all steps for a job. pub fn get_steps(&self, job_id: JobId) -> Vec { self.steps @@ -981,16 +966,6 @@ impl ClusterManager { pending } - /// Find jobs by name and user (for singleton dependency). - pub fn get_jobs_by_name_user(&self, name: &str, user: &str) -> Vec { - self.jobs - .read() - .values() - .filter(|j| j.spec.name == name && j.spec.user == user) - .cloned() - .collect() - } - /// Create a new reservation. pub fn create_reservation(&self, res: Reservation) -> anyhow::Result<()> { let mut reservations = self.reservations.write(); @@ -1079,8 +1054,6 @@ impl ClusterManager { cluster_state: &spur_sched::traits::ClusterState, ) { use spur_core::job::PendingReason; - use spur_sched::backfill::BackfillScheduler; - use spur_sched::traits::Scheduler; let mut jobs = self.jobs.write(); diff --git a/crates/spurctld/src/scheduler_loop.rs b/crates/spurctld/src/scheduler_loop.rs index ce77eef..8c8f11d 100644 --- a/crates/spurctld/src/scheduler_loop.rs +++ b/crates/spurctld/src/scheduler_loop.rs @@ -890,7 +890,7 @@ pub async fn send_cancel_to_agents( #[cfg(test)] mod tests { use super::*; - use spur_core::job::{Job, JobSpec, JobState}; + use spur_core::job::{Job, JobSpec}; use spur_core::resource::{GpuLinkType, GpuResource, ResourceSet}; use std::collections::HashMap; diff --git a/crates/spurctld/src/server.rs b/crates/spurctld/src/server.rs index 16a216a..3537cab 100644 --- a/crates/spurctld/src/server.rs +++ b/crates/spurctld/src/server.rs @@ -6,6 +6,7 @@ use chrono::{DateTime, Utc}; use tokio::sync::Mutex; use tonic::metadata::MetadataValue; use tonic::{Request, Response, Status}; +use tracing::warn; use spur_core::reservation::Reservation; use spur_proto::proto::slurm_controller_client::SlurmControllerClient; @@ -116,14 +117,18 @@ impl SlurmController for ControllerService { request: Request, ) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.submit_job(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.submit_job(fwd).await; + } + Err(e) => { + warn!("failed to forward submit_job to leader: {e}"); + return Err(status); + } } - return Err(status); } let spec = request @@ -209,14 +214,18 @@ impl SlurmController for ControllerService { async fn cancel_job(&self, request: Request) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.cancel_job(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.cancel_job(fwd).await; + } + Err(e) => { + warn!("failed to forward cancel_job to leader: {e}"); + return Err(status); + } } - return Err(status); } let req = request.into_inner(); @@ -242,14 +251,18 @@ impl SlurmController for ControllerService { async fn update_job(&self, request: Request) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.update_job(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.update_job(fwd).await; + } + Err(e) => { + warn!("failed to forward update_job to leader: {e}"); + return Err(status); + } } - return Err(status); } let req = request.into_inner(); @@ -341,14 +354,18 @@ impl SlurmController for ControllerService { request: Request, ) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.update_node(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.update_node(fwd).await; + } + Err(e) => { + warn!("failed to forward update_node to leader: {e}"); + return Err(status); + } } - return Err(status); } let req = request.into_inner(); @@ -408,14 +425,18 @@ impl SlurmController for ControllerService { request: Request, ) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.register_agent(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.register_agent(fwd).await; + } + Err(e) => { + warn!("failed to forward register_agent to leader: {e}"); + return Err(status); + } } - return Err(status); } // Extract the remote IP from the gRPC connection as fallback @@ -476,14 +497,18 @@ impl SlurmController for ControllerService { request: Request, ) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.report_job_status(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.report_job_status(fwd).await; + } + Err(e) => { + warn!("failed to forward report_job_status to leader: {e}"); + return Err(status); + } } - return Err(status); } let req = request.into_inner(); @@ -504,15 +529,18 @@ impl SlurmController for ControllerService { request: Request, ) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.heartbeat(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.heartbeat(fwd).await; + } + Err(e) => { + warn!("failed to forward heartbeat to leader: {e}"); + return Err(status); + } } - #[allow(unreachable_code)] - return Err(status); } let req = request.into_inner(); @@ -563,14 +591,18 @@ impl SlurmController for ControllerService { request: Request, ) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.create_job_step(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.create_job_step(fwd).await; + } + Err(e) => { + warn!("failed to forward create_job_step to leader: {e}"); + return Err(status); + } } - return Err(status); } let req = request.into_inner(); @@ -619,14 +651,18 @@ impl SlurmController for ControllerService { request: Request, ) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.create_reservation(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.create_reservation(fwd).await; + } + Err(e) => { + warn!("failed to forward create_reservation to leader: {e}"); + return Err(status); + } } - return Err(status); } let req = request.into_inner(); @@ -663,14 +699,18 @@ impl SlurmController for ControllerService { request: Request, ) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.update_reservation(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.update_reservation(fwd).await; + } + Err(e) => { + warn!("failed to forward update_reservation to leader: {e}"); + return Err(status); + } } - return Err(status); } let req = request.into_inner(); @@ -694,14 +734,18 @@ impl SlurmController for ControllerService { request: Request, ) -> Result, Status> { if let Err(status) = self.check_leader(&request) { - { - let proxy = &self.leader_proxy; - let mut client = proxy.get_leader_client().await?; - let mut fwd = Request::new(request.into_inner()); - *fwd.metadata_mut() = Self::forwarded_metadata(); - return client.delete_reservation(fwd).await; + let proxy = &self.leader_proxy; + match proxy.get_leader_client().await { + Ok(mut client) => { + let mut fwd = Request::new(request.into_inner()); + *fwd.metadata_mut() = Self::forwarded_metadata(); + return client.delete_reservation(fwd).await; + } + Err(e) => { + warn!("failed to forward delete_reservation to leader: {e}"); + return Err(status); + } } - return Err(status); } let name = request.into_inner().name;