diff --git a/changes.d/7035.fix.md b/changes.d/7035.fix.md new file mode 100644 index 00000000000..dc25fc20560 --- /dev/null +++ b/changes.d/7035.fix.md @@ -0,0 +1 @@ +Fixed platform subshell expression evaluating more than once for tasks triggered in paused workflow. diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index ee215b47c9d..7e55c014756 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -184,13 +184,13 @@ class PlatformError(CylcError): def __init__( self, message: str, - platform_name: str, + platform_name: str | None, *, - ctx: 'Optional[SubFuncContext]' = None, - cmd: Union[str, Sequence[str], None] = None, - ret_code: Optional[int] = None, - out: Optional[str] = None, - err: Optional[str] = None + ctx: 'SubFuncContext | None' = None, + cmd: str | Sequence[str] | None = None, + ret_code: int | None = None, + out: str | None = None, + err: str | None = None ) -> None: self.msg = message self.platform_name = platform_name diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py index 87f64292cba..aed340c0423 100644 --- a/cylc/flow/platforms.py +++ b/cylc/flow/platforms.py @@ -128,7 +128,7 @@ def get_platform( Returns: platform: A platform definition dictionary. Uses either - get_platform() or platform_name_from_job_info(), but to the + get_platform() or _platform_name_from_job_info(), but to the user these look the same. This will be None if the platform definition uses a subshell. @@ -169,7 +169,7 @@ def get_platform( task_job_section = task_conf['job'] if 'job' in task_conf else {} task_remote_section = task_conf['remote'] if 'remote' in task_conf else {} return platform_from_name( - platform_name_from_job_info( + _platform_name_from_job_info( glbl_cfg().get(['platforms']), task_job_section, task_remote_section, @@ -329,7 +329,7 @@ def get_platform_from_group( return HOST_SELECTION_METHODS[method](platform_names) -def platform_name_from_job_info( +def _platform_name_from_job_info( platforms: Union[dict, 'OrderedDictWithDefaults'], job: Dict[str, Any], remote: Dict[str, Any], @@ -407,14 +407,14 @@ def platform_name_from_job_info( ... } >>> job = {'batch system': 'slurm'} >>> remote = {'host': 'localhost'} - >>> platform_name_from_job_info(platforms, job, remote) + >>> _platform_name_from_job_info(platforms, job, remote) 'sugar' >>> remote = {} - >>> platform_name_from_job_info(platforms, job, remote) + >>> _platform_name_from_job_info(platforms, job, remote) 'sugar' >>> remote ={'host': 'desktop92'} >>> job = {} - >>> platform_name_from_job_info(platforms, job, remote) + >>> _platform_name_from_job_info(platforms, job, remote) 'desktop92' """ @@ -581,9 +581,9 @@ def fail_if_platform_and_host_conflict( if 'platform' in task_conf and task_conf['platform']: fail_items = [ f'\n * [{section}]{key}' - for section, keys in FORBIDDEN_WITH_PLATFORM.items() + for section, settings in FORBIDDEN_WITH_PLATFORM.items() if section in task_conf - for key, _ in keys.items() + for key in settings.keys() if ( key in task_conf[section] and task_conf[section][key] is not None diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 8f2532558e0..d7197381336 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1341,7 +1341,7 @@ def release_tasks_to_run(self) -> bool: """ pre_prep_tasks: Set['TaskProxy'] = set() if ( - self.stop_mode is None + not self.stop_mode and self.auto_restart_time is None and self.reload_pending is False ): @@ -1350,36 +1350,29 @@ def release_tasks_to_run(self) -> bool: pre_prep_tasks.update(self.pool.tasks_to_trigger_now) self.pool.tasks_to_trigger_now = set() - if self.is_paused: - # finish processing preparing tasks - pre_prep_tasks.update({ - itask for itask in self.pool.get_tasks() - if itask.waiting_on_job_prep - }) - else: + if not self.is_paused: # release queued tasks pre_prep_tasks.update(self.pool.release_queued_tasks()) - elif ( - ( - # Need to get preparing tasks to submit before auto restart - self.should_auto_restart_now() - and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL - ) or ( - # Need to get preparing tasks to submit before reload - self.reload_pending - ) + if ( + # Manually triggered tasks will be preparing and should + # be submitted even if paused (unless workflow is stopping). + self.is_paused and not self.stop_mode + ) or ( + # Need to get preparing tasks to submit before auto restart + self.should_auto_restart_now() + and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL + ) or ( + # Need to get preparing tasks to submit before reload + self.reload_pending ): - # finish processing preparing tasks first - pre_prep_tasks = { - itask for itask in self.pool.get_tasks() - if itask.state(TASK_STATUS_PREPARING) - } + pre_prep_tasks.update({ + itask + for itask in self.pool.get_tasks() + if itask.waiting_on_job_prep + }) # Return, if no tasks to submit. - else: - return False - if not pre_prep_tasks: return False diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 47f0a17e7f5..996c5cb5014 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -56,7 +56,6 @@ NoPlatformsError, PlatformError, PlatformLookupError, - WorkflowConfigError, ) from cylc.flow.hostuserutil import ( get_host, @@ -70,6 +69,7 @@ from cylc.flow.pathutil import get_remote_workflow_run_job_dir from cylc.flow.platforms import ( FORBIDDEN_WITH_PLATFORM, + fail_if_platform_and_host_conflict, get_host_from_platform, get_install_target_from_platform, get_localhost_install_target, @@ -1168,15 +1168,7 @@ def _prep_submit_task_job( # - host exists - eval host_n # remove at: # Cylc8.x - if ( - rtconfig['platform'] is not None and - rtconfig['remote']['host'] is not None - ): - raise WorkflowConfigError( - "A mixture of Cylc 7 (host) and Cylc 8 (platform) " - "logic should not be used. In this case for the task " - f"\"{itask.identity}\" the following are not compatible:\n" - ) + fail_if_platform_and_host_conflict(rtconfig, itask.tdef.name) host_name, platform_name = None, None try: @@ -1196,37 +1188,41 @@ def _prep_submit_task_job( ) except PlatformError as exc: - itask.waiting_on_job_prep = False - itask.summary['platforms_used'][itask.submit_num] = '' - # Retry delays, needed for the try_num - self._create_job_log_path(itask) - self._set_retry_timers(itask, rtconfig) - self._prep_submit_task_job_error( - itask, '(remote host select)', exc - ) + self._prep_submit_task_job_platform_error(itask, rtconfig, exc) return False else: - # host/platform select not ready if host_name is None and platform_name is None: + # host/platform select not ready return None elif ( host_name is None and rtconfig['platform'] and rtconfig['platform'] != platform_name ): - LOG.debug( + msg = ( f"for task {itask.identity}: platform = " - f"{rtconfig['platform']} evaluated as {platform_name}" + f"{rtconfig['platform']} evaluated as '{platform_name}'" ) - + if not platform_name: + self._prep_submit_task_job_platform_error( + itask, rtconfig, msg + ) + return False + LOG.debug(msg) elif ( platform_name is None and rtconfig['remote']['host'] != host_name ): - LOG.debug( + msg = ( f"[{itask}] host = " - f"{rtconfig['remote']['host']} evaluated as {host_name}" + f"{rtconfig['remote']['host']} evaluated as '{host_name}'" ) + if not host_name: + self._prep_submit_task_job_platform_error( + itask, rtconfig, msg + ) + return False + LOG.debug(msg) try: platform = cast( @@ -1293,6 +1289,20 @@ def _prep_submit_task_job( itask.local_job_file_path = local_job_file_path return itask + def _prep_submit_task_job_platform_error( + self, itask: 'TaskProxy', rtconfig: dict, exc: Exception | str + ): + """Helper for self._prep_submit_task_job. On platform selection error. + """ + itask.waiting_on_job_prep = False + itask.summary['platforms_used'][itask.submit_num] = '' + # Retry delays, needed for the try_num + self._create_job_log_path(itask) + self._set_retry_timers(itask, rtconfig) + self._prep_submit_task_job_error( + itask, '(remote host select)', exc + ) + def _prep_submit_task_job_error( self, itask: 'TaskProxy', diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index f9168106de7..4a566b1af51 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -900,8 +900,7 @@ def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None: except KeyError: pass else: - with suppress(KeyError): - self.tasks_to_trigger_now.remove(itask) + self.tasks_to_trigger_now.discard(itask) self.tasks_removed = True self.active_tasks_changed = True if not self.active_tasks[itask.point]: @@ -1042,7 +1041,7 @@ def count_active_tasks(self): return active_task_counter, pre_prep_tasks - def release_queued_tasks(self): + def release_queued_tasks(self) -> set['TaskProxy']: """Return list of queue-released tasks awaiting job prep. Note: @@ -1071,7 +1070,7 @@ def release_queued_tasks(self): self.spawn_on_all_outputs(itask) # Note: released and pre_prep_tasks can overlap - return list(set(released + pre_prep_tasks)) + return set(released + pre_prep_tasks) def get_min_point(self): """Return the minimum cycle point currently in the pool.""" diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index d2a8a916097..45e21c32540 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -37,7 +37,6 @@ Dict, List, NamedTuple, - Optional, Set, TYPE_CHECKING, Tuple, @@ -120,8 +119,8 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr, server): self.server: WorkflowRuntimeServer = server def _subshell_eval( - self, eval_str: str, command_pattern: re.Pattern - ) -> Optional[str]: + self, eval_str: str | None, command_pattern: re.Pattern + ) -> str | None: """Evaluate a platform or host from a possible subshell string. Arguments: @@ -175,7 +174,7 @@ def _subshell_eval( # BACK COMPAT: references to "host" # remove at: # Cylc8.x - def eval_host(self, host_str: str) -> Optional[str]: + def eval_host(self, host_str: str | None) -> str | None: """Evaluate a host from a possible subshell string. Args: @@ -191,7 +190,7 @@ def eval_host(self, host_str: str) -> Optional[str]: return 'localhost' return host - def eval_platform(self, platform_str: str) -> Optional[str]: + def eval_platform(self, platform_str: str | None) -> str | None: """Evaluate a platform from a possible subshell string. Args: diff --git a/tests/functional/job-submission/19-platform_select.t b/tests/functional/job-submission/19-platform_select.t index 6f6b6df9d2b..fb94ecb3676 100755 --- a/tests/functional/job-submission/19-platform_select.t +++ b/tests/functional/job-submission/19-platform_select.t @@ -17,7 +17,7 @@ #------------------------------------------------------------------------------- # Test recovery of a failed host select command for a group of tasks. . "$(dirname "$0")/test_header" -set_test_number 7 +set_test_number 8 install_workflow "${TEST_NAME_BASE}" @@ -29,24 +29,28 @@ logfile="${WORKFLOW_RUN_DIR}/log/scheduler/log" # Check that host = $(cmd) is correctly evaluated grep_ok \ - "1/host_subshell/01:.* evaluated as improbable host name$" \ + "1/host_subshell/01:.* evaluated as 'improbable host name'" \ "${logfile}" grep_ok \ - "1/localhost_subshell/01:.* evaluated as localhost$" \ + "1/localhost_subshell/01:.* evaluated as 'localhost'" \ "${logfile}" # Check that host = `cmd` is correctly evaluated grep_ok \ - "1/host_subshell_backticks/01:.* evaluated as improbable host name$" \ + "1/host_subshell_backticks/01:.* evaluated as 'improbable host name'" \ "${logfile}" # Check that platform = $(cmd) correctly evaluated grep_ok \ - "1/platform_subshell:.* evaluated as improbable platform name$" \ + "1/platform_subshell:.* evaluated as 'improbable platform name'" \ "${logfile}" grep_ok \ - "1/platform_subshell_suffix:.* evaluated as prefix-middle-suffix$" \ + "1/platform_subshell_empty:.* evaluated as ''" \ "${logfile}" -purge +grep_ok \ + "1/platform_subshell_suffix:.* evaluated as 'prefix-middle-suffix'" \ + "${logfile}" + +# purge diff --git a/tests/functional/job-submission/19-platform_select/flow.cylc b/tests/functional/job-submission/19-platform_select/flow.cylc index 902eae68eb7..8a248ebe21e 100644 --- a/tests/functional/job-submission/19-platform_select/flow.cylc +++ b/tests/functional/job-submission/19-platform_select/flow.cylc @@ -17,6 +17,7 @@ purpose = """ localhost_subshell platform_subshell:submit-fail? platform_no_subshell:submit-fail? + platform_subshell_empty:submit-fail? platform_subshell_suffix:submit-fail? host_subshell:submit-fail? host_subshell_backticks:submit-fail? @@ -36,6 +37,9 @@ purpose = """ [[platform_subshell]] platform = $(echo "improbable platform name") + [[platform_subshell_empty]] + platform = $(echo "") + [[platform_subshell_suffix]] platform = prefix-$( echo middle )-suffix diff --git a/tests/functional/job-submission/19-platform_select/reference.log b/tests/functional/job-submission/19-platform_select/reference.log index 1b92c61a069..854550298a4 100644 --- a/tests/functional/job-submission/19-platform_select/reference.log +++ b/tests/functional/job-submission/19-platform_select/reference.log @@ -5,3 +5,4 @@ 1/host_subshell_backticks -triggered off [] in flow 1 1/localhost_subshell -triggered off [] in flow 1 1/platform_subshell_suffix -triggered off [] in flow 1 +1/platform_subshell_empty -triggered off [] in flow 1 diff --git a/tests/functional/platforms/04-host-to-platform-upgrade-fail-inherit.t b/tests/functional/platforms/04-host-to-platform-upgrade-fail-inherit.t index 8812fefc0de..38b9eaee0bd 100644 --- a/tests/functional/platforms/04-host-to-platform-upgrade-fail-inherit.t +++ b/tests/functional/platforms/04-host-to-platform-upgrade-fail-inherit.t @@ -19,7 +19,7 @@ # Child function not valid after inheritance. # Check for task failure at job-submit. . "$(dirname "$0")/test_header" -set_test_number 3 +set_test_number 2 create_test_global_config '' " # non-existent platform @@ -29,16 +29,11 @@ create_test_global_config '' " install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" -# Both of these cases should validate ok. -run_ok "${TEST_NAME_BASE}-validate" \ - cylc validate "${WORKFLOW_NAME}" - # Run the workflow workflow_run_fail "${TEST_NAME_BASE}-run" \ - cylc play --debug --no-detach "${WORKFLOW_NAME}" + cylc play --no-detach "${WORKFLOW_NAME}" -# Grep for inherit-fail to fail later at submit time -grep_ok "WorkflowConfigError:.*1/non-valid-child" \ +grep_ok "Task 'non-valid-child' has the following deprecated '\[runtime\]' setting(s)" \ "${TEST_NAME_BASE}-run.stderr" purge diff --git a/tests/functional/platforms/12-ping-pong/flow.cylc b/tests/functional/platforms/12-ping-pong/flow.cylc index bc238486a1f..578092e5aa5 100644 --- a/tests/functional/platforms/12-ping-pong/flow.cylc +++ b/tests/functional/platforms/12-ping-pong/flow.cylc @@ -14,11 +14,9 @@ platform = $(cat ${CYLC_WORKFLOW_RUN_DIR}/pretend-hall-info) [[toggler]] + # Toggle the platform between localhost and the remote host + # using the content of a file, ${CYLC_WORKFLOW_RUN_DIR}/pretend-hall-info. script = """ - # Toggle the platform between localhost and the remote host - # using the content of a file, ${CYLC_WORKFLOW_RUN_DIR}/pretend-hall-info. - # between localhost and the remote. - if (( $CYLC_TASK_CYCLE_POINT % 2 == 1 )); then echo ${REMOTE_PLATFORM} > ${CYLC_WORKFLOW_RUN_DIR}/pretend-hall-info cylc message -- "changing platform to ${REMOTE_PLATFORM}" diff --git a/tests/functional/platforms/14-trigger-paused-subshell.t b/tests/functional/platforms/14-trigger-paused-subshell.t new file mode 100644 index 00000000000..dc2d5e57415 --- /dev/null +++ b/tests/functional/platforms/14-trigger-paused-subshell.t @@ -0,0 +1,75 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# Triggering a task with a subshell platform setting while the workflow is +# paused should only evaluate the subshell once. +# Here we have a subshell command that alternates between two platforms on each +# call, to check the platform does not change during a manual trigger. +# https://github.com/cylc/cylc-flow/issues/6994 + +export REQUIRE_PLATFORM='loc:remote' + +. "$(dirname "$0")/test_header" +set_test_number 11 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + +local_host_name=$(hostname) +remote_host_name=$(cylc config -i "[platforms][${CYLC_TEST_PLATFORM}]hosts") +remote_host_name=$(ssh -oStrictHostKeyChecking=no "$remote_host_name" hostname) +workflow_log="${WORKFLOW_RUN_DIR}/log/scheduler/log" +# shellcheck disable=SC2034 # LOG_SCAN_GREP_OPTS is not unused +LOG_SCAN_GREP_OPTS="-E" + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" +cylc play "${WORKFLOW_NAME}" --pause +poll_grep_workflow_log "1/foo:waiting" + +cylc trigger "${WORKFLOW_NAME}//1/foo" + +log_scan "log-grep-01" "$workflow_log" 10 2 \ + "\[1/foo/01:preparing\] submitted to localhost" \ + "\[1/foo/01:.*\] \(received\)${local_host_name}" \ + "\[1/foo/01:.*\] => succeeded" + +cylc trigger "${WORKFLOW_NAME}//1/foo" + +log_scan "log-grep-02" "$workflow_log" 10 2 \ + "\[1/foo/02:preparing\] submitted to ${CYLC_TEST_PLATFORM}" \ + "\[1/foo/02:.*\] \((received|polled)\)${remote_host_name}" \ + "\[1/foo/02:.*\] => succeeded" + +cylc trigger "${WORKFLOW_NAME}//1/foo" + +log_scan "log-grep-03" "$workflow_log" 10 2 \ + "\[1/foo/03:preparing\] submitted to localhost" \ + "\[1/foo/03:.*\] \(received\)${local_host_name}" \ + "\[1/foo/03:.*\] => succeeded" + +cylc stop "${WORKFLOW_NAME}" --now --now + +# Check DB as well: +sqlite3 "${WORKFLOW_RUN_DIR}/.service/db" \ + "SELECT submit_num, platform_name FROM task_jobs" > task_jobs.out +cmp_ok task_jobs.out <<__EOF__ +1|localhost +2|${CYLC_TEST_PLATFORM} +3|localhost +__EOF__ + +poll_workflow_stopped +purge diff --git a/tests/functional/platforms/14-trigger-paused-subshell/flow.cylc b/tests/functional/platforms/14-trigger-paused-subshell/flow.cylc new file mode 100644 index 00000000000..0993ec7b41e --- /dev/null +++ b/tests/functional/platforms/14-trigger-paused-subshell/flow.cylc @@ -0,0 +1,13 @@ +#!jinja2 +[scheduling] + [[graph]] + R1 = foo:never # Allows triggering repeatedly without workflow shutting down + +[runtime] + [[foo]] + platform = $("${CYLC_WORKFLOW_RUN_DIR}/toggle_platform.sh" {{ CYLC_TEST_PLATFORM }}) + # Check what host we have truly run on: + script = cylc message -- "$(hostname)" + + [[[outputs]]] + never = gonna_give_u_up diff --git a/tests/functional/platforms/14-trigger-paused-subshell/toggle_platform.sh b/tests/functional/platforms/14-trigger-paused-subshell/toggle_platform.sh new file mode 100755 index 00000000000..8910d6bc251 --- /dev/null +++ b/tests/functional/platforms/14-trigger-paused-subshell/toggle_platform.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# Script that outputs the current platform written in the hall file and changes +# it to the other one. This ensures the platform subshell result will alternate +# each time it is called. + +remote_platform=$1 +hall_file="${CYLC_WORKFLOW_RUN_DIR}/pretend_hall_info" + +if [[ ! -f "${hall_file}" ]]; then + current=localhost +else + current=$(cat "$hall_file") +fi + +echo "$current" + +if [[ "$current" == localhost ]]; then + echo "$remote_platform" > "$hall_file" +else + echo localhost > "$hall_file" +fi diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 342bb1d58d0..228ceed49ab 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -627,7 +627,10 @@ async def _complete( """ if schd.is_paused and not allow_paused: - raise Exception("Cannot wait for completion of a paused scheduler") + raise Exception( + "You are waiting for a paused scheduler - if this is intended " + "then use `complete(..., allow_paused=True)`" + ) start_time = time() diff --git a/tests/integration/test_force_trigger.py b/tests/integration/test_force_trigger.py index 1ad169986e0..1e60074b88a 100644 --- a/tests/integration/test_force_trigger.py +++ b/tests/integration/test_force_trigger.py @@ -16,22 +16,17 @@ import asyncio import logging -import sys from typing import ( Any as Fixture, - Callable + Callable, ) -if sys.version_info[:2] >= (3, 11): - from asyncio import timeout as async_timeout -else: - from async_timeout import timeout as async_timeout import pytest from cylc.flow.commands import ( force_trigger_tasks, - reload_workflow, hold, + reload_workflow, resume, run_cmd, set_prereqs_and_outputs, @@ -47,7 +42,17 @@ ) -async def test_trigger_workflow_paused( +async def test_workflow_paused_simple( + one_conf, flow, scheduler, run, complete +): + """It should run triggered tasks even if the workflow is paused.""" + schd: Scheduler = scheduler(flow(one_conf), paused_start=True) + async with run(schd): + await run_cmd(force_trigger_tasks(schd, ['1/one'], ['1'])) + await complete(schd, '1/one', allow_paused=True, timeout=1) + + +async def test_workflow_paused_queues( flow: 'Fixture', scheduler: 'Fixture', start: 'Fixture', @@ -116,7 +121,8 @@ async def test_trigger_workflow_paused( async def test_trigger_group_whilst_paused(flow, scheduler, run, complete): - """Only group start tasks should run whilst the scheduler is paused. + """Only group start tasks should run if the group is triggered whilst + the scheduler is paused. Group start tasks have only off-group dependencies. @@ -124,14 +130,8 @@ async def test_trigger_group_whilst_paused(flow, scheduler, run, complete): prerequisites are satisfied once the workflow is resumed. """ - id_ = flow( - { - 'scheduling': { - 'graph': {'R1': 'a => b => c => d'}, - }, - } - ) - schd = scheduler(id_) + id_ = flow('a => b => c => d') + schd = scheduler(id_, paused_start=True) async with run(schd): # trigger the chain await run_cmd(force_trigger_tasks(schd, ['1/a'], [])) @@ -673,7 +673,7 @@ async def test_trigger_with_sequential_task(flow, scheduler, run, log_filter): schd = scheduler(id_, paused_start=False) async with run(schd): # wait for 2/foo:failed - async with async_timeout(5): + async with asyncio.timeout(5): while True: itask = schd.pool._get_task_by_id('2/foo') if itask and itask.state.outputs.is_message_complete('failed'): @@ -688,33 +688,13 @@ async def test_trigger_with_sequential_task(flow, scheduler, run, log_filter): ) # it should re-run - async with async_timeout(5): + async with asyncio.timeout(5): while True: if log_filter(contains='[2/foo/02:running] (received)failed'): break await asyncio.sleep(0) -async def test_trigger_whilst_paused_preparing(one, run, complete): - """It should run "preparing" tasks even if the workflow is paused. - - Remote init leaves tasks as preparing for a while. These must still be - pushed through to running, even if triggered while the workflow is paused. - - See https://github.com/cylc/cylc-flow/pull/6768 - - """ - async with run(one): - await run_cmd( - force_trigger_tasks( - one, [one.pool.get_tasks()[0].tokens.relative_id], ['1'] - ) - ) - - # 1/a should run even though the workflow is paused. - await complete(one, '1/one', allow_paused=True, timeout=1) - - async def test_trigger_with_task_selector(flow, scheduler, start, monkeypatch): """Test task matching with the trigger command. diff --git a/tests/integration/test_reload.py b/tests/integration/test_reload.py index 86ef0c72490..59d2fc0eacf 100644 --- a/tests/integration/test_reload.py +++ b/tests/integration/test_reload.py @@ -36,7 +36,6 @@ async def test_reload_waits_for_pending_tasks( scheduler, start, monkeypatch, - capture_submission, log_scan, ): """Reload should flush out preparing tasks and pause the workflow. @@ -47,9 +46,11 @@ async def test_reload_waits_for_pending_tasks( See https://github.com/cylc/cylc-flow/issues/5107 """ + # speed up the test: + monkeypatch.setattr('cylc.flow.scheduler.sleep', lambda *_: None) # a simple workflow with a single task id_ = flow('foo') - schd = scheduler(id_, paused_start=False) + schd: Scheduler = scheduler(id_, paused_start=False) # we will artificially push the task through these states state_seq = [ @@ -63,26 +64,26 @@ async def test_reload_waits_for_pending_tasks( # start the scheduler async with start(schd) as log: - # disable submission events to prevent anything from actually running - capture_submission(schd) + foo = schd.pool.get_tasks()[0] # set the task to go through some state changes - def change_state(_=0): - with suppress(IndexError): + def submit_task_jobs(*a, **k): + try: foo.state_reset(state_seq.pop(0)) + except IndexError: + foo.waiting_on_job_prep = False + return [foo] + monkeypatch.setattr( - 'cylc.flow.scheduler.sleep', - change_state + schd.task_job_mgr, 'submit_task_jobs', submit_task_jobs ) # the task should start as waiting - tasks = schd.pool.get_tasks() - assert len(tasks) == 1 - foo = tasks[0] - assert tasks[0].state(TASK_STATUS_WAITING) + assert foo.state(TASK_STATUS_WAITING) # put the task into the preparing state - change_state() + schd.release_tasks_to_run() + assert foo.state(TASK_STATUS_PREPARING) # reload the workflow await commands.run_cmd(commands.reload_workflow(schd)) @@ -96,12 +97,11 @@ def change_state(_=0): [ # the task should have entered the preparing state before the # reload was requested - '[1/foo:waiting(queued)] => preparing(queued)', + '[1/foo:waiting] => preparing', # the reload should have put the workflow into the paused state 'Pausing the workflow: Reloading workflow', # reload should have waited for the task to submit - '[1/foo/00:preparing(queued)]' - ' => submitted(queued)', + '[1/foo/00:preparing] => submitted', # before then reloading the workflow config 'Reloading the workflow definition.', # post-reload the workflow should have been resumed diff --git a/tests/unit/test_platforms.py b/tests/unit/test_platforms.py index c32b4b5e717..ef594acfab5 100644 --- a/tests/unit/test_platforms.py +++ b/tests/unit/test_platforms.py @@ -16,23 +16,31 @@ # # Tests for the platform lookup. +from typing import ( + Any, + Dict, + List, + Optional, + Type, +) + import pytest -from typing import Any, Dict, List, Optional, Type +from cylc.flow.exceptions import ( + GlobalConfigError, + PlatformLookupError, +) from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults from cylc.flow.platforms import ( + _platform_name_from_job_info, + _validate_single_host, + generic_items_match, + get_install_target_from_platform, + get_install_target_to_platforms_map, get_platform, get_platform_deprecated_settings, is_platform_definition_subshell, - platform_from_name, platform_name_from_job_info, - get_install_target_from_platform, - get_install_target_to_platforms_map, - generic_items_match, - _validate_single_host -) -from cylc.flow.exceptions import ( - PlatformLookupError, - GlobalConfigError + platform_from_name, ) from cylc.flow.run_modes import JOBLESS_MODES @@ -270,11 +278,11 @@ def test_similar_but_not_exact_match(): ] ) def test_platform_name_from_job_info_basic(job, remote, returns): - assert platform_name_from_job_info(PLATFORMS, job, remote) == returns + assert _platform_name_from_job_info(PLATFORMS, job, remote) == returns def test_platform_name_from_job_info_evaluated_hostname(): - result = platform_name_from_job_info( + result = _platform_name_from_job_info( PLATFORMS, {'batch system': 'background'}, {'host': '$(cat tiddles)'}, @@ -294,7 +302,7 @@ def test_platform_name_from_job_info_ordered_dict_comparison(): platform.defaults_['Made up key'] = {} platform.update(PLATFORMS['hpc1-bg']) platforms = {'hpc1-bg': platform, 'dobbie': PLATFORMS['sugar']} - assert platform_name_from_job_info(platforms, job, remote) == 'hpc1-bg' + assert _platform_name_from_job_info(platforms, job, remote) == 'hpc1-bg' # Cases where the error ought to be raised because no matching platform should @@ -321,7 +329,7 @@ def test_platform_name_from_job_info_ordered_dict_comparison(): ) def test_reverse_PlatformLookupError(job, remote): with pytest.raises(PlatformLookupError): - platform_name_from_job_info(PLATFORMS, job, remote) + _platform_name_from_job_info(PLATFORMS, job, remote) # An example of a global config with two Spice systems available @@ -359,7 +367,7 @@ def test_platform_name_from_job_info_two_spices( }, } - assert platform_name_from_job_info(platforms, job, remote) == returns + assert _platform_name_from_job_info(platforms, job, remote) == returns # An example of two platforms with the same hosts and job runner settings @@ -406,7 +414,7 @@ def test_platform_name_from_job_info_similar_platforms( 'job runner': 'background' }, } - assert platform_name_from_job_info(platforms, job, remote) == returns + assert _platform_name_from_job_info(platforms, job, remote) == returns # -----------------------------------------------------------------------------