from cProfile import Profile

from django.core.management.base import BaseCommand, CommandError
+ from simple_history.utils import bulk_update_with_history, bulk_create_with_history

from coldfront.core.resource.models import ResourceType, ResourceAttribute, ResourceAttributeType, AttributeType, Resource
from coldfront.core.project.models import Project
@@ -27,9 +28,9 @@ def get_output_from_file(self, file_path):
                    values = re.sub(r'\s+', ' ', line).strip().split(' ')
                    yield dict(zip(keys, values))
        except FileNotFoundError:
-             print(f"File at {file_path} does not exist. Cant simulate output.")
+             logger.error(f"File at {file_path} does not exist. Can't simulate output.")
        except IOError as e:
-             print(f"An error occurred: {e}")
+             logger.error(f"An error occurred: {e}")

    def add_arguments(self, parser):
        parser.add_argument("-e", "--environment", help="Environment, use dev to simulate output")
@@ -51,9 +52,9 @@ def calculate_gpu_count(gres_value):
            return reduce(lambda x, y: x + y, [int(gpu_info.split(':')[2].replace('(S', '')) for gpu_info in gpu_list])

        def calculate_cpu_count(row):
-             if row.get('S:C:T ', None) is None:
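+             # sinfo's S:C:T column is sockets:cores:threads; index [1] of the split is the core count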
+             if row.get('s:c:t ', None) is None:
                return 0
-             cpu_count = row.get('S:C:T ').split(':')[1]
+             cpu_count = row.get('s:c:t ').split(':')[1]
            return int(cpu_count)

        def calculate_owner_value(project_list, row):
@@ -69,47 +70,110 @@ def calculate_owner_value(project_list, row):

        env = options['environment'] or 'production'
        if 'dev' in env:
-             output = self.get_output_from_file(os.path.join(os.getcwd(), 'coldfront/plugins/slurm/management/commands/sinfo.txt'))
+             output = self.get_output_from_file(os.path.join(os.getcwd(), 'coldfront/plugins/slurm/management/commands/sinfo_output.txt'))
        else:
            output = slurm_get_nodes_info()
-         print(f'Running on {env} mode')
+         logger.debug(f'Running in {env} mode')
        project_list = Project.objects.all()
-         compute_node, compute_node_created = ResourceType.objects.get_or_create(name='Compute Node', description='Compute Node')
-         partition_resource_type, partition_created = ResourceType.objects.get_or_create(name='Cluster Partition', description='Cluster Partition')
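+         # The 'Compute Node' and 'Cluster Partition' ResourceTypes are now fetched with .get(),
+         # so they are expected to exist already instead of being created on the fly.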
+         compute_node = ResourceType.objects.get(name='Compute Node')
+         attribute_type_name_list = ['GPU Count', 'Core Count', 'Features', 'Owner', 'ServiceEnd']
+         partition_resource_type = ResourceType.objects.get(name='Cluster Partition')
        gpu_count_attribute_type = ResourceAttributeType.objects.get(name='GPU Count')
        core_count_attribute_type = ResourceAttributeType.objects.get(name='Core Count')
        features_attribute_type = ResourceAttributeType.objects.get(name='Features')
        owner_attribute_type = ResourceAttributeType.objects.get(name='Owner')
        service_end_attribute_type = ResourceAttributeType.objects.get(name='ServiceEnd')
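+         # Preload the PKs of existing Compute Node attributes, keyed by "<node name> <attribute type>",
+         # so each row below can be routed to a bulk update (known PK) or a bulk create.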
+         existing_resource_attributes = list(ResourceAttribute.objects.filter(
+             resource_attribute_type__name__in=attribute_type_name_list,
+             resource__resource_type__name='Compute Node'
+         ).values_list('pk', 'resource__name', 'resource_attribute_type__name')
+         )
+         existing_resource_attributes_check = [f'{resource_att[1]} {resource_att[2]}' for resource_att in existing_resource_attributes]
+         existing_resource_attributes_pk_map = {f'{resource_att[1]} {resource_att[2]}': resource_att[0] for resource_att in existing_resource_attributes}
        processed_resources = set()
-         bulk_process_resource_attribute = []
+         bulk_update_resource_attribute = []
+         bulk_create_resource_attribute = []
        bulk_update_resource = []
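+         # Tracks attribute keys already queued for creation, so the same attribute is not
+         # inserted twice when a node shows up in more than one row of the sinfo output.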
+         processed_resource_attribute = []
        for row in output:
            new_resource, compute_node_created_created = Resource.objects.get_or_create(name=row['nodelist'], defaults={'is_allocatable': False, 'resource_type': compute_node})
            Resource.objects.get_or_create(name=row['partition'], defaults={'resource_type': partition_resource_type})
-             bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=gpu_count_attribute_type, resource=new_resource, value=calculate_gpu_count(row['gres'])))
-             bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=core_count_attribute_type, resource=new_resource, value=calculate_cpu_count(row)))
-             bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=features_attribute_type, resource=new_resource, value=row.get('avail_features', '(null)')))
-             bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=owner_attribute_type, resource=new_resource, value=calculate_owner_value(project_list, row)))
+
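+             # For each attribute (GPU Count, Core Count, Features, Owner), reuse the PK of an
+             # existing record when there is one; otherwise queue a new ResourceAttribute for creation.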
+             gpu_count = ResourceAttribute(resource_attribute_type=gpu_count_attribute_type, resource=new_resource, value=calculate_gpu_count(row['gres']))
+             gpu_count_key = f"{row['nodelist']} {gpu_count_attribute_type.name}"
+             if gpu_count_key in existing_resource_attributes_check:
+                 gpu_count.pk = existing_resource_attributes_pk_map[gpu_count_key]
+                 bulk_update_resource_attribute.append(gpu_count)
+             else:
+                 if gpu_count_key not in processed_resource_attribute:
+                     bulk_create_resource_attribute.append(gpu_count)
+                     processed_resource_attribute.append(gpu_count_key)
+
+             core_count = ResourceAttribute(resource_attribute_type=core_count_attribute_type, resource=new_resource, value=calculate_cpu_count(row))
+             core_count_key = f"{row['nodelist']} {core_count_attribute_type.name}"
+             if core_count_key in existing_resource_attributes_check:
+                 core_count.pk = existing_resource_attributes_pk_map[core_count_key]
+                 bulk_update_resource_attribute.append(core_count)
+             else:
+                 if core_count_key not in processed_resource_attribute:
+                     bulk_create_resource_attribute.append(core_count)
+                     processed_resource_attribute.append(core_count_key)
+
+             features = ResourceAttribute(resource_attribute_type=features_attribute_type, resource=new_resource, value=row.get('avail_features', '(null)'))
+             features_key = f"{row['nodelist']} {features_attribute_type.name}"
+             if features_key in existing_resource_attributes_check:
+                 features.pk = existing_resource_attributes_pk_map[features_key]
+                 bulk_update_resource_attribute.append(features)
+             else:
+                 if features_key not in processed_resource_attribute:
+                     bulk_create_resource_attribute.append(features)
+                     processed_resource_attribute.append(features_key)
+
+             owner = ResourceAttribute(resource_attribute_type=owner_attribute_type, resource=new_resource, value=calculate_owner_value(project_list, row))
+             owner_key = f"{row['nodelist']} {owner_attribute_type.name}"
+             if owner_key in existing_resource_attributes_check:
+                 owner.pk = existing_resource_attributes_pk_map[owner_key]
+                 bulk_update_resource_attribute.append(owner)
+             else:
+                 if owner_key not in processed_resource_attribute:
+                     bulk_create_resource_attribute.append(owner)
+                     processed_resource_attribute.append(owner_key)
+
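+             # A node reported by sinfo again is put back in service: flag it available and
+             # clear its existing ServiceEnd attribute via a direct PK lookup.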
            if new_resource.is_available is False:
-                 bulk_update_resource.append(Resource(name=row['nodelist'], is_available=True, resource_type=compute_node))
-                 bulk_process_resource_attribute.append(ResourceAttribute(resource=new_resource, value=None, resource_attribute_type=service_end_attribute_type))
+                 new_resource.is_available = True
+                 bulk_update_resource.append(new_resource)
+                 service_end_pk = existing_resource_attributes_pk_map[f"{row['nodelist']} {service_end_attribute_type.name}"]
+                 bulk_update_resource_attribute.append(ResourceAttribute(resource=new_resource, value=None, resource_attribute_type=service_end_attribute_type, pk=service_end_pk))
            processed_resources.add(new_resource.name)
        try:
-             ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value'])
-             Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available'])
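+             # Flush the batches with django-simple-history helpers so every bulk write also
+             # records a historical entry tagged with this command as the change reason.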
+             logger.debug(f'Updating {len(bulk_update_resource_attribute)} ResourceAttribute records')
+             bulk_update_with_history(bulk_update_resource_attribute, ResourceAttribute, ['value'], batch_size=500, default_change_reason='slurm_manage_resource command')
+             logger.debug(f'Updating {len(bulk_update_resource)} Resource records')
+             bulk_update_with_history(bulk_update_resource, Resource, ['is_available'], batch_size=500, default_change_reason='slurm_manage_resource command')
+             logger.debug(f'Creating {len(bulk_create_resource_attribute)} ResourceAttribute records')
+             bulk_create_with_history(bulk_create_resource_attribute, ResourceAttribute, batch_size=500, default_change_reason='slurm_manage_resource command')
        except Exception as e:
-             logger.error(f'Error processing resources info: {str(e)}')
+             logger.debug(f'Error processing resources info: {str(e)}')
            raise
-         bulk_process_resource_attribute = []
+         bulk_update_resource_attribute = []
+         bulk_create_resource_attribute = []
        bulk_update_resource = []
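+         # Nodes that no longer appear in the sinfo output are taken out of service and
+         # stamped with a ServiceEnd timestamp (updated in place if one already exists).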
        for resource_to_delete in Resource.objects.exclude(name__in=list(processed_resources)).filter(is_available=True, resource_type=compute_node):
            resource_to_delete.is_available = False
            bulk_update_resource.append(resource_to_delete)
-             bulk_process_resource_attribute.append(ResourceAttribute(resource=resource_to_delete, value=str(datetime.now()), resource_attribute_type=service_end_attribute_type))
+             service_end = ResourceAttribute(resource=resource_to_delete, value=str(datetime.now()), resource_attribute_type=service_end_attribute_type)
+             if f"{resource_to_delete.name} {service_end_attribute_type.name}" in existing_resource_attributes_check:
+                 service_end.pk = existing_resource_attributes_pk_map[f"{resource_to_delete.name} {service_end_attribute_type.name}"]
+                 bulk_update_resource_attribute.append(service_end)
+             else:
+                 bulk_create_resource_attribute.append(service_end)
        try:
-             ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value'])
-             Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available'])
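+             # Same batched, history-aware flush for the decommissioned nodes.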
+             logger.debug(f'Decommissioning {len(bulk_update_resource)} Resource records')
+             bulk_update_with_history(bulk_update_resource, Resource, ['is_available'], batch_size=500, default_change_reason='slurm_manage_resource command')
+             logger.debug(f'Creating {len(bulk_create_resource_attribute)} ServiceEnd ResourceAttribute records')
+             bulk_create_with_history(bulk_create_resource_attribute, ResourceAttribute, batch_size=500, default_change_reason='slurm_manage_resource command')
+             logger.debug(f'Updating {len(bulk_update_resource_attribute)} ServiceEnd ResourceAttribute records')
+             bulk_update_with_history(bulk_update_resource_attribute, ResourceAttribute, ['value'], batch_size=500, default_change_reason='slurm_manage_resource command')
        except Exception as e:
            logger.error(f'Error cleaning up resources: {str(e)}')
-             raise
+             raise