Skip to content

Commit f42efd0

Browse files
polling: reset bad hosts on manual poll
* Closes #7029
* This reduces the pressure on #7001 (when a platform runs out of hosts, polling is not attempted again until the next scheduled reset of bad hosts) by making it easier for operators to reset bad hosts and recover their workflow in the event of platform outages.
1 parent 1a3014b commit f42efd0

File tree

4 files changed

+103
-14
lines changed

4 files changed

+103
-14
lines changed

cylc/flow/commands.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
430430
matched, unmatched = schd.pool.id_match(ids, only_match_pool=True)
431431
_report_unmatched(unmatched)
432432
itasks = schd.pool.get_itasks(matched)
433-
schd.task_job_mgr.poll_task_jobs(itasks)
433+
schd.task_job_mgr.poll_task_jobs(itasks, manual_request=True)
434434
yield len(unmatched)
435435

436436

cylc/flow/task_job_mgr.py

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,10 @@ def kill_prep_task(self, itask: 'TaskProxy') -> None:
223223
self._prep_submit_task_job_error(itask, '(killed in job prep)', '')
224224

225225
def poll_task_jobs(
226-
self, itasks: 'Iterable[TaskProxy]', msg: str | None = None
226+
self,
227+
itasks: 'Iterable[TaskProxy]',
228+
msg: str | None = None,
229+
manual_request: bool = False,
227230
):
228231
"""Poll jobs of specified tasks.
229232
@@ -245,7 +248,8 @@ def poll_task_jobs(
245248
if itask.state.status != TASK_STATUS_WAITING
246249
],
247250
self._poll_task_jobs_callback,
248-
self._poll_task_jobs_callback_255
251+
self._poll_task_jobs_callback_255,
252+
continue_if_no_good_hosts=manual_request,
249253
)
250254

251255
def prep_submit_task_jobs(
@@ -918,13 +922,34 @@ def _poll_task_job_message_callback(self, itask, cmd_ctx, line):
918922
log_task_job_activity(ctx, self.workflow, itask.point, itask.tdef.name)
919923

920924
def _run_job_cmd(
921-
self, cmd_key, itasks, callback, callback_255
925+
self,
926+
cmd_key,
927+
itasks,
928+
callback,
929+
callback_255,
930+
continue_if_no_good_hosts=False,
922931
):
923932
"""Run job commands, e.g. poll, kill, etc.
924933
925934
Group itasks with their platform_name and host.
926935
Put a job command for each group to the multiprocess pool.
927936
937+
Args:
938+
cmd_key:
939+
Identifier for the command to run.
940+
itasks:
941+
List of task proxies to run the command against.
942+
callback:
943+
Callback to run on command completion.
944+
callback_255:
945+
Callback to run on SSH error.
946+
continue_if_no_good_hosts:
947+
If True, the bad hosts set will be reset in the event that
948+
there are no "good hosts" to run the command on. This should
949+
only be turned on for manual operations, e.g. manual poll. Use
950+
of this option for automatic commands may result in feedback
951+
loops between parallel operations.
952+
928953
"""
929954
if not itasks:
930955
return
@@ -968,15 +993,22 @@ def _run_job_cmd(
968993
host = get_host_from_platform(
969994
platform, bad_hosts=self.bad_hosts
970995
)
971-
cmd = construct_ssh_cmd(
972-
cmd, platform, host
973-
)
974996
except NoHostsError:
975-
ctx.err = f'No available hosts for {platform["name"]}'
976-
LOG.debug(ctx)
977-
callback_255(ctx, itasks)
978-
continue
997+
if continue_if_no_good_hosts:
998+
# no hosts available for this platform
999+
# -> reset the bad hosts and try again
1000+
self.task_events_mgr.reset_bad_hosts()
1001+
host = get_host_from_platform(
1002+
platform, bad_hosts=self.bad_hosts
1003+
)
1004+
else:
1005+
ctx.err = f'No available hosts for {platform["name"]}'
1006+
LOG.debug(ctx)
1007+
callback_255(ctx, itasks)
1008+
continue
9791009
else:
1010+
1011+
cmd = construct_ssh_cmd(cmd, platform, host)
9801012
ctx = SubProcContext(cmd_key, cmd, host=host)
9811013

9821014
for itask in sorted(itasks, key=lambda task: task.identity):

tests/integration/conftest.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -453,9 +453,7 @@ def capture_polling():
453453
def _disable_polling(schd: 'Scheduler') -> 'Set[TaskProxy]':
454454
polled_tasks: 'Set[TaskProxy]' = set()
455455

456-
def run_job_cmd(
457-
_1, itasks, _3, _4=None
458-
):
456+
def run_job_cmd(_, itasks, *__, **___):
459457
polled_tasks.update(itasks)
460458
return itasks
461459

tests/integration/test_task_job_mgr.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
from unittest.mock import Mock
2222

2323
from cylc.flow import CYLC_LOG
24+
from cylc.flow.commands import poll_tasks, run_cmd
2425
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
26+
from cylc.flow.platforms import get_platform
2527
from cylc.flow.scheduler import Scheduler
2628
from cylc.flow.task_state import (
2729
TASK_STATUS_FAILED,
2830
TASK_STATUS_RUNNING,
31+
TASK_STATUS_SUBMITTED,
2932
)
3033

3134

@@ -260,6 +263,62 @@ async def test_poll_job_deleted_log_folder(
260263
)
261264

262265

266+
async def test_manual_poll_resets_bad_hosts(
267+
flow,
268+
scheduler,
269+
start,
270+
mock_glbl_cfg,
271+
):
272+
"""Manual poll should be attempted even if all hosts are "bad".
273+
274+
Automated polling will be skipped in the event that all of a platform's
275+
hosts are in the known "bad hosts" list.
276+
277+
Manual poll (similar to manual trigger) will reset the bad hosts list in
278+
this eventuality in order to ensure the operation is attempted anyway.
279+
280+
See https://github.com/cylc/cylc-flow/issues/7029
281+
"""
282+
mock_glbl_cfg(
283+
'cylc.flow.platforms.glbl_cfg',
284+
'''
285+
[platforms]
286+
[[my-remote]]
287+
hosts = abc
288+
''',
289+
)
290+
id_ = flow({
291+
'scheduling': {
292+
'graph': {
293+
'R1': 'foo',
294+
},
295+
},
296+
'runtime': {
297+
'foo': {
298+
'platform': 'my-remote',
299+
},
300+
},
301+
})
302+
schd: 'Scheduler' = scheduler(id_, run_mode='live')
303+
async with start(schd):
304+
# make it look like the task is submitted
305+
itask = schd.pool.get_tasks()[0]
306+
itask.platform = get_platform('my-remote')
307+
itask.state_reset(TASK_STATUS_SUBMITTED)
308+
309+
# mark all of the hosts as bad hosts
310+
schd.task_events_mgr.bad_hosts.update(itask.platform['hosts'])
311+
312+
# request a manual poll
313+
await run_cmd(poll_tasks(schd, [itask.tokens.relative_id]))
314+
315+
# the bad hosts set should be empty
316+
assert not schd.task_events_mgr.bad_hosts
317+
318+
# and a poll command should have been issued
319+
assert len(schd.proc_pool.queuings)
320+
321+
263322
async def test__prep_submit_task_job_impl_handles_all_old_platform_settings(
264323
flow: Fixture,
265324
scheduler: Fixture,

0 commit comments

Comments
 (0)