Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7035.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a platform subshell expression being evaluated more than once for tasks triggered in a paused workflow.
12 changes: 6 additions & 6 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ class PlatformError(CylcError):
def __init__(
self,
message: str,
platform_name: str,
platform_name: str | None,
*,
ctx: 'Optional[SubFuncContext]' = None,
cmd: Union[str, Sequence[str], None] = None,
ret_code: Optional[int] = None,
out: Optional[str] = None,
err: Optional[str] = None
ctx: 'SubFuncContext | None' = None,
cmd: str | Sequence[str] | None = None,
ret_code: int | None = None,
out: str | None = None,
err: str | None = None
) -> None:
self.msg = message
self.platform_name = platform_name
Expand Down
16 changes: 8 additions & 8 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def get_platform(

Returns:
platform: A platform definition dictionary. Uses either
get_platform() or platform_name_from_job_info(), but to the
get_platform() or _platform_name_from_job_info(), but to the
user these look the same. This will be None if the platform
definition uses a subshell.

Expand Down Expand Up @@ -169,7 +169,7 @@ def get_platform(
task_job_section = task_conf['job'] if 'job' in task_conf else {}
task_remote_section = task_conf['remote'] if 'remote' in task_conf else {}
return platform_from_name(
platform_name_from_job_info(
_platform_name_from_job_info(
glbl_cfg().get(['platforms']),
task_job_section,
task_remote_section,
Expand Down Expand Up @@ -329,7 +329,7 @@ def get_platform_from_group(
return HOST_SELECTION_METHODS[method](platform_names)


def platform_name_from_job_info(
def _platform_name_from_job_info(
platforms: Union[dict, 'OrderedDictWithDefaults'],
job: Dict[str, Any],
remote: Dict[str, Any],
Expand Down Expand Up @@ -407,14 +407,14 @@ def platform_name_from_job_info(
... }
>>> job = {'batch system': 'slurm'}
>>> remote = {'host': 'localhost'}
>>> platform_name_from_job_info(platforms, job, remote)
>>> _platform_name_from_job_info(platforms, job, remote)
'sugar'
>>> remote = {}
>>> platform_name_from_job_info(platforms, job, remote)
>>> _platform_name_from_job_info(platforms, job, remote)
'sugar'
>>> remote ={'host': 'desktop92'}
>>> job = {}
>>> platform_name_from_job_info(platforms, job, remote)
>>> _platform_name_from_job_info(platforms, job, remote)
'desktop92'
"""

Expand Down Expand Up @@ -581,9 +581,9 @@ def fail_if_platform_and_host_conflict(
if 'platform' in task_conf and task_conf['platform']:
fail_items = [
f'\n * [{section}]{key}'
for section, keys in FORBIDDEN_WITH_PLATFORM.items()
for section, settings in FORBIDDEN_WITH_PLATFORM.items()
if section in task_conf
for key, _ in keys.items()
for key in settings.keys()
if (
key in task_conf[section] and
task_conf[section][key] is not None
Expand Down
42 changes: 18 additions & 24 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ def release_tasks_to_run(self) -> bool:
"""
pre_prep_tasks: Set['TaskProxy'] = set()
if (
self.stop_mode is None
not self.stop_mode
and self.auto_restart_time is None
and self.reload_pending is False
):
Expand All @@ -1350,36 +1350,30 @@ def release_tasks_to_run(self) -> bool:
pre_prep_tasks.update(self.pool.tasks_to_trigger_now)
self.pool.tasks_to_trigger_now = set()

if self.is_paused:
# finish processing preparing tasks
pre_prep_tasks.update({
itask for itask in self.pool.get_tasks()
if itask.state(TASK_STATUS_PREPARING)
})
else:
if not self.is_paused:
# release queued tasks
pre_prep_tasks.update(self.pool.release_queued_tasks())

elif (
(
# Need to get preparing tasks to submit before auto restart
self.should_auto_restart_now()
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
) or (
# Need to get preparing tasks to submit before reload
self.reload_pending
)
if (
# Manually triggered tasks will be in preparing state and should
# be submitted even if paused (unless stopping).
self.is_paused and not self.stop_mode
) or (
# Need to get preparing tasks to submit before auto restart
self.should_auto_restart_now()
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
) or (
# Need to get preparing tasks to submit before reload
self.reload_pending
):
# finish processing preparing tasks first
pre_prep_tasks = {
itask for itask in self.pool.get_tasks()
pre_prep_tasks.update({
itask
for itask in self.pool.get_tasks()
if itask.state(TASK_STATUS_PREPARING)
}
and itask.waiting_on_job_prep
})
Comment on lines 1369 to 1373
Copy link
Member Author

@MetRonnie MetRonnie Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliver-sanders This PR pre-dated #7054 and had basically the same fix.

However #7054 missed L1376 - I have consolidated the two in this PR.

One difference here is that I did not delete the check for itask.state(TASK_STATUS_PREPARING), I added the check for itask.waiting_on_job_prep in addition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK now waiting on @oliver-sanders to check that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's any need to have itask.state(TASK_STATUS_PREPARING) in combination with itask.waiting_on_job_prep, but it can't do any harm.


# Return, if no tasks to submit.
else:
return False

if not pre_prep_tasks:
return False

Expand Down
58 changes: 34 additions & 24 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
NoPlatformsError,
PlatformError,
PlatformLookupError,
WorkflowConfigError,
)
from cylc.flow.hostuserutil import (
get_host,
Expand All @@ -70,6 +69,7 @@
from cylc.flow.pathutil import get_remote_workflow_run_job_dir
from cylc.flow.platforms import (
FORBIDDEN_WITH_PLATFORM,
fail_if_platform_and_host_conflict,
get_host_from_platform,
get_install_target_from_platform,
get_localhost_install_target,
Expand Down Expand Up @@ -1166,15 +1166,7 @@ def _prep_submit_task_job(
# - host exists - eval host_n
# remove at:
# Cylc8.x
if (
rtconfig['platform'] is not None and
rtconfig['remote']['host'] is not None
):
raise WorkflowConfigError(
"A mixture of Cylc 7 (host) and Cylc 8 (platform) "
"logic should not be used. In this case for the task "
f"\"{itask.identity}\" the following are not compatible:\n"
)
fail_if_platform_and_host_conflict(rtconfig, itask.tdef.name)

host_name, platform_name = None, None
try:
Expand All @@ -1194,37 +1186,41 @@ def _prep_submit_task_job(
)

except PlatformError as exc:
itask.waiting_on_job_prep = False
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(itask)
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
itask, '(remote host select)', exc
)
self._prep_submit_task_job_platform_error(itask, rtconfig, exc)
return False
else:
# host/platform select not ready
if host_name is None and platform_name is None:
# host/platform select not ready
return None
elif (
host_name is None
and rtconfig['platform']
and rtconfig['platform'] != platform_name
):
LOG.debug(
msg = (
f"for task {itask.identity}: platform = "
f"{rtconfig['platform']} evaluated as {platform_name}"
f"{rtconfig['platform']} evaluated as '{platform_name}'"
)

if not platform_name:
self._prep_submit_task_job_platform_error(
itask, rtconfig, msg
)
return False
LOG.debug(msg)
elif (
platform_name is None
and rtconfig['remote']['host'] != host_name
):
LOG.debug(
msg = (
f"[{itask}] host = "
f"{rtconfig['remote']['host']} evaluated as {host_name}"
f"{rtconfig['remote']['host']} evaluated as '{host_name}'"
)
if not host_name:
self._prep_submit_task_job_platform_error(
itask, rtconfig, msg
)
return False
LOG.debug(msg)

try:
platform = cast(
Expand Down Expand Up @@ -1291,6 +1287,20 @@ def _prep_submit_task_job(
itask.local_job_file_path = local_job_file_path
return itask

def _prep_submit_task_job_platform_error(
self, itask: 'TaskProxy', rtconfig: dict, exc: Exception | str
):
"""Helper for self._prep_submit_task_job. On platform selection error.
"""
itask.waiting_on_job_prep = False
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(itask)
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
itask, '(remote host select)', exc
)

def _prep_submit_task_job_error(
self,
itask: 'TaskProxy',
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,8 +901,7 @@ def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None:
except KeyError:
pass
else:
with suppress(KeyError):
self.tasks_to_trigger_now.remove(itask)
self.tasks_to_trigger_now.discard(itask)
self.tasks_removed = True
self.active_tasks_changed = True
if not self.active_tasks[itask.point]:
Expand Down Expand Up @@ -1043,7 +1042,7 @@ def count_active_tasks(self):

return active_task_counter, pre_prep_tasks

def release_queued_tasks(self):
def release_queued_tasks(self) -> set['TaskProxy']:
"""Return list of queue-released tasks awaiting job prep.

Note:
Expand Down Expand Up @@ -1072,7 +1071,7 @@ def release_queued_tasks(self):
self.spawn_on_all_outputs(itask)

# Note: released and pre_prep_tasks can overlap
return list(set(released + pre_prep_tasks))
return set(released + pre_prep_tasks)

def get_min_point(self):
"""Return the minimum cycle point currently in the pool."""
Expand Down
9 changes: 4 additions & 5 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
Dict,
List,
NamedTuple,
Optional,
Set,
TYPE_CHECKING,
Tuple,
Expand Down Expand Up @@ -120,8 +119,8 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr, server):
self.server: WorkflowRuntimeServer = server

def _subshell_eval(
self, eval_str: str, command_pattern: re.Pattern
) -> Optional[str]:
self, eval_str: str | None, command_pattern: re.Pattern
) -> str | None:
"""Evaluate a platform or host from a possible subshell string.

Arguments:
Expand Down Expand Up @@ -175,7 +174,7 @@ def _subshell_eval(
# BACK COMPAT: references to "host"
# remove at:
# Cylc8.x
def eval_host(self, host_str: str) -> Optional[str]:
def eval_host(self, host_str: str | None) -> str | None:
"""Evaluate a host from a possible subshell string.

Args:
Expand All @@ -191,7 +190,7 @@ def eval_host(self, host_str: str) -> Optional[str]:
return 'localhost'
return host

def eval_platform(self, platform_str: str) -> Optional[str]:
def eval_platform(self, platform_str: str | None) -> str | None:
"""Evaluate a platform from a possible subshell string.

Args:
Expand Down
18 changes: 11 additions & 7 deletions tests/functional/job-submission/19-platform_select.t
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#-------------------------------------------------------------------------------
# Test recovery of a failed host select command for a group of tasks.
. "$(dirname "$0")/test_header"
set_test_number 7
set_test_number 8

install_workflow "${TEST_NAME_BASE}"

Expand All @@ -29,24 +29,28 @@ logfile="${WORKFLOW_RUN_DIR}/log/scheduler/log"

# Check that host = $(cmd) is correctly evaluated
grep_ok \
"1/host_subshell/01:.* evaluated as improbable host name$" \
"1/host_subshell/01:.* evaluated as 'improbable host name'" \
"${logfile}"
grep_ok \
"1/localhost_subshell/01:.* evaluated as localhost$" \
"1/localhost_subshell/01:.* evaluated as 'localhost'" \
"${logfile}"

# Check that host = `cmd` is correctly evaluated
grep_ok \
"1/host_subshell_backticks/01:.* evaluated as improbable host name$" \
"1/host_subshell_backticks/01:.* evaluated as 'improbable host name'" \
"${logfile}"

# Check that platform = $(cmd) correctly evaluated
grep_ok \
"1/platform_subshell:.* evaluated as improbable platform name$" \
"1/platform_subshell:.* evaluated as 'improbable platform name'" \
"${logfile}"

grep_ok \
"1/platform_subshell_suffix:.* evaluated as prefix-middle-suffix$" \
"1/platform_subshell_empty:.* evaluated as ''" \
"${logfile}"

purge
grep_ok \
"1/platform_subshell_suffix:.* evaluated as 'prefix-middle-suffix'" \
"${logfile}"

# purge
4 changes: 4 additions & 0 deletions tests/functional/job-submission/19-platform_select/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ purpose = """
localhost_subshell
platform_subshell:submit-fail?
platform_no_subshell:submit-fail?
platform_subshell_empty:submit-fail?
platform_subshell_suffix:submit-fail?
host_subshell:submit-fail?
host_subshell_backticks:submit-fail?
Expand All @@ -36,6 +37,9 @@ purpose = """
[[platform_subshell]]
platform = $(echo "improbable platform name")

[[platform_subshell_empty]]
platform = $(echo "")

[[platform_subshell_suffix]]
platform = prefix-$( echo middle )-suffix

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
1/host_subshell_backticks -triggered off [] in flow 1
1/localhost_subshell -triggered off [] in flow 1
1/platform_subshell_suffix -triggered off [] in flow 1
1/platform_subshell_empty -triggered off [] in flow 1
Loading
Loading