Skip to content

Commit e8cd8fa

Browse files
Add another layer to detect orphaned instances
1 parent e5f0f9c commit e8cd8fa

File tree

3 files changed

+51
-23
lines changed

3 files changed

+51
-23
lines changed

src/slurm_plugin/clustermgtd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ def _get_ec2_instances(self):
649649
time.sleep(5)
650650
log.info("Retrieving list of EC2 instances associated with the cluster")
651651
try:
652-
return self._instance_manager.get_cluster_instances(include_head_node=False, alive_states_only=True)
652+
return self._instance_manager.get_cluster_instances(include_head_node=False)
653653
except Exception as e:
654654
log.error("Failed when getting instance info from EC2 with exception %s", e)
655655
raise ClusterManager.EC2InstancesInfoUnavailable

src/slurm_plugin/fleet_manager.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,15 @@ def launch_ec2_instances(self, count, job_id=None):
193193
launch_params = self._evaluate_launch_params(count)
194194
assigned_nodes = self._launch_instances(launch_params)
195195
if len(assigned_nodes.get("Instances")) > 0:
196+
instance_ids = [instance.get("InstanceId") for instance in assigned_nodes.get("Instances") if instance.get("InstanceId") ]
196197
logger.info(
197198
"Launched the following instances %s",
198-
print_with_count([instance.get("InstanceId", "") for instance in assigned_nodes.get("Instances")]),
199+
print_with_count(instance_ids),
199200
)
200201
logger.debug("Launched instances information: %s", assigned_nodes.get("Instances"))
202+
running_nodes_file_path = "/etc/parallelcluster/slurm_plugin/running_nodes"
203+
with open(running_nodes_file_path, "a+") as f:
204+
f.writelines(instance_ids)
201205

202206
return [EC2Instance.from_describe_instance_data(instance_info) for instance_info in assigned_nodes["Instances"]]
203207

src/slurm_plugin/instance_manager.py

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -258,46 +258,70 @@ def get_unhealthy_cluster_instance_status(self, cluster_instance_ids):
258258
return list(instance_health_states.values())
259259

260260
@log_exception(logger, "getting cluster instances from EC2", raise_on_error=True)
def get_cluster_instances(self, include_head_node=False):
    """
    Get instances that are associated with the cluster.

    Combines two sources:
      1. describe-instances filtered by the cluster-name tag (alive states only);
      2. instance IDs recorded at launch time in the running-nodes file, so
         instances whose tags are missing/delayed (orphans) are still detected.

    Instances without all the info set are ignored and not returned.
    The running-nodes file is rewritten at the end with only the IDs that
    still correspond to existing (alive) instances.
    """
    running_instances_file_path = "/etc/parallelcluster/slurm_plugin/running_nodes"
    # Must be a set: we call .update(), .discard() and set difference on it.
    running_instances_from_file = set()
    # "a+" creates the file if it does not exist yet; it positions the file
    # pointer at EOF, so seek(0) is required before reading.
    with open(running_instances_file_path, "a+") as f:
        f.seek(0)
        # Normalize entries: the writer side may or may not append newlines,
        # so strip whitespace and drop empty lines.
        running_instances_from_file.update(line.strip() for line in f if line.strip())

    # IDs recorded at launch that the tag-based query has not (yet) returned.
    untracked_instances = set(running_instances_from_file)

    ec2_client = boto3.client("ec2", region_name=self._region, config=self._boto3_config)
    paginator = ec2_client.get_paginator("describe_instances")
    args = {
        "Filters": [{"Name": "tag:parallelcluster:cluster-name", "Values": [self._cluster_name]}],
    }
    args["Filters"].append({"Name": "instance-state-name", "Values": list(EC2_INSTANCE_ALIVE_STATES)})
    if not include_head_node:
        args["Filters"].append({"Name": "tag:parallelcluster:node-type", "Values": ["Compute"]})
    response_iterator = paginator.paginate(PaginationConfig={"PageSize": BOTO3_PAGINATION_PAGE_SIZE}, **args)
    filtered_iterator = response_iterator.search("Reservations[].Instances[]")

    instances = []
    for instance_info in filtered_iterator:
        untracked_instances.discard(instance_info["InstanceId"])
        self._create_ec2_instance_object(instance_info, instances)

    # Second pass: look up the still-untracked IDs directly by instance-id.
    # Whatever remains after this pass no longer exists (or is not alive).
    non_existing_instances = set(untracked_instances)
    # Batch the instance-id filter; was self.chunk(...), which does not exist
    # (the helper is named chunks).
    for instance_ids in self.chunks(list(untracked_instances), 150):
        filters = [
            {"Name": "instance-id", "Values": instance_ids},
            {"Name": "instance-state-name", "Values": list(EC2_INSTANCE_ALIVE_STATES)},
        ]
        response_iterator = paginator.paginate(
            PaginationConfig={"PageSize": BOTO3_PAGINATION_PAGE_SIZE}, Filters=filters
        )
        filtered_iterator = response_iterator.search("Reservations[].Instances[]")
        for instance_info in filtered_iterator:
            non_existing_instances.discard(instance_info["InstanceId"])
            self._create_ec2_instance_object(instance_info, instances)

    # Rewrite the file keeping only IDs of instances that still exist,
    # one per line so the next read round-trips cleanly.
    with open(running_instances_file_path, "w") as f:
        f.write("\n".join(sorted(running_instances_from_file - non_existing_instances)))
    return instances
300+
301+
302+
def chunks(self, lst, n):
    """Yield successive n-sized chunks from lst."""
    start = 0
    total = len(lst)
    while start < total:
        yield lst[start : start + n]
        start += n
306+
def _create_ec2_instance_object(self, instance_info, instances):
    """Append an EC2Instance built from describe-instances data to *instances*.

    An instance missing any required EC2 info (private IP, DNS name, launch
    time, ...) is logged at warning level and skipped rather than raising.
    """
    try:
        private_ip, private_dns_name, all_private_ips = get_private_ip_address_and_dns_name(instance_info)
        # Hostname is the short (first) label of the private DNS name.
        hostname = private_dns_name.split(".")[0]
        instance = EC2Instance(
            instance_info["InstanceId"],
            private_ip,
            hostname,
            all_private_ips,
            instance_info["LaunchTime"],
        )
        instances.append(instance)
    except Exception as e:
        logger.warning(
            "Ignoring instance %s because not all EC2 info are available, exception: %s, message: %s",
            instance_info["InstanceId"],
            type(e).__name__,
            e,
        )
301325

302326
def terminate_all_compute_nodes(self, terminate_batch_size):
303327
try:

0 commit comments

Comments
 (0)