Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 2 additions & 29 deletions crates/spurctld/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,15 @@ impl ClusterManager {

/// Cancel a job.
pub fn cancel_job(&self, job_id: JobId, _user: &str) -> anyhow::Result<()> {
let old_state = {
{
let jobs = self.jobs.read();
let job = jobs
.get(&job_id)
.ok_or_else(|| anyhow::anyhow!("job {} not found", job_id))?;
if job.state.is_terminal() {
anyhow::bail!("job {} is already {:?}", job_id, job.state);
}
job.state
};
}

// Use JobComplete (not JobStateChange) so that resource deallocation
// fires for any allocated nodes. For pending jobs, allocated_nodes is empty
Expand Down Expand Up @@ -782,20 +781,6 @@ impl ClusterManager {
debug!(job_id, step_id, "step created");
}

/// Mark a job step as finished and record its outcome.
///
/// Looks up the step keyed by `(job_id, step_id)` while holding the write
/// guard on `self.steps`. If no such step exists, nothing happens. Otherwise
/// the step's state becomes `StepState::Completed` when `exit_code` is zero
/// and `StepState::Failed` for any other value; the exit code and the current
/// UTC time are stored on the step, and a debug event is emitted.
pub fn complete_step(&self, job_id: JobId, step_id: u32, exit_code: i32) {
    // Keep the write guard alive for the whole update, including the log call.
    let mut steps = self.steps.write();
    let Some(step) = steps.get_mut(&(job_id, step_id)) else {
        return;
    };

    step.state = match exit_code {
        0 => StepState::Completed,
        _ => StepState::Failed,
    };
    step.exit_code = Some(exit_code);
    step.end_time = Some(Utc::now());
    debug!(job_id, step_id, exit_code, "step completed");
}

/// Get all steps for a job.
pub fn get_steps(&self, job_id: JobId) -> Vec<JobStep> {
self.steps
Expand Down Expand Up @@ -981,16 +966,6 @@ impl ClusterManager {
pending
}

/// Collect clones of every job whose spec matches both `name` and `user`
/// (used for singleton dependencies).
///
/// The read guard on `self.jobs` is held while scanning. Jobs are returned
/// in whatever order the underlying map yields its values; an empty vector
/// is returned when nothing matches.
pub fn get_jobs_by_name_user(&self, name: &str, user: &str) -> Vec<Job> {
    let jobs = self.jobs.read();
    let mut matching = Vec::new();
    for job in jobs.values() {
        // Both the job name and the owning user must match exactly.
        if job.spec.name == name && job.spec.user == user {
            matching.push(job.clone());
        }
    }
    matching
}

/// Create a new reservation.
pub fn create_reservation(&self, res: Reservation) -> anyhow::Result<()> {
let mut reservations = self.reservations.write();
Expand Down Expand Up @@ -1079,8 +1054,6 @@ impl ClusterManager {
cluster_state: &spur_sched::traits::ClusterState,
) {
use spur_core::job::PendingReason;
use spur_sched::backfill::BackfillScheduler;
use spur_sched::traits::Scheduler;

let mut jobs = self.jobs.write();

Expand Down
2 changes: 1 addition & 1 deletion crates/spurctld/src/scheduler_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ pub async fn send_cancel_to_agents(
#[cfg(test)]
mod tests {
use super::*;
use spur_core::job::{Job, JobSpec, JobState};
use spur_core::job::{Job, JobSpec};
use spur_core::resource::{GpuLinkType, GpuResource, ResourceSet};
use std::collections::HashMap;

Expand Down
200 changes: 122 additions & 78 deletions crates/spurctld/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use chrono::{DateTime, Utc};
use tokio::sync::Mutex;
use tonic::metadata::MetadataValue;
use tonic::{Request, Response, Status};
use tracing::warn;

use spur_core::reservation::Reservation;
use spur_proto::proto::slurm_controller_client::SlurmControllerClient;
Expand Down Expand Up @@ -116,14 +117,18 @@ impl SlurmController for ControllerService {
request: Request<SubmitJobRequest>,
) -> Result<Response<SubmitJobResponse>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.submit_job(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
Comment thread
shiv-tyagi marked this conversation as resolved.
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.submit_job(fwd).await;
}
Err(e) => {
warn!("failed to forward submit_job to leader: {e}");
return Err(status);
Comment thread
shiv-tyagi marked this conversation as resolved.
}
}
return Err(status);
}

let spec = request
Expand Down Expand Up @@ -209,14 +214,18 @@ impl SlurmController for ControllerService {

async fn cancel_job(&self, request: Request<CancelJobRequest>) -> Result<Response<()>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.cancel_job(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.cancel_job(fwd).await;
}
Err(e) => {
warn!("failed to forward cancel_job to leader: {e}");
return Err(status);
}
}
return Err(status);
}

let req = request.into_inner();
Expand All @@ -242,14 +251,18 @@ impl SlurmController for ControllerService {

async fn update_job(&self, request: Request<UpdateJobRequest>) -> Result<Response<()>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.update_job(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.update_job(fwd).await;
}
Err(e) => {
warn!("failed to forward update_job to leader: {e}");
return Err(status);
}
}
return Err(status);
}

let req = request.into_inner();
Expand Down Expand Up @@ -341,14 +354,18 @@ impl SlurmController for ControllerService {
request: Request<UpdateNodeRequest>,
) -> Result<Response<()>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.update_node(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.update_node(fwd).await;
}
Err(e) => {
warn!("failed to forward update_node to leader: {e}");
return Err(status);
}
}
return Err(status);
}

let req = request.into_inner();
Expand Down Expand Up @@ -408,14 +425,18 @@ impl SlurmController for ControllerService {
request: Request<RegisterAgentRequest>,
) -> Result<Response<RegisterAgentResponse>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.register_agent(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.register_agent(fwd).await;
}
Err(e) => {
warn!("failed to forward register_agent to leader: {e}");
return Err(status);
}
}
return Err(status);
}

