Commit 86bfca3

nodewatcher - slurm: integrate new get_pending_jobs_info function
This adds some logic to correctly filter out jobs that cannot be executed due to cluster limits.

Signed-off-by: Francesco De Martino <[email protected]>

1 parent bfd5763
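The idea behind the change is that a pending job should only count as "pending" for scaling purposes if the cluster could actually run it. The following is an illustrative sketch of that filtering rule only, not the actual get_pending_jobs_info implementation (which is not part of this diff); the SlurmJob shape mirrors the fields used by the new test below, while the helper name is_schedulable and the thresholds are assumptions for this example.

# Illustrative sketch only -- not the real get_pending_jobs_info implementation.
from collections import namedtuple

SlurmJob = namedtuple("SlurmJob", ["id", "state", "nodes", "cpus_total", "cpus_min_per_node", "pending_reason"])


def is_schedulable(job, node_slots, max_cluster_size, pending_reasons):
    """A pending job only justifies capacity if the cluster could actually run it."""
    return (
        job.pending_reason in pending_reasons  # the job is waiting on resources
        and job.cpus_min_per_node <= node_slots  # its per-node CPU need fits on one compute node
        and job.nodes <= max_cluster_size  # its node count fits within the cluster size limit
    )


jobs = [SlurmJob("72", "PD", nodes=5, cpus_total=15, cpus_min_per_node=3, pending_reason="Resources")]
relevant = [j for j in jobs if is_schedulable(j, node_slots=4, max_cluster_size=10, pending_reasons={"Resources"})]
print(len(relevant) > 0)  # True -> hasPendingJobs would report pending work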

File tree

3 files changed: +73 -48 lines

common/slurm.py

Lines changed: 0 additions & 22 deletions
This file was deleted.
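With this file removed, PENDING_RESOURCES_REASONS is now imported from common/schedulers/slurm_commands.py instead (see the import change in the next file). That module is not shown in this diff; purely for orientation, such a constant is just a collection of squeue pending-reason codes, e.g. (placeholder values, not the repository's actual list):

# Placeholder sketch; the real definition lives in common/schedulers/slurm_commands.py.
PENDING_RESOURCES_REASONS = ["Resources", "Priority"]  # squeue %r codes treated as "waiting for capacity"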

nodewatcher/plugins/slurm.py

Lines changed: 9 additions & 26 deletions
@@ -12,7 +12,7 @@
 import logging
 import subprocess

-from common.slurm import PENDING_RESOURCES_REASONS
+from common.schedulers.slurm_commands import PENDING_RESOURCES_REASONS, get_pending_jobs_info
 from common.utils import check_command_output, run_command

 log = logging.getLogger(__name__)
@@ -33,33 +33,16 @@ def hasJobs(hostname):


 def hasPendingJobs(instance_properties, max_size):
-    command = "/opt/slurm/bin/squeue -t PD --noheader -o '%c-%r'"
-
-    # Command outputs the pending jobs in the queue in the following format
-    # 8-Resources
-    # 8-Priority
-    # 8-PartitionNodeLimit
     try:
-        node_slots = _get_node_slots()
-        output = check_command_output(command)
-        has_pending = False
-        for line in output.split("\n"):
-            line_arr = line.split("-")
-            if len(line_arr) == 2:
-                required_slots = int(line_arr[0])
-                pending_code = line_arr[1]
-                log.info("required_slots %s pending_code %s", required_slots, pending_code)
-                if pending_code in PENDING_RESOURCES_REASONS and required_slots <= node_slots:
-                    has_pending = True
-                    break
-
-        error = False
+        pending_jobs = get_pending_jobs_info(
+            max_slots_filter=instance_properties.get("slots"),
+            max_nodes_filter=max_size,
+            filter_by_pending_reasons=PENDING_RESOURCES_REASONS,
+        )
+        return len(pending_jobs) > 0, False
     except Exception as e:
-        log.warning("Failed when checking for pending jobs with exception %s. Assuming no pending jobs.", e)
-        error = True
-        has_pending = False
-
-    return has_pending, error
+        log.error("Failed when checking if node is down with exception %s. Reporting no pending jobs.", e)
+        return False, True


 def lockHost(hostname, unlock=False):
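hasPendingJobs keeps its (has_pending, error) return contract; it now simply delegates the job filtering to get_pending_jobs_info, which is defined in common/schedulers/slurm_commands.py and not shown in this diff. For illustration only, a hypothetical caller would consume the tuple like this (assuming the repository's packages are importable):

# Hypothetical usage sketch; the real nodewatcher daemon loop is more involved.
from nodewatcher.plugins.slurm import hasPendingJobs

has_pending, failed = hasPendingJobs({"slots": 4}, max_size=10)
if failed:
    print("pending-job check failed; result is not trustworthy")
elif has_pending:
    print("jobs are pending that this cluster can actually run")
else:
    print("no runnable pending jobs")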
Lines changed: 64 additions & 0 deletions

@@ -0,0 +1,64 @@
+# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
+# with the License. A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
+import pytest
+
+from assertpy import assert_that
+from common.schedulers.slurm_commands import PENDING_RESOURCES_REASONS, SlurmJob
+from nodewatcher.plugins.slurm import hasPendingJobs
+
+
+@pytest.mark.parametrize(
+    "pending_jobs, expected_result",
+    [
+        (
+            [SlurmJob(id="72", state="PD", nodes=5, cpus_total=15, cpus_min_per_node=3, pending_reason="Priority")],
+            (True, False),
+        ),
+        (
+            [
+                SlurmJob(
+                    id="72", state="PD", nodes=5, cpus_total=15, cpus_min_per_node=3, pending_reason="Resources"
+                ),  # 5 3-slot tasks
+                SlurmJob(
+                    id="73", state="PD", nodes=1, cpus_total=1, cpus_min_per_node=1, pending_reason="Resources"
+                ),  # 1 1-slot task
+                SlurmJob(
+                    id="74", state="PD", nodes=2, cpus_total=2, cpus_min_per_node=1, pending_reason="Resources"
+                ),  # 2 1-slot tasks forced on 2 nodes
+                SlurmJob(
+                    id="75", state="PD", nodes=3, cpus_total=12, cpus_min_per_node=4, pending_reason="Resources"
+                ),  # 3 4-slot tasks
+                SlurmJob(
+                    id="76", state="PD", nodes=1, cpus_total=3, cpus_min_per_node=1, pending_reason="Resources"
+                ),  # 3 1-slot tasks
+            ],
+            (True, False),
+        ),
+        ([], (False, False)),
+        (Exception, (False, True)),
+    ],
+    ids=["single_job", "multiple_jobs", "no_jobs", "failure"],
+)
+def test_has_pending_jobs(pending_jobs, expected_result, mocker):
+    if pending_jobs is Exception:
+        mock = mocker.patch("nodewatcher.plugins.slurm.get_pending_jobs_info", side_effect=Exception(), autospec=True)
+    else:
+        mock = mocker.patch("nodewatcher.plugins.slurm.get_pending_jobs_info", return_value=pending_jobs, autospec=True)
+
+    instance_properties = {"slots": 4}
+    max_cluster_size = 10
+
+    assert_that(hasPendingJobs(instance_properties, max_cluster_size)).is_equal_to(expected_result)
+    mock.assert_called_with(
+        filter_by_pending_reasons=PENDING_RESOURCES_REASONS,
+        max_nodes_filter=max_cluster_size,
+        max_slots_filter=instance_properties["slots"],
+    )
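The test requires pytest with the pytest-mock plugin (for the mocker fixture) and assertpy. Since mocker.patch is a thin wrapper around unittest.mock.patch, the "failure" case can also be reproduced standalone; a minimal sketch, assuming the repository's common/ and nodewatcher/ packages are on the import path:

# Standalone sketch of the "failure" parametrization.
from unittest import mock

from nodewatcher.plugins.slurm import hasPendingJobs

with mock.patch("nodewatcher.plugins.slurm.get_pending_jobs_info", side_effect=Exception("boom"), autospec=True):
    result = hasPendingJobs({"slots": 4}, 10)

print(result)  # (False, True): no pending jobs reported, error flag raised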
