Skip to content

Commit 5aae063

Browse files
job log retrieval: configurable success condition
* Presently, job log retrieval is considered successful when the `job.out` file appears, and additionally the `job.err` file if the job failed. * This commit adds a new configuration to allow additional files to be specified. * This will help with a MO platform where the job epilogue file is written asynchronously, so may only appear some time after the `job.out` file.
1 parent cde2efc commit 5aae063

File tree

4 files changed

+155
-8
lines changed

4 files changed

+155
-8
lines changed

changes.d/7052.feat.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Make the list of files used to determine the success of job log retrieval
2+
commands configurable.

cylc/flow/cfgspec/globalcfg.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@
160160
logs appearing in their final location (due to the job runner)
161161
you can configure time intervals here to delay the first and
162162
subsequent retrieval attempts.
163+
164+
Job log retrieval will be retried until the expected job files have
165+
been retrieved, see
166+
:cylc:conf:`
167+
global.cylc[platforms][<platform name>]retrieve job log expected files`
168+
for details.
163169
''')
164170
}
165171

@@ -1751,6 +1757,33 @@ def default_for(
17511757
retry delays``.
17521758
{replaces}
17531759
''')
1760+
Conf(
1761+
'retrieve job log expected files',
1762+
VDR.V_STRING_LIST,
1763+
'',
1764+
desc='''
1765+
Configure the log files that job log retrieval is expected to
1766+
return.
1767+
1768+
By default, job log retrieval is considered successful once
1769+
it has retrieved the "job.out" file, and additionally the
1770+
"job.err" file if the job failed.
1771+
1772+
Cylc will repeat job log retrieval according to the configured
1773+
:cylc:conf:`[..]retrieve job logs retry delays` until the
1774+
expected file(s) have been retrieved.
1775+
1776+
This configuration allows you to configure additional files
1777+
to add to this success condition.
1778+
1779+
The purpose of this configuration is to facilitate working with
1780+
files written asynchronously by job runners which may not be
1781+
created until after the job has succeeded. E.g., job report
1782+
or accounting files.
1783+
1784+
.. versionadded:: 8.7.0
1785+
''',
1786+
)
17541787
Conf('tail command template',
17551788
VDR.V_STRING, 'tail -n +1 --follow=name %(filename)s',
17561789
desc=f'''

cylc/flow/task_events_mgr.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
INFO,
3333
getLevelName,
3434
)
35+
import logging
3536
import os
3637
from pathlib import Path
3738
import shlex
@@ -241,9 +242,12 @@ def log_task_job_activity(
241242
# selection command causes a submission failure, or if a waiting task
242243
# expires before a job log directory is otherwise needed.
243244
# (Don't log the exception content, it looks like a bug).
244-
LOG.info(ctx_str)
245+
level = logging.INFO
245246
if ctx.cmd and ctx.ret_code:
246-
LOG.error(ctx_str)
247+
level = logging.ERROR
248+
else:
249+
level = logging.DEBUG
250+
LOG.log(level, ctx_str)
247251

248252

249253
class EventData(Enum):
@@ -1239,15 +1243,17 @@ def _process_job_logs_retrieval(
12391243
# Local target
12401244
cmd.append(get_workflow_run_job_dir(schd.workflow) + "/")
12411245

1246+
expected_log_files = platform['retrieve job log expected files']
1247+
12421248
# schedule command
12431249
self.proc_pool.put_command(
12441250
SubProcContext(
12451251
ctx, cmd, env=dict(os.environ), id_keys=id_keys, host=host
12461252
),
12471253
bad_hosts=self.bad_hosts,
12481254
callback=self._job_logs_retrieval_callback,
1249-
callback_args=[schd],
1250-
callback_255=self._job_logs_retrieval_callback_255
1255+
callback_args=[schd, expected_log_files],
1256+
callback_255=self._job_logs_retrieval_callback_255,
12511257
)
12521258

12531259
def _job_logs_retrieval_callback_255(self, proc_ctx, schd) -> None:
@@ -1258,7 +1264,12 @@ def _job_logs_retrieval_callback_255(self, proc_ctx, schd) -> None:
12581264
timer = self._event_timers[key]
12591265
timer.reset()
12601266

1261-
def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
1267+
def _job_logs_retrieval_callback(
1268+
self,
1269+
proc_ctx,
1270+
schd,
1271+
expected_log_files,
1272+
) -> None:
12621273
"""Call back when job log retrieval completes."""
12631274
if (
12641275
(proc_ctx.ret_code and LOG.isEnabledFor(DEBUG))
@@ -1273,6 +1284,7 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
12731284
with suppress(TypeError):
12741285
if id_key.event not in 'succeeded':
12751286
fnames.append(JOB_LOG_ERR)
1287+
fnames.extend(expected_log_files or [])
12761288
fname_oks = {}
12771289
for fname in fnames:
12781290
fname_oks[fname] = os.path.exists(get_task_job_log(

tests/integration/test_task_events_mgr.py

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,34 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

1717
import logging
18+
from pathlib import Path
1819
from types import SimpleNamespace
1920
from typing import Any as Fixture
2021

2122
import pytest
2223

24+
from cylc.flow import CYLC_LOG
2325
from cylc.flow.data_store_mgr import (
2426
JOBS,
2527
TASK_STATUS_WAITING,
2628
)
2729
from cylc.flow.id import Tokens
30+
from cylc.flow.network.resolvers import TaskMsg
2831
from cylc.flow.run_modes import RunMode
2932
from cylc.flow.scheduler import Scheduler
3033
from cylc.flow.task_events_mgr import (
3134
EventKey,
35+
TaskEventsManager,
3236
TaskJobLogsRetrieveContext,
3337
)
38+
from cylc.flow.task_job_logs import get_task_job_log
3439
from cylc.flow.task_state import (
3540
TASK_STATUS_PREPARING,
3641
TASK_STATUS_SUBMIT_FAILED,
3742
)
3843

39-
from cylc.flow.network.resolvers import TaskMsg
40-
4144
from .test_workflow_events import TEMPLATES
4245

43-
4446
# NOTE: we do not test custom event handlers here because these are tested
4547
# as a part of workflow validation (now also performed by cylc play)
4648

@@ -368,3 +370,101 @@ async def test_event_email_body(
368370
assert f'host: {mod_one.host}' in email_body
369371
assert f'port: {mod_one.server.port}' in email_body
370372
assert f'owner: {mod_one.owner}' in email_body
373+
374+
375+
async def test_job_log_retrieval_success_condition(
376+
one: 'Scheduler', start, caplog, mock_glbl_cfg, log_filter
377+
):
378+
"""Test the success condition for job log retrieval.
379+
380+
Job log retrieval may be retried automatically if configured.
381+
382+
It will stop when the configured list of files has been retrieved.
383+
"""
384+
mock_glbl_cfg(
385+
'cylc.flow.platforms.glbl_cfg',
386+
'''
387+
[platforms]
388+
[[localhost]]
389+
retrieve job log expected files = job.forty-two
390+
'''
391+
)
392+
# capture event timer calls
393+
_remove_event_timer_calls = []
394+
_unset_waiting_event_timer_calls = []
395+
396+
# called if retrieval complete
397+
def _remove_event_timer(id_key):
398+
_remove_event_timer_calls.append(id_key)
399+
400+
# called if retrieval incomplete
401+
def _unset_waiting_event_timer(id_key):
402+
_unset_waiting_event_timer_calls.append(id_key)
403+
404+
def job_logs_retrieve(*retrieved_files):
405+
"""Run simulated job log retrieval.
406+
407+
Any files specified will be created on the filesystem (simulating their
408+
retrieval).
409+
"""
410+
# request job log retrieval
411+
ctx = TaskJobLogsRetrieveContext(
412+
TaskEventsManager.HANDLER_JOB_LOGS_RETRIEVE, 'localhost', None
413+
)
414+
id_key = EventKey(
415+
handler='job-logs-retrieve',
416+
event='succeeded',
417+
message='succeeded',
418+
tokens=one.pool.get_tasks()[0].tokens,
419+
)
420+
one.task_events_mgr._process_job_logs_retrieval(
421+
one,
422+
ctx,
423+
[id_key],
424+
)
425+
426+
# simulate job log retrieval
427+
ctx, _, callback, callback_args, *__ = one.proc_pool.queuings.popleft()
428+
for fname in ('job-activity.log', *retrieved_files):
429+
# create the job log dir and any requested files
430+
job_log_file = Path(get_task_job_log(
431+
one.workflow,
432+
id_key.tokens['cycle'],
433+
id_key.tokens['task'],
434+
id_key.tokens['job'],
435+
fname,
436+
))
437+
job_log_file.parent.mkdir(parents=True, exist_ok=True)
438+
job_log_file.touch()
439+
ctx.ret_code = 0
440+
callback(ctx, *callback_args)
441+
442+
async with start(one):
443+
one.task_events_mgr.remove_event_timer = _remove_event_timer
444+
one.task_events_mgr.unset_waiting_event_timer = (
445+
_unset_waiting_event_timer
446+
)
447+
caplog.set_level(logging.DEBUG, CYLC_LOG)
448+
449+
# run retrieval -> no files are retrieved
450+
caplog.clear()
451+
job_logs_retrieve()
452+
assert log_filter(
453+
contains='File(s) not retrieved: job.forty-two job.out'
454+
)
455+
assert len(_unset_waiting_event_timer_calls) == 1
456+
assert len(_remove_event_timer_calls) == 0
457+
458+
# run retrieval -> "job.forty-two" is retrieved
459+
caplog.clear()
460+
job_logs_retrieve('job.forty-two')
461+
assert log_filter(contains='File(s) not retrieved: job.out')
462+
assert len(_unset_waiting_event_timer_calls) == 2
463+
assert len(_remove_event_timer_calls) == 0
464+
465+
# run retrieval -> "job.out" is retrieved
466+
caplog.clear()
467+
job_logs_retrieve('job.out')
468+
assert not log_filter(contains='File(s) not retrieved')
469+
assert len(_unset_waiting_event_timer_calls) == 2
470+
assert len(_remove_event_timer_calls) == 1

0 commit comments

Comments
 (0)