1414
1515import pytest
1616from assertpy import assert_that
17+ from remote_command_executor import RemoteCommandExecutor
1718from retrying import retry
1819from tags_utils import (
1920 convert_tags_dicts_to_tags_list ,
2627)
2728from time_utils import minutes , seconds
2829
30+ from tests .common .schedulers_common import SlurmCommands
2931from tests .common .utils import get_installed_parallelcluster_version
3032
3133
@@ -64,15 +66,34 @@ def test_tag_propagation(pcluster_config_reader, clusters_factory, scheduler, os
6466 # Checks for tag propagation
6567 _check_tag_propagation (cluster , scheduler , os , volume_name )
6668
69+ if scheduler == "slurm" :
70+ _test_queue_and_compute_resources_tags (cluster , pcluster_config_reader , scheduler , os , volume_name )
71+
6772
6873@retry (wait_fixed = seconds (20 ), stop_max_delay = minutes (5 ))
6974def _wait_for_compute_fleet_start (cluster ):
7075 compute_nodes = cluster .get_cluster_instance_ids (node_type = "Compute" )
7176 assert_that (compute_nodes ).is_length (1 )
7277
7378
74- def _check_tag_propagation (cluster , scheduler , os , volume_name ):
75- config_file_tags = {"ConfigFileTag" : "ConfigFileTagValue" }
79+ def _check_tag_propagation (cluster , scheduler , os , volume_name , queue_tags = None , compute_resource_tags = None ):
80+ config_file_tags = {
81+ "ConfigFileTag" : "ConfigFileTagValue" ,
82+ "QueueOverrideTag" : "ClusterLevelValue" ,
83+ "ComputeOverrideTag" : "ClusterLevelValue" ,
84+ }
85+ if not queue_tags :
86+ queue_tags = {
87+ "QueueTag" : "QueueValue" ,
88+ "QueueOverrideTag" : "QueueLevelValue" ,
89+ "ComputeOverrideTag" : "QueueLevelValue" ,
90+ }
91+ if not compute_resource_tags :
92+ compute_resource_tags = {
93+ "ComputeResourceTag" : "ComputeResourceValue" ,
94+ "ComputeOverrideTag" : "ComputeLevelValue" ,
95+ }
96+
7697 version_tags = {"parallelcluster:version" : get_installed_parallelcluster_version ()}
7798 cluster_name_tags = {"parallelcluster:cluster-name" : cluster .name }
7899 test_cases = [
@@ -101,7 +122,7 @@ def _check_tag_propagation(cluster, scheduler, os, volume_name):
101122 "expected_tags" : (
102123 cluster_name_tags ,
103124 {"Name" : "Compute" , "parallelcluster:node-type" : "Compute" },
104- config_file_tags ,
125+ { ** config_file_tags , ** queue_tags , ** compute_resource_tags } ,
105126 ),
106127 "skip" : scheduler == "awsbatch" ,
107128 },
@@ -111,7 +132,7 @@ def _check_tag_propagation(cluster, scheduler, os, volume_name):
111132 "expected_tags" : (
112133 cluster_name_tags ,
113134 {"parallelcluster:node-type" : "Compute" },
114- config_file_tags if scheduler == "slurm" else { },
135+ { ** config_file_tags , ** queue_tags , ** compute_resource_tags },
115136 ),
116137 "tag_getter_kwargs" : {"cluster" : cluster , "os" : os },
117138 "skip" : scheduler == "awsbatch" ,
@@ -135,3 +156,37 @@ def _check_tag_propagation(cluster, scheduler, os, volume_name):
135156 observed_tags = tag_getter (** tag_getter_args )
136157 expected_tags = test_case ["expected_tags" ]
137158 assert_that (observed_tags ).contains (* convert_tags_dicts_to_tags_list (expected_tags ))
159+
160+
161+ def _test_queue_and_compute_resources_tags (cluster , pcluster_config_reader , scheduler , os , volume_name ):
162+ # Test update queue level tags and compute resource level tags with queue update strategy
163+ command_executor = RemoteCommandExecutor (cluster )
164+ slurm_commands = SlurmCommands (command_executor )
165+ result = slurm_commands .submit_command ("sleep infinity" , constraint = "static" )
166+ job_id = slurm_commands .assert_job_submitted (result .stdout )
167+ slurm_commands .wait_job_running (job_id )
168+
169+ # Updates cluster with new configuration
170+ updated_cluster_config = pcluster_config_reader (
171+ config_file = "pcluster.config.update.queue_update.yaml" , volume_name = volume_name
172+ )
173+ cluster .update (str (updated_cluster_config ))
174+
175+ slurm_commands .assert_job_state (job_id , "RUNNING" )
176+ # check nodes still have old tags before replacement
177+ _check_tag_propagation (cluster , scheduler , os , volume_name )
178+
179+ # requeue job to launch new instances for nodes
180+ command_executor .run_remote_command (f"scontrol requeue { job_id } " )
181+ slurm_commands .wait_job_running (job_id )
182+ # check new instances have new tags
183+ new_queue_tags = {
184+ "QueueTagUpdate" : "QueueValueUpdate" ,
185+ "QueueOverrideTag" : "QueueLevelValueUpdate" ,
186+ "ComputeOverrideTag" : "QueueLevelValueUpdate" ,
187+ }
188+ new_compute_resource_tags = {
189+ "ComputeResourceTagUpdate" : "ComputeResourceValueUpdate" ,
190+ "ComputeOverrideTag" : "ComputeLevelValueUpdate" ,
191+ }
192+ _check_tag_propagation (cluster , scheduler , os , volume_name , new_queue_tags , new_compute_resource_tags )
0 commit comments