1
+ import logging
2
+ import os
3
+ import re
4
+ from functools import reduce
5
+ from cProfile import Profile
6
+
7
+ from django .core .management .base import BaseCommand , CommandError
8
+
9
+ from coldfront .core .resource .models import ResourceType , ResourceAttribute , ResourceAttributeType , AttributeType , Resource
10
+ from coldfront .core .project .models import Project
11
+ from coldfront .plugins .slurm .utils import slurm_get_nodes_info
12
+ from django .utils .datetime_safe import datetime
13
+
14
+ logger = logging .getLogger (__name__ )
15
+
16
class Command(BaseCommand):
    """Synchronise compute-node resources and their attributes with Slurm node data.

    Node rows come from ``slurm_get_nodes_info()`` or, when the environment
    option contains ``dev``, from a whitespace-separated text file.  For every
    node seen, the GPU count, core count, features, owner and service-end
    attributes are upserted.  Available compute nodes that no longer appear in
    the data are flagged unavailable and stamped with a service-end time.
    """

    help = 'Manage slurm resources from sinfo output'

    # One GPU entry of a gres string: ``gpu[:type]:count`` optionally followed
    # by a parenthesised suffix, e.g. ``gpu:a100:4(S:0-1)`` or ``gpu:4``.
    _GPU_GRES_PATTERN = re.compile(r'(?:^|,)gpu(?::[^:,(]+)?:(\d+)(?=[,(]|$)')

    def get_output_from_file(self, file_path):
        """Yield one dict per data line of a whitespace-separated table file.

        The first non-blank line is the header; its column names are
        lower-cased and used as keys for every following row.  Blank lines are
        skipped.

        Args:
            file_path: Path of the text file to read.

        Yields:
            dict mapping lower-cased column name to the raw string value.

        Raises:
            CommandError: If the file does not exist or cannot be read.  The
                error surfaces when the generator is iterated, not when it is
                created.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as output_file:
                keys = None
                for line in output_file:
                    fields = line.split()
                    if not fields:
                        # A blank line carries no row data.
                        continue
                    if keys is None:
                        keys = [field.lower() for field in fields]
                    else:
                        yield dict(zip(keys, fields))
        except FileNotFoundError as exc:
            # Abort instead of yielding nothing: an empty result would make the
            # caller treat every known node as having disappeared.
            raise CommandError(f"File at {file_path} does not exist. Cant simulate output.") from exc
        except IOError as exc:
            raise CommandError(f"An error occurred: {exc} ") from exc

    def add_arguments(self, parser):
        """Register the ``--environment`` and ``--profile`` options."""
        parser.add_argument("-e", "--environment", help="Environment, use dev to simulate output")
        parser.add_argument('--profile', action='store_true', default=False)

    def handle(self, *args, **options):
        """Run the synchronisation, under cProfile when ``--profile`` is set."""
        if not options.get('profile', False):
            self._handle(*args, **options)
            return
        profiler = Profile()
        try:
            profiler.runcall(self._handle, *args, **options)
        finally:
            # Print whatever was collected even if the run failed part-way.
            profiler.print_stats()

    @staticmethod
    def _calculate_gpu_count(gres_value):
        """Return the total GPU count described by a gres string.

        Only ``gpu`` entries are counted; typed (``gpu:a100:4``) and untyped
        (``gpu:4``) entries are both understood.  A missing value, ``(null)``
        or a string without GPU entries gives 0.
        """
        if not gres_value or 'null' in gres_value:
            return 0
        return sum(int(count) for count in Command._GPU_GRES_PATTERN.findall(gres_value))

    @staticmethod
    def _calculate_cpu_count(row):
        """Return the second field of the row's ``S:C:T`` value as an int.

        File-based rows have lower-cased keys, so ``s:c:t`` is tried first and
        the original ``S:C:T`` spelling is kept as a fallback.  Returns 0 when
        the value is absent or malformed.
        """
        # NOTE(review): presumably sockets:cores:threads; only the middle field
        # is used, as before -- confirm whether it should be multiplied by the
        # socket count to give a per-node total.
        sct_value = row.get('s:c:t') or row.get('S:C:T')
        if not sct_value:
            return 0
        try:
            return int(sct_value.split(':')[1])
        except (IndexError, ValueError):
            logger.warning('Unparseable S:C:T value %r', sct_value)
            return 0

    @staticmethod
    def _calculate_owner_value(project_titles, row):
        """Return the owner label for a row.

        The first entry of the row's comma-separated ``groups`` value that is
        also a project title wins.  Otherwise ``'FASRC'`` is returned when the
        groups include both ``cluster_users`` and ``slurm-admin``, and an empty
        string in every other case.
        """
        groups = row.get('groups', '').split(',')
        for group in groups:
            if group in project_titles:
                return group
        if {'cluster_users', 'slurm-admin'}.issubset(groups):
            return 'FASRC'
        return ''

    def _handle(self, *args, **options):
        """Load node rows and bring Resource / ResourceAttribute records in line.

        Raises:
            CommandError: If the data source yields no rows.  Continuing would
                flag every available compute node as unavailable.
        """
        env = options.get('environment') or 'production'
        if 'dev' in env:
            output = self.get_output_from_file(os.path.join(os.getcwd(), 'coldfront/plugins/slurm/management/commands/sinfo.txt'))
        else:
            output = slurm_get_nodes_info()
        self.stdout.write(f'Running on {env} mode')

        # Titles are only used for membership tests, so fetch them once as a set.
        project_titles = set(Project.objects.values_list('title', flat=True))

        # Look the types up by name only; descriptive fields go in ``defaults``
        # so an existing row with a different description is reused.
        compute_node, _ = ResourceType.objects.get_or_create(
            name='Compute Node', defaults={'description': 'Compute Node'})
        partition_resource_type, _ = ResourceType.objects.get_or_create(
            name='Cluster Partition', defaults={'description': 'Cluster Partition'})
        int_attribute_type = AttributeType.objects.get(name='Int')
        text_attribute_type = AttributeType.objects.get(name='Text')

        def attribute_type(name, base_type):
            return ResourceAttributeType.objects.get_or_create(
                name=name, defaults={'attribute_type': base_type})[0]

        gpu_count_attribute_type = attribute_type('GPU Count', int_attribute_type)
        core_count_attribute_type = attribute_type('Core Count', int_attribute_type)
        features_attribute_type = attribute_type('Features', text_attribute_type)
        owner_attribute_type = attribute_type('Owner', text_attribute_type)
        service_end_attribute_type = attribute_type('ServiceEnd', text_attribute_type)

        # NOTE(review): an empty ``unique_fields`` is the form accepted by
        # backends that cannot name a conflict target (MySQL/MariaDB); other
        # backends require the unique columns to be listed -- confirm against
        # the deployed database.
        attribute_upsert = {'update_conflicts': True, 'unique_fields': [], 'update_fields': ['value']}

        nodes_by_name = {}
        known_partitions = set()
        # Keyed by (resource pk, attribute type pk): a node listed on several
        # rows keeps only its last value, and the batch has no duplicate keys.
        pending_attributes = {}
        nodes_to_enable = []

        for row in output:
            node_name = row['nodelist']
            node = nodes_by_name.get(node_name)
            if node is None:
                node, _ = Resource.objects.get_or_create(
                    name=node_name,
                    defaults={'is_allocatable': False, 'resource_type': compute_node})
                nodes_by_name[node_name] = node
                if node.is_available is False:
                    # The node is back: flip the flag on the fetched instance.
                    node.is_available = True
                    nodes_to_enable.append(node)

            partition_name = row['partition']
            if partition_name not in known_partitions:
                Resource.objects.get_or_create(
                    name=partition_name, defaults={'resource_type': partition_resource_type})
                known_partitions.add(partition_name)

            row_values = (
                (gpu_count_attribute_type, self._calculate_gpu_count(row.get('gres'))),
                (core_count_attribute_type, self._calculate_cpu_count(row)),
                (features_attribute_type, row.get('avail_features', '(null)')),
                (owner_attribute_type, self._calculate_owner_value(project_titles, row)),
                # A node that is present has no service end.
                (service_end_attribute_type, ' '),
            )
            for resource_attribute_type, value in row_values:
                pending_attributes[(node.pk, resource_attribute_type.pk)] = ResourceAttribute(
                    resource_attribute_type=resource_attribute_type, resource=node, value=value)

        if not nodes_by_name:
            raise CommandError(
                'No node rows were returned; aborting so that existing compute nodes '
                'are not marked unavailable.')

        ResourceAttribute.objects.bulk_create(list(pending_attributes.values()), **attribute_upsert)
        Resource.objects.bulk_update(nodes_to_enable, ['is_available'])

        # Compute nodes still flagged available but absent from this run.
        stale_nodes = list(
            Resource.objects
            .exclude(name__in=list(nodes_by_name))
            .filter(is_available=True, resource_type=compute_node))
        # NOTE(review): ``datetime`` here comes from django.utils.datetime_safe,
        # which newer Django releases deprecate -- plan a migration.
        service_end_value = str(datetime.now())
        service_end_attributes = []
        for stale_node in stale_nodes:
            stale_node.is_available = False
            service_end_attributes.append(ResourceAttribute(
                resource=stale_node, value=service_end_value,
                resource_attribute_type=service_end_attribute_type))
        ResourceAttribute.objects.bulk_create(service_end_attributes, **attribute_upsert)
        Resource.objects.bulk_update(stale_nodes, ['is_available'])
0 commit comments