Skip to content

Commit 962817c

Browse files
Keep track of launched instance IDs in a file
1. The only place where instances are launched is `launch_ec2_instances`. Therefore, at the end of that function, this commit appends the launched instance IDs to a file to keep track of them. 2. `get_cluster_instances` used to rely solely on retrieving instances with the tags `parallelcluster:cluster-name` and `parallelcluster:node-type`. It ceases to function when users manually remove tags from instances. With this commit, in addition to getting instances by tags, it also gets instances from the file written by `launch_ec2_instances`. At the end of the function, this commit removes non-existing instances from the file. 3. Remove the argument `alive_states_only` for code simplicity. With this commit, cluster scaling should work with or without tags: the tag-based logic and the file-based logic are redundant with each other, which increases resilience. This commit is work-in-progress for the following reasons: 1. The code in `get_cluster_instances` could be simplified. 2. It requires changes in the CLI and the Cookbook: a. In the CLI, the IAM policy for the head node and the clean-up lambda is ``` { "Action": "ec2:TerminateInstances", "Resource": "*", "Effect": "Allow", "Sid": "EC2Terminate", "Condition": { "StringEquals": { "ec2:ResourceTag/parallelcluster:cluster-name": <cluster name> } }, }, ``` and it should be changed to ``` { "Action": "ec2:TerminateInstances", "Resource": "*", "Effect": "Allow", "Sid": "EC2Terminate", "Condition": { "StringEquals": { "ec2:ResourceTag/aws:ec2launchtemplate:id": [ <Launch template id 1>, <Launch template id 2>, ... It should contain all launch templates of compute and login nodes. ] } } }, ``` b. The Cookbook should create the file `/etc/parallelcluster/slurm_plugin/running_nodes` during the config stage and set its owner to `pcluster-admin:pcluster-admin`. This is necessary because the node package does not have permission to create the file.
1 parent e5f0f9c commit 962817c

File tree

3 files changed

+50
-23
lines changed

3 files changed

+50
-23
lines changed

src/slurm_plugin/clustermgtd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ def _get_ec2_instances(self):
649649
time.sleep(5)
650650
log.info("Retrieving list of EC2 instances associated with the cluster")
651651
try:
652-
return self._instance_manager.get_cluster_instances(include_head_node=False, alive_states_only=True)
652+
return self._instance_manager.get_cluster_instances(include_head_node=False)
653653
except Exception as e:
654654
log.error("Failed when getting instance info from EC2 with exception %s", e)
655655
raise ClusterManager.EC2InstancesInfoUnavailable

src/slurm_plugin/fleet_manager.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,15 @@ def launch_ec2_instances(self, count, job_id=None):
193193
launch_params = self._evaluate_launch_params(count)
194194
assigned_nodes = self._launch_instances(launch_params)
195195
if len(assigned_nodes.get("Instances")) > 0:
196+
instance_ids = [instance.get("InstanceId") for instance in assigned_nodes.get("Instances") if instance.get("InstanceId") ]
196197
logger.info(
197198
"Launched the following instances %s",
198-
print_with_count([instance.get("InstanceId", "") for instance in assigned_nodes.get("Instances")]),
199+
print_with_count(instance_ids),
199200
)
200201
logger.debug("Launched instances information: %s", assigned_nodes.get("Instances"))
202+
running_nodes_file_path = "/etc/parallelcluster/slurm_plugin/running_nodes"
203+
with open(running_nodes_file_path, "a") as f:
204+
f.write('\n'.join(instance_ids)+'\n')
201205

202206
return [EC2Instance.from_describe_instance_data(instance_info) for instance_info in assigned_nodes["Instances"]]
203207

src/slurm_plugin/instance_manager.py

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -258,46 +258,69 @@ def get_unhealthy_cluster_instance_status(self, cluster_instance_ids):
258258
return list(instance_health_states.values())
259259

