Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions snakemake_executor_plugin_slurm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,32 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": False,
},
)
status_command: Optional[str] = field(
default="sacct -X --parsable2 \
--clusters all \
--noheader --format=JobIdRaw,State \
--starttime {sacct_starttime} \
--endtime now --name {run_uuid}",
metadata={
"help": "The command to query the status of SLURM jobs. "
"This command should return one line for each job with "
"<raw/main_job_id>|<long_status_string>."
"If no accounting is enabled, an alternative is:",
# "squeue --states=all --format=%i|%T --noheader --name {run_uuid}",
"env_var": False,
"required": False,
},
)
cancel_command: Optional[str] = field(
default="scancel {jobids} --clusters=all",
metadata={
"help": "The command to cancel SLURM jobs. "
"This command should include the job IDs to cancel."
"In a federation the addiational argument --clusters=all might be used.",
"env_var": False,
"required": False,
},
)


# Required:
Expand Down Expand Up @@ -458,12 +484,10 @@ async def check_active_jobs(
# the more readable version ought to be re-adapted

# -X: only show main job, no substeps
sacct_command = f"""sacct -X --parsable2 \
--clusters all \
--noheader --format=JobIdRaw,State \
--starttime {sacct_starttime} \
--endtime now --name {self.run_uuid}"""

sacct_command = self.workflow.executor_settings.status_command.format(
sacct_starttime=sacct_starttime, run_uuid=self.run_uuid
)

# for better redability in verbose output
sacct_command = " ".join(shlex.split(sacct_command))

Expand Down Expand Up @@ -590,7 +614,7 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
# about 30 sec, but can be longer in extreme cases.
# Under 'normal' circumstances, 'scancel' is executed in
# virtually no time.
scancel_command = f"scancel {jobids} --clusters=all"
scancel_command = self.workflow.executor_settings.cancel_command.format(jobids=jobids)

subprocess.check_output(
scancel_command,
Expand Down
Loading