charm.py
#!/usr/bin/env python3
# Copyright 2020-2024 Omnivector, LLC.
# See LICENSE file for licensing details.
"""Slurmd Operator Charm."""
import logging
import socket
from dataclasses import fields
from typing import Any, Dict
from charms.operator_libs_linux.v0.juju_systemd_notices import ( # type: ignore[import-untyped]
ServiceStartedEvent,
ServiceStoppedEvent,
SystemdNotices,
)
from interface_slurmctld import (
Slurmctld,
SlurmctldAvailableEvent,
)
from ops import (
ActionEvent,
ActiveStatus,
BlockedStatus,
CharmBase,
ConfigChangedEvent,
InstallEvent,
StoredState,
UpdateStatusEvent,
WaitingStatus,
main,
)
from slurm_conf_editor import Node, Partition
from slurmd_ops import SlurmdManager
from utils import slurmd

logger = logging.getLogger(__name__)


class SlurmdCharm(CharmBase):
"""Slurmd lifecycle events."""
_stored = StoredState()
def __init__(self, *args, **kwargs):
"""Init _stored attributes and interfaces, observe events."""
super().__init__(*args, **kwargs)
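# Default values for this charm's stored state.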
self._stored.set_default(
munge_key=str(),
new_node=True,
nhc_conf=str(),
nhc_params=str(),
slurm_installed=False,
slurmctld_available=False,
slurmctld_host=str(),
user_supplied_node_parameters={},
user_supplied_partition_parameters={},
)
self._slurmd_manager = SlurmdManager()
self._slurmctld = Slurmctld(self, "slurmctld")
self._systemd_notices = SystemdNotices(self, ["slurmd"])
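# Map charm, relation, and action events to their handler methods.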
event_handler_bindings = {
self.on.install: self._on_install,
self.on.update_status: self._on_update_status,
self.on.config_changed: self._on_config_changed,
self._slurmctld.on.slurmctld_available: self._on_slurmctld_available,
self._slurmctld.on.slurmctld_unavailable: self._on_slurmctld_unavailable,
self.on.service_slurmd_started: self._on_slurmd_started,
self.on.service_slurmd_stopped: self._on_slurmd_stopped,
self.on.node_configured_action: self._on_node_configured_action,
self.on.node_config_action: self._on_node_config_action_event,
# Assumes a `show-nhc-config` action is defined for this charm; without this
# binding, _on_show_nhc_config below is never dispatched.
self.on.show_nhc_config_action: self._on_show_nhc_config,
}
for event, handler in event_handler_bindings.items():
self.framework.observe(event, handler)
def _on_install(self, event: InstallEvent) -> None:
"""Perform installation operations for slurmd."""
self.unit.status = WaitingStatus("Installing slurmd")
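# Install slurmd; on success record the workload version and subscribe to
# systemd service notices, otherwise block and defer the event.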
if self._slurmd_manager.install():
self.unit.set_workload_version(self._slurmd_manager.version())
slurmd.override_service()
self._systemd_notices.subscribe()
self._stored.slurm_installed = True
else:
self.unit.status = BlockedStatus("Error installing slurmd")
event.defer()
self._check_status()
def _on_config_changed(self, event: ConfigChangedEvent) -> None:
"""Handle charm configuration changes."""
if nhc_conf := self.model.config.get("nhc-conf"):
if nhc_conf != self._stored.nhc_conf:
self._stored.nhc_conf = nhc_conf
self._slurmd_manager.render_nhc_config(nhc_conf)
user_supplied_partition_parameters = self.model.config.get("partition-config")
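# Only the leader parses partition-config (space-separated KEY=VALUE pairs)
# and publishes it to slurmctld.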
if self.model.unit.is_leader():
if user_supplied_partition_parameters is not None:
tmp_params = {}
try:
tmp_params = {
item.split("=")[0]: item.split("=")[1]
for item in str(user_supplied_partition_parameters).split()
}
except IndexError:
logger.error(
"Error parsing partition-config. Please use KEY1=VALUE KEY2=VALUE."
)
return
# Validate that the user-supplied parameters are valid Partition parameters.
for parameter in tmp_params:
if parameter not in [
partition_parameter.name for partition_parameter in fields(Partition)
]:
logger.error(
f"Invalid user supplied partition configuration parameter: {parameter}."
)
return
self._stored.user_supplied_partition_parameters = tmp_params
if self._slurmctld.is_joined:
self._slurmctld.set_partition()
def _on_update_status(self, event: UpdateStatusEvent) -> None:
"""Handle update status."""
self._check_status()
def _on_slurmctld_available(self, event: SlurmctldAvailableEvent) -> None:
"""Retrieve the slurmctld_available event data and store in charm state."""
if self._stored.slurm_installed is not True:
event.defer()
return
if (slurmctld_host := event.slurmctld_host) != self._stored.slurmctld_host:
if slurmctld_host is not None:
slurmd.override_default(slurmctld_host)
self._stored.slurmctld_host = slurmctld_host
logger.debug(f"slurmctld_host={slurmctld_host}")
else:
logger.debug("'slurmctld_host' not in event data.")
return
if (munge_key := event.munge_key) != self._stored.munge_key:
if munge_key is not None:
self._stored.munge_key = munge_key
self._slurmd_manager.write_munge_key(munge_key)
logger.debug(f"munge_key={munge_key}")
else:
logger.debug("'munge_key' not in event data.")
return
if (nhc_params := event.nhc_params) != self._stored.nhc_params:
if nhc_params is not None:
self._stored.nhc_params = nhc_params
self._slurmd_manager.render_nhc_wrapper(nhc_params)
logger.debug(f"nhc_params={nhc_params}")
else:
logger.debug("'nhc_params' not in event data.")
return
logger.debug("#### Storing slurmctld_available event relation data in charm StoredState.")
self._stored.slurmctld_available = True
# Restart munged and slurmd after we write the event data to their respective locations.
if self._slurmd_manager.restart_munged():
logger.debug("## Munge restarted successfully")
else:
logger.error("## Unable to restart munge")
slurmd.restart()
self._check_status()
def _on_slurmctld_unavailable(self, event) -> None:
"""Stop slurmd and set slurmctld_available = False when we lose slurmctld."""
logger.debug("## Slurmctld unavailable")
self._stored.slurmctld_available = False
self._stored.nhc_params = ""
self._stored.munge_key = ""
self._stored.slurmctld_host = ""
slurmd.stop()
self._check_status()
def _on_slurmd_started(self, _: ServiceStartedEvent) -> None:
"""Handle event emitted by systemd after slurmd daemon successfully starts."""
self.unit.status = ActiveStatus()
def _on_slurmd_stopped(self, _: ServiceStoppedEvent) -> None:
"""Handle event emitted by systemd after slurmd daemon is stopped."""
self.unit.status = BlockedStatus("slurmd not running")
def _on_node_configured_action(self, _: ActionEvent) -> None:
"""Remove node from DownNodes and mark as active."""
# Trigger reconfiguration of slurmd node.
self._new_node = False
self._slurmctld.set_node()
slurmd.restart()
logger.debug("### This node is not new anymore")
def _on_show_nhc_config(self, event: ActionEvent) -> None:
"""Show current nhc.conf."""
nhc_conf = self._slurmd_manager.get_nhc_config()
event.set_results({"nhc.conf": nhc_conf})
def _on_node_config_action_event(self, event: ActionEvent) -> None:
"""Get or set the user_supplied_node_conifg.
Return the node config if the `node-config` parameter is not specified, otherwise
parse, validate, and store the input of the `node-config` parameter in stored state.
Lastly, update slurmctld if there are updates to the node config.
"""
valid_config = True
config_supplied = False
if (user_supplied_node_parameters := event.params.get("parameters")) is not None:
config_supplied = True
# Parse the user supplied node-config.
node_parameters_tmp = {}
try:
node_parameters_tmp = {
item.split("=")[0]: item.split("=")[1]
for item in user_supplied_node_parameters.split()
}
except IndexError:
logger.error(
"Invalid node parameters specified. Please use KEY1=VAL KEY2=VAL format."
)
valid_config = False
# Validate that the user-supplied parameters are valid Node parameters.
for param in node_parameters_tmp:
if param not in [node_param.name for node_param in fields(Node)]:
logger.error(f"Invalid user supplied node parameter: {param}.")
valid_config = False
# Validate that the user-supplied parameters have non-empty values.
for k, v in node_parameters_tmp.items():
if v == "":
logger.error(f"Invalid user supplied node parameter: {k}={v}.")
valid_config = False
if valid_config:
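# Only push the node configuration to slurmctld when it actually changed.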
if (node_parameters := node_parameters_tmp) != self._user_supplied_node_parameters:
self._user_supplied_node_parameters = node_parameters
self._slurmctld.set_node()
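# Always report the effective (merged) node parameters in the action results.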
results = {
"node-parameters": " ".join(
[f"{k}={v}" for k, v in self.get_node()["node_parameters"].items()]
)
}
if config_supplied is True:
results["user-supplied-node-parameters-accepted"] = f"{valid_config}"
event.set_results(results)
@property
def hostname(self) -> str:
"""Return the hostname."""
return socket.gethostname().split(".")[0]
@property
def _user_supplied_node_parameters(self) -> dict[Any, Any]:
"""Return the user_supplied_node_parameters from stored state."""
return self._stored.user_supplied_node_parameters # type: ignore[return-value]
@_user_supplied_node_parameters.setter
def _user_supplied_node_parameters(self, node_parameters: dict) -> None:
"""Set the node_parameters in stored state."""
self._stored.user_supplied_node_parameters = node_parameters
@property
def _new_node(self) -> bool:
"""Get the new_node from stored state."""
return self._stored.new_node is True
@_new_node.setter
def _new_node(self, new_node: bool) -> None:
"""Set the new_node in stored state."""
self._stored.new_node = new_node
def _check_status(self) -> bool:
"""Check if we have all needed components.
- slurmd installed
- slurmctld available and working
- munge key configured and working
"""
if self._stored.slurm_installed is not True:
self.unit.status = BlockedStatus("Error installing slurmd")
return False
if self._slurmctld.is_joined is not True:
self.unit.status = BlockedStatus("Need relations: slurmctld")
return False
if self._stored.slurmctld_available is not True:
self.unit.status = WaitingStatus("Waiting on: slurmctld")
return False
if not self._slurmd_manager.check_munged():
self.unit.status = BlockedStatus("Error configuring munge key")
return False
return True
def get_node(self) -> Dict[Any, Any]:
"""Get the node from stored state."""
node = {
"node_parameters": {
**self._slurmd_manager.get_node_config(),
**self._user_supplied_node_parameters,
},
"new_node": self._new_node,
}
logger.debug(f"Node Configuration: {node}")
return node
def get_partition(self) -> Dict[Any, Any]:
"""Return the partition."""
partition = {self.app.name: {**{"State": "UP"}, **self._stored.user_supplied_partition_parameters}} # type: ignore[dict-item]
logger.debug(f"partition={partition}")
return partition
if __name__ == "__main__":  # pragma: nocover
    main.main(SlurmdCharm)