260260
@log_exception(logger, "getting cluster instances from EC2", raise_on_error=True)
261-
def get_cluster_instances(self, include_head_node=False, alive_states_only=True):
261+
def get_cluster_instances(self, include_head_node=False):
262262
"""
263263
Get instances that are associated with the cluster.
264264
265265
Instances without all the info set are ignored and not returned
266266
"""
267+
running_instances_from_file = set()
268+
running_instances_file_path = "/etc/parallelcluster/slurm_plugin/running_nodes"
269+
with open(running_instances_file_path, "r") as f:
270+
running_instances_from_file.update(set([line.strip() for line in f.readlines() if line.strip()]))
271+
untracked_instances = running_instances_from_file.copy()
272+
267273
ec2_client = boto3.client("ec2", region_name=self._region, config=self._boto3_config)
268274
paginator = ec2_client.get_paginator("describe_instances")
269275
args = {
270276
"Filters": [{"Name": "tag:parallelcluster:cluster-name", "Values": [self._cluster_name]}],
271277
}
272-
if alive_states_only:
273-
args["Filters"].append({"Name": "instance-state-name", "Values": list(EC2_INSTANCE_ALIVE_STATES)})
278+
args["Filters"].append({"Name": "instance-state-name", "Values": list(EC2_INSTANCE_ALIVE_STATES)})
274279
if not include_head_node:
275280
args["Filters"].append({"Name": "tag:parallelcluster:node-type", "Values": ["Compute"]})
276281
response_iterator = paginator.paginate(PaginationConfig={"PageSize": BOTO3_PAGINATION_PAGE_SIZE}, **args)
277282
filtered_iterator = response_iterator.search("Reservations[].Instances[]")
278283

279284
instances = []
280285
for instance_info in filtered_iterator:
281-
try:
282-
private_ip, private_dns_name, all_private_ips = get_private_ip_address_and_dns_name(instance_info)
283-
instances.append(
284-
EC2Instance(
285-
instance_info["InstanceId"],
286-
private_ip,
287-
private_dns_name.split(".")[0],
288-
all_private_ips,
289-
instance_info["LaunchTime"],
290-
)
291-
)
292-
except Exception as e:
293-
logger.warning(
294-
"Ignoring instance %s because not all EC2 info are available, exception: %s, message: %s",
286+
untracked_instances.discard(instance_info["InstanceId"])
287+
self._create_ec2_instance_object(instance_info, instances)
288+
non_existing_instances = untracked_instances.copy()
289+
for instance_ids in self.chunks(list(untracked_instances),150):
290+
filters=[{"Name": "instance-id", "Values": instance_ids}, {"Name": "instance-state-name", "Values": list(EC2_INSTANCE_ALIVE_STATES)}]
291+
response_iterator = paginator.paginate(PaginationConfig={"PageSize": BOTO3_PAGINATION_PAGE_SIZE}, Filters=filters)
292+
filtered_iterator = response_iterator.search("Reservations[].Instances[]")
293+
for instance_info in filtered_iterator:
294+
non_existing_instances.discard(instance_info["InstanceId"])
295+
self._create_ec2_instance_object(instance_info, instances)
296+
with open(running_instances_file_path, "w") as f:
297+
f.write('\n'.join(list(running_instances_from_file - non_existing_instances))+'\n')
298+
return instances
299+
300+
301+
def chunks(self, lst, n):
302+
"""Yield successive n-sized chunks from lst."""
303+
for i in range(0, len(lst), n):
304+
yield lst[i:i + n]
305+
def _create_ec2_instance_object(self, instance_info, instances):
306+
try:
307+
private_ip, private_dns_name, all_private_ips = get_private_ip_address_and_dns_name(instance_info)
308+
instances.append(
309+
EC2Instance(
295310
instance_info["InstanceId"],
296-
type(e).__name__,
297-
e,
311+
private_ip,
312+
private_dns_name.split(".")[0],
313+
all_private_ips,
314+
instance_info["LaunchTime"],
298315
)
299-
300-
return instances
316+
)
317+
except Exception as e:
318+
logger.warning(
319+
"Ignoring instance %s because not all EC2 info are available, exception: %s, message: %s",
320+
instance_info["InstanceId"],
321+
type(e).__name__,
322+
e,
323+
)
301324

302325
def terminate_all_compute_nodes(self, terminate_batch_size):
303326
try:

0 commit comments

Comments
 (0)