Skip to content

Commit 37103d6

Browse files
job log retrieval: configurable success condition
* Presently, job log retrieval is considered successful when the `job.out` file appears, and additionally the `job.err` file if the job failed. * This commit adds a new configuration to allow additional files to be specified. * This will help with a MO platform where the job epilogue file is written asynchronously, so may only appear some time after the `job.out` file.
1 parent cde2efc commit 37103d6

File tree

3 files changed

+143
-8
lines changed

3 files changed

+143
-8
lines changed

cylc/flow/cfgspec/globalcfg.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@
160160
logs appearing in their final location (due to the job runner)
161161
you can configure time intervals here to delay the first and
162162
subsequent retrieval attempts.
163+
164+
Job log retrieval will be retried until the expected job files have
165+
been retried, see
166+
:cylc:conf:`global.cylc[platforms][<platform name>]retrieve job log expected files`
167+
for details
163168
''')
164169
}
165170

@@ -1751,6 +1756,33 @@ def default_for(
17511756
retry delays``.
17521757
{replaces}
17531758
''')
1759+
Conf(
1760+
'retrieve job log expected files',
1761+
VDR.V_STRING_LIST,
1762+
'',
1763+
desc='''
1764+
Configure the log files that job log retrieval is expected to
1765+
return.
1766+
1767+
By default, job log retrieval is considered successful once
1768+
it has retrieved the "job.out" file, and additionally the
1769+
"job.err" file if the job failed.
1770+
1771+
Cylc will repeat job log retrieval according to the configured
1772+
:cylc:conf:`[..]retrieve job logs retry delays` until the
1773+
expected file(s) have been retrieved.
1774+
1775+
This configuration allows you to configure additional files
1776+
to add to this success condition.
1777+
1778+
The purpose of this configuration is to facilitate working with
1779+
files written asynchronously by job runners which may not be
1780+
created until after the job has succeeded. E.g, job report
1781+
or accounting files.
1782+
1783+
.. versionadded:: 8.7.0
1784+
''',
1785+
)
17541786
Conf('tail command template',
17551787
VDR.V_STRING, 'tail -n +1 --follow=name %(filename)s',
17561788
desc=f'''

cylc/flow/task_events_mgr.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727

2828
from contextlib import suppress
2929
from enum import Enum
30+
from functools import partial
3031
from logging import (
3132
DEBUG,
3233
INFO,
3334
getLevelName,
3435
)
36+
import logging
3537
import os
3638
from pathlib import Path
3739
import shlex
@@ -241,9 +243,12 @@ def log_task_job_activity(
241243
# selection command causes a submission failure, or if a waiting task
242244
# expires before a job log directory is otherwise needed.
243245
# (Don't log the exception content, it looks like a bug).
244-
LOG.info(ctx_str)
246+
level = logging.INFO
245247
if ctx.cmd and ctx.ret_code:
246-
LOG.error(ctx_str)
248+
level = logging.ERROR
249+
else:
250+
level = logging.DEBUG
251+
LOG.log(level, ctx_str)
247252

248253

249254
class EventData(Enum):
@@ -1212,6 +1217,8 @@ def _process_job_logs_retrieval(
12121217
self.unset_waiting_event_timer(id_key)
12131218
return
12141219

1220+
1221+
12151222
# construct the retrieval command
12161223
ssh_str = str(platform["ssh command"])
12171224
rsync_str = str(platform["retrieve job logs command"])
@@ -1239,15 +1246,17 @@ def _process_job_logs_retrieval(
12391246
# Local target
12401247
cmd.append(get_workflow_run_job_dir(schd.workflow) + "/")
12411248

1249+
expected_log_files = platform['retrieve job log expected files']
1250+
12421251
# schedule command
12431252
self.proc_pool.put_command(
12441253
SubProcContext(
12451254
ctx, cmd, env=dict(os.environ), id_keys=id_keys, host=host
12461255
),
12471256
bad_hosts=self.bad_hosts,
12481257
callback=self._job_logs_retrieval_callback,
1249-
callback_args=[schd],
1250-
callback_255=self._job_logs_retrieval_callback_255
1258+
callback_args=[schd, expected_log_files],
1259+
callback_255=self._job_logs_retrieval_callback_255,
12511260
)
12521261

12531262
def _job_logs_retrieval_callback_255(self, proc_ctx, schd) -> None:
@@ -1258,7 +1267,12 @@ def _job_logs_retrieval_callback_255(self, proc_ctx, schd) -> None:
12581267
timer = self._event_timers[key]
12591268
timer.reset()
12601269

1261-
def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
1270+
def _job_logs_retrieval_callback(
1271+
self,
1272+
proc_ctx,
1273+
schd,
1274+
expected_log_files,
1275+
) -> None:
12621276
"""Call back when log job retrieval completes."""
12631277
if (
12641278
(proc_ctx.ret_code and LOG.isEnabledFor(DEBUG))
@@ -1273,6 +1287,7 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
12731287
with suppress(TypeError):
12741288
if id_key.event not in 'succeeded':
12751289
fnames.append(JOB_LOG_ERR)
1290+
fnames.extend(expected_log_files or [])
12761291
fname_oks = {}
12771292
for fname in fnames:
12781293
fname_oks[fname] = os.path.exists(get_task_job_log(

tests/integration/test_task_events_mgr.py

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,34 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

1717
import logging
18+
from pathlib import Path
1819
from types import SimpleNamespace
1920
from typing import Any as Fixture
2021

2122
import pytest
2223

24+
from cylc.flow import CYLC_LOG
2325
from cylc.flow.data_store_mgr import (
2426
JOBS,
2527
TASK_STATUS_WAITING,
2628
)
2729
from cylc.flow.id import Tokens
30+
from cylc.flow.network.resolvers import TaskMsg
2831
from cylc.flow.run_modes import RunMode
2932
from cylc.flow.scheduler import Scheduler
3033
from cylc.flow.task_events_mgr import (
3134
EventKey,
35+
TaskEventsManager,
3236
TaskJobLogsRetrieveContext,
3337
)
38+
from cylc.flow.task_job_logs import get_task_job_log
3439
from cylc.flow.task_state import (
3540
TASK_STATUS_PREPARING,
3641
TASK_STATUS_SUBMIT_FAILED,
3742
)
3843

39-
from cylc.flow.network.resolvers import TaskMsg
40-
4144
from .test_workflow_events import TEMPLATES
4245

43-
4446
# NOTE: we do not test custom event handlers here because these are tested
4547
# as a part of workflow validation (now also performed by cylc play)
4648

@@ -368,3 +370,89 @@ async def test_event_email_body(
368370
assert f'host: {mod_one.host}' in email_body
369371
assert f'port: {mod_one.server.port}' in email_body
370372
assert f'owner: {mod_one.owner}' in email_body
373+
374+
375+
async def test_job_log_retrieval(one: 'Scheduler', start, monkeypatch, capcall, caplog, mock_glbl_cfg, log_filter):
376+
mock_glbl_cfg(
377+
'cylc.flow.platforms.glbl_cfg',
378+
'''
379+
[platforms]
380+
[[localhost]]
381+
retrieve job log expected files = job.forty-two
382+
'''
383+
)
384+
# capture event timer calls
385+
386+
_remove_event_timer_calls = []
387+
def _remove_event_timer(id_key): # called if retrieval complete
388+
_remove_event_timer_calls.append(id_key)
389+
390+
_unset_waiting_event_timer_calls = []
391+
def _unset_waiting_event_timer(id_key): # called if retrieval incomplete
392+
_unset_waiting_event_timer_calls.append(id_key)
393+
394+
def job_logs_retrieve(*retrieved_files):
395+
"""Run simulated job log retrieval.
396+
397+
Any files specified will be created on the filesystem (simulating their
398+
retrieval).
399+
"""
400+
# request job log retrieval
401+
ctx = TaskJobLogsRetrieveContext(
402+
TaskEventsManager.HANDLER_JOB_LOGS_RETRIEVE, 'localhost', None
403+
)
404+
id_key = EventKey(
405+
handler='job-logs-retrieve',
406+
event='succeeded',
407+
message='succeeded',
408+
tokens=one.pool.get_tasks()[0].tokens,
409+
)
410+
one.task_events_mgr._process_job_logs_retrieval(
411+
one,
412+
ctx,
413+
[id_key],
414+
)
415+
416+
# simulate job log retrieval
417+
ctx, _, callback, callback_args, *__ = one.proc_pool.queuings.popleft()
418+
for fname in ('job-activity.log', *retrieved_files):
419+
# create the job log dir and any requested files
420+
job_log_file = Path(get_task_job_log(
421+
one.workflow,
422+
id_key.tokens['cycle'],
423+
id_key.tokens['task'],
424+
id_key.tokens['job'],
425+
fname,
426+
))
427+
job_log_file.parent.mkdir(parents=True, exist_ok=True)
428+
job_log_file.touch()
429+
ctx.ret_code = 0
430+
callback(ctx, *callback_args)
431+
432+
async with start(one):
433+
one.task_events_mgr.remove_event_timer = _remove_event_timer
434+
one.task_events_mgr.unset_waiting_event_timer = _unset_waiting_event_timer
435+
caplog.set_level(logging.DEBUG, CYLC_LOG)
436+
437+
# run retrieval -> no files are retrieval
438+
caplog.clear()
439+
job_logs_retrieve()
440+
assert log_filter(
441+
contains='File(s) not retrieved: job.forty-two job.out'
442+
)
443+
assert len(_unset_waiting_event_timer_calls) == 1
444+
assert len(_remove_event_timer_calls) == 0
445+
446+
# run retrieval -> "job.forty-two" is retrieved
447+
caplog.clear()
448+
job_logs_retrieve('job.forty-two')
449+
assert log_filter(contains='File(s) not retrieved: job.out')
450+
assert len(_unset_waiting_event_timer_calls) == 2
451+
assert len(_remove_event_timer_calls) == 0
452+
453+
# run retrieval -> "job.out" is retrieved
454+
caplog.clear()
455+
job_logs_retrieve('job.out')
456+
assert not log_filter(contains='File(s) not retrieved')
457+
assert len(_unset_waiting_event_timer_calls) == 2
458+
assert len(_remove_event_timer_calls) == 1

0 commit comments

Comments
 (0)