Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ class PlatformError(CylcError):
def __init__(
self,
message: str,
platform_name: str,
platform_name: str | None,
*,
ctx: 'Optional[SubFuncContext]' = None,
cmd: Union[str, Sequence[str], None] = None,
ret_code: Optional[int] = None,
out: Optional[str] = None,
err: Optional[str] = None
ctx: 'SubFuncContext | None' = None,
cmd: str | Sequence[str] | None = None,
ret_code: int | None = None,
out: str | None = None,
err: str | None = None
) -> None:
self.msg = message
self.platform_name = platform_name
Expand Down
16 changes: 8 additions & 8 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def get_platform(

Returns:
platform: A platform definition dictionary. Uses either
get_platform() or platform_name_from_job_info(), but to the
get_platform() or _platform_name_from_job_info(), but to the
user these look the same. This will be None if the platform
definition uses a subshell.

Expand Down Expand Up @@ -169,7 +169,7 @@ def get_platform(
task_job_section = task_conf['job'] if 'job' in task_conf else {}
task_remote_section = task_conf['remote'] if 'remote' in task_conf else {}
return platform_from_name(
platform_name_from_job_info(
_platform_name_from_job_info(
glbl_cfg().get(['platforms']),
task_job_section,
task_remote_section,
Expand Down Expand Up @@ -329,7 +329,7 @@ def get_platform_from_group(
return HOST_SELECTION_METHODS[method](platform_names)


def platform_name_from_job_info(
def _platform_name_from_job_info(
platforms: Union[dict, 'OrderedDictWithDefaults'],
job: Dict[str, Any],
remote: Dict[str, Any],
Expand Down Expand Up @@ -407,14 +407,14 @@ def platform_name_from_job_info(
... }
>>> job = {'batch system': 'slurm'}
>>> remote = {'host': 'localhost'}
>>> platform_name_from_job_info(platforms, job, remote)
>>> _platform_name_from_job_info(platforms, job, remote)
'sugar'
>>> remote = {}
>>> platform_name_from_job_info(platforms, job, remote)
>>> _platform_name_from_job_info(platforms, job, remote)
'sugar'
>>> remote ={'host': 'desktop92'}
>>> job = {}
>>> platform_name_from_job_info(platforms, job, remote)
>>> _platform_name_from_job_info(platforms, job, remote)
'desktop92'
"""

Expand Down Expand Up @@ -581,9 +581,9 @@ def fail_if_platform_and_host_conflict(
if 'platform' in task_conf and task_conf['platform']:
fail_items = [
f'\n * [{section}]{key}'
for section, keys in FORBIDDEN_WITH_PLATFORM.items()
for section, settings in FORBIDDEN_WITH_PLATFORM.items()
if section in task_conf
for key, _ in keys.items()
for key in settings.keys()
if (
key in task_conf[section] and
task_conf[section][key] is not None
Expand Down
42 changes: 18 additions & 24 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ def release_tasks_to_run(self) -> bool:
"""
pre_prep_tasks: Set['TaskProxy'] = set()
if (
self.stop_mode is None
not self.stop_mode
and self.auto_restart_time is None
and self.reload_pending is False
):
Expand All @@ -1350,36 +1350,30 @@ def release_tasks_to_run(self) -> bool:
pre_prep_tasks.update(self.pool.tasks_to_trigger_now)
self.pool.tasks_to_trigger_now = set()

if self.is_paused:
# finish processing preparing tasks
pre_prep_tasks.update({
itask for itask in self.pool.get_tasks()
if itask.state(TASK_STATUS_PREPARING)
})
else:
if not self.is_paused:
# release queued tasks
pre_prep_tasks.update(self.pool.release_queued_tasks())

elif (
(
# Need to get preparing tasks to submit before auto restart
self.should_auto_restart_now()
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
) or (
# Need to get preparing tasks to submit before reload
self.reload_pending
)
if (
# Manually triggered tasks will be in preparing state and should
# be submitted even if paused (unless stopping).
self.is_paused and not self.stop_mode
) or (
# Need to get preparing tasks to submit before auto restart
self.should_auto_restart_now()
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
) or (
# Need to get preparing tasks to submit before reload
self.reload_pending
):
# finish processing preparing tasks first
pre_prep_tasks = {
itask for itask in self.pool.get_tasks()
pre_prep_tasks.update({
itask
for itask in self.pool.get_tasks()
if itask.state(TASK_STATUS_PREPARING)
}
and itask.waiting_on_job_prep
})
Comment on lines 1369 to 1373
Copy link
Member Author

@MetRonnie MetRonnie Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliver-sanders This PR pre-dated #7054 and had basically the same fix.

However #7054 missed L1376 - I have consolidated the two in this PR.

One difference here is that I did not delete the check for itask.state(TASK_STATUS_PREPARING), I added the check for itask.waiting_on_job_prep in addition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK now waiting on @oliver-sanders to check that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's any need to have itask.state(TASK_STATUS_PREPARING) in combination with itask.waiting_on_job_prep, but it can't do any harm.


# Return, if no tasks to submit.
else:
return False

if not pre_prep_tasks:
return False

Expand Down
14 changes: 3 additions & 11 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
NoPlatformsError,
PlatformError,
PlatformLookupError,
WorkflowConfigError,
)
from cylc.flow.hostuserutil import (
get_host,
Expand All @@ -70,6 +69,7 @@
from cylc.flow.pathutil import get_remote_workflow_run_job_dir
from cylc.flow.platforms import (
FORBIDDEN_WITH_PLATFORM,
fail_if_platform_and_host_conflict,
get_host_from_platform,
get_install_target_from_platform,
get_localhost_install_target,
Expand Down Expand Up @@ -1166,15 +1166,7 @@ def _prep_submit_task_job(
# - host exists - eval host_n
# remove at:
# Cylc8.x
if (
rtconfig['platform'] is not None and
rtconfig['remote']['host'] is not None
):
raise WorkflowConfigError(
"A mixture of Cylc 7 (host) and Cylc 8 (platform) "
"logic should not be used. In this case for the task "
f"\"{itask.identity}\" the following are not compatible:\n"
)
fail_if_platform_and_host_conflict(rtconfig, itask.tdef.name)

host_name, platform_name = None, None
try:
Expand Down Expand Up @@ -1204,8 +1196,8 @@ def _prep_submit_task_job(
)
return False
else:
# host/platform select not ready
if host_name is None and platform_name is None:
# host/platform select not ready
return None
elif (
host_name is None
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,8 +901,7 @@ def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None:
except KeyError:
pass
else:
with suppress(KeyError):
self.tasks_to_trigger_now.remove(itask)
self.tasks_to_trigger_now.discard(itask)
self.tasks_removed = True
self.active_tasks_changed = True
if not self.active_tasks[itask.point]:
Expand Down Expand Up @@ -1043,7 +1042,7 @@ def count_active_tasks(self):

return active_task_counter, pre_prep_tasks

def release_queued_tasks(self):
def release_queued_tasks(self) -> set['TaskProxy']:
"""Return list of queue-released tasks awaiting job prep.

Note:
Expand Down Expand Up @@ -1072,7 +1071,7 @@ def release_queued_tasks(self):
self.spawn_on_all_outputs(itask)

# Note: released and pre_prep_tasks can overlap
return list(set(released + pre_prep_tasks))
return set(released + pre_prep_tasks)

def get_min_point(self):
"""Return the minimum cycle point currently in the pool."""
Expand Down
9 changes: 4 additions & 5 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
Dict,
List,
NamedTuple,
Optional,
Set,
TYPE_CHECKING,
Tuple,
Expand Down Expand Up @@ -120,8 +119,8 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr, server):
self.server: WorkflowRuntimeServer = server

def _subshell_eval(
self, eval_str: str, command_pattern: re.Pattern
) -> Optional[str]:
self, eval_str: str | None, command_pattern: re.Pattern
) -> str | None:
"""Evaluate a platform or host from a possible subshell string.

Arguments:
Expand Down Expand Up @@ -175,7 +174,7 @@ def _subshell_eval(
# BACK COMPAT: references to "host"
# remove at:
# Cylc8.x
def eval_host(self, host_str: str) -> Optional[str]:
def eval_host(self, host_str: str | None) -> str | None:
"""Evaluate a host from a possible subshell string.

Args:
Expand All @@ -191,7 +190,7 @@ def eval_host(self, host_str: str) -> Optional[str]:
return 'localhost'
return host

def eval_platform(self, platform_str: str) -> Optional[str]:
def eval_platform(self, platform_str: str | None) -> str | None:
"""Evaluate a platform from a possible subshell string.

Args:
Expand Down
6 changes: 2 additions & 4 deletions tests/functional/platforms/12-ping-pong/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
platform = $(cat ${CYLC_WORKFLOW_RUN_DIR}/pretend-hall-info)

[[toggler]]
# Toggle the platform between localhost and the remote host
# using the content of a file, ${CYLC_WORKFLOW_RUN_DIR}/pretend-hall-info.
script = """
# Toggle the platform between localhost and the remote host
# using the content of a file, ${CYLC_WORKFLOW_RUN_DIR}/pretend-hall-info.
# between localhost and the remote.

if (( $CYLC_TASK_CYCLE_POINT % 2 == 1 )); then
echo ${REMOTE_PLATFORM} > ${CYLC_WORKFLOW_RUN_DIR}/pretend-hall-info
cylc message -- "changing platform to ${REMOTE_PLATFORM}"
Expand Down
72 changes: 72 additions & 0 deletions tests/functional/platforms/14-trigger-paused-subshell.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Triggering a task with a subshell platform setting while the workflow is
# paused should only evaluate the subshell once.
# Here we have a subshell command that alternates between two platforms on each
# call, to check the platform does not change during a manual trigger.
# https://github.com/cylc/cylc-flow/issues/6994

export REQUIRE_PLATFORM='loc:remote'

. "$(dirname "$0")/test_header"
set_test_number 11

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

local_host_name=$(hostname)
remote_host_name=$(cylc config -i "[platforms][${CYLC_TEST_PLATFORM}]hosts")
workflow_log="${WORKFLOW_RUN_DIR}/log/scheduler/log"

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
cylc play "${WORKFLOW_NAME}" --pause
poll_grep_workflow_log "1/foo:waiting"

cylc trigger "${WORKFLOW_NAME}//1/foo"

log_scan "log-grep-01" "$workflow_log" 10 2 \
"\[1/foo/01:preparing\] submitted to localhost" \
"\[1/foo/01:.*\] (received)${local_host_name}" \
"\[1/foo/01:.*\] => succeeded"

cylc trigger "${WORKFLOW_NAME}//1/foo"

log_scan "log-grep-02" "$workflow_log" 10 2 \
"\[1/foo/02:preparing\] submitted to ${CYLC_TEST_PLATFORM}" \
"\[1/foo/02:.*\] (received)${remote_host_name}" \
"\[1/foo/02:.*\] => succeeded"

cylc trigger "${WORKFLOW_NAME}//1/foo"

log_scan "log-grep-03" "$workflow_log" 10 2 \
"\[1/foo/03:preparing\] submitted to localhost" \
"\[1/foo/03:.*\] (received)${local_host_name}" \
"\[1/foo/03:.*\] => succeeded"

cylc stop "${WORKFLOW_NAME}" --now --now

# Check DB as well:
sqlite3 "${WORKFLOW_RUN_DIR}/.service/db" \
"SELECT submit_num, platform_name FROM task_jobs" > task_jobs.out
cmp_ok task_jobs.out <<__EOF__
1|localhost
2|${CYLC_TEST_PLATFORM}
3|localhost
__EOF__

poll_workflow_stopped
purge
13 changes: 13 additions & 0 deletions tests/functional/platforms/14-trigger-paused-subshell/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!jinja2
[scheduling]
[[graph]]
R1 = foo:never # Allows triggering repeatedly without workflow shutting down

[runtime]
[[foo]]
platform = $("${CYLC_WORKFLOW_RUN_DIR}/toggle_platform.sh" {{ CYLC_TEST_PLATFORM }})
# Check what host we have truly run on:
script = cylc message -- "$(hostname)"

[[[outputs]]]
never = gonna_give_u_up
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Script that outputs the current platform written in the hall file and changes
# it to the other one. This ensures the platform subshell result will alternate
# each time it is called.

remote_platform=$1
hall_file="${CYLC_WORKFLOW_RUN_DIR}/pretend_hall_info"

if [[ ! -f "${hall_file}" ]]; then
current=localhost
else
current=$(cat "$hall_file")
fi

echo "$current"

if [[ "$current" == localhost ]]; then
echo "$remote_platform" > "$hall_file"
else
echo localhost > "$hall_file"
fi
5 changes: 4 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,10 @@ async def _complete(

"""
if schd.is_paused and not allow_paused:
raise Exception("Cannot wait for completion of a paused scheduler")
raise Exception(
"You are waiting for a paused scheduler - if this is intended "
"then use `complete(..., allow_paused=True)`"
)

start_time = time()

Expand Down
Loading