// Extract the remote IP from the gRPC connection as fallback
Expand Down Expand Up @@ -476,14 +497,18 @@ impl SlurmController for ControllerService {
request: Request<ReportJobStatusRequest>,
) -> Result<Response<()>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.report_job_status(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.report_job_status(fwd).await;
}
Err(e) => {
warn!("failed to forward report_job_status to leader: {e}");
return Err(status);
}
}
return Err(status);
}

let req = request.into_inner();
Expand All @@ -504,15 +529,18 @@ impl SlurmController for ControllerService {
request: Request<HeartbeatRequest>,
) -> Result<Response<HeartbeatResponse>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.heartbeat(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.heartbeat(fwd).await;
}
Err(e) => {
warn!("failed to forward heartbeat to leader: {e}");
return Err(status);
}
}
#[allow(unreachable_code)]
return Err(status);
}

let req = request.into_inner();
Expand Down Expand Up @@ -563,14 +591,18 @@ impl SlurmController for ControllerService {
request: Request<CreateJobStepRequest>,
) -> Result<Response<CreateJobStepResponse>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.create_job_step(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.create_job_step(fwd).await;
}
Err(e) => {
warn!("failed to forward create_job_step to leader: {e}");
return Err(status);
}
}
return Err(status);
}

let req = request.into_inner();
Expand Down Expand Up @@ -619,14 +651,18 @@ impl SlurmController for ControllerService {
request: Request<CreateReservationRequest>,
) -> Result<Response<()>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.create_reservation(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.create_reservation(fwd).await;
}
Err(e) => {
warn!("failed to forward create_reservation to leader: {e}");
return Err(status);
}
}
return Err(status);
}

let req = request.into_inner();
Expand Down Expand Up @@ -663,14 +699,18 @@ impl SlurmController for ControllerService {
request: Request<UpdateReservationRequest>,
) -> Result<Response<()>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.update_reservation(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.update_reservation(fwd).await;
}
Err(e) => {
warn!("failed to forward update_reservation to leader: {e}");
return Err(status);
}
}
return Err(status);
}

let req = request.into_inner();
Expand All @@ -694,14 +734,18 @@ impl SlurmController for ControllerService {
request: Request<DeleteReservationRequest>,
) -> Result<Response<()>, Status> {
if let Err(status) = self.check_leader(&request) {
{
let proxy = &self.leader_proxy;
let mut client = proxy.get_leader_client().await?;
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.delete_reservation(fwd).await;
let proxy = &self.leader_proxy;
match proxy.get_leader_client().await {
Ok(mut client) => {
let mut fwd = Request::new(request.into_inner());
*fwd.metadata_mut() = Self::forwarded_metadata();
return client.delete_reservation(fwd).await;
}
Err(e) => {
warn!("failed to forward delete_reservation to leader: {e}");
return Err(status);
}
}
return Err(status);
}

let name = request.into_inner().name;
Expand Down
Loading