Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7035.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed platform subshell expressions being evaluated more than once for tasks triggered in a paused workflow.
12 changes: 6 additions & 6 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ class PlatformError(CylcError):
def __init__(
self,
message: str,
platform_name: str,
platform_name: str | None,
*,
ctx: 'Optional[SubFuncContext]' = None,
cmd: Union[str, Sequence[str], None] = None,
ret_code: Optional[int] = None,
out: Optional[str] = None,
err: Optional[str] = None
ctx: 'SubFuncContext | None' = None,
cmd: str | Sequence[str] | None = None,
ret_code: int | None = None,
out: str | None = None,
err: str | None = None
) -> None:
self.msg = message
self.platform_name = platform_name
Expand Down
16 changes: 8 additions & 8 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def get_platform(

Returns:
platform: A platform definition dictionary. Uses either
get_platform() or platform_name_from_job_info(), but to the
get_platform() or _platform_name_from_job_info(), but to the
user these look the same. This will be None if the platform
definition uses a subshell.

Expand Down Expand Up @@ -169,7 +169,7 @@ def get_platform(
task_job_section = task_conf['job'] if 'job' in task_conf else {}
task_remote_section = task_conf['remote'] if 'remote' in task_conf else {}
return platform_from_name(
platform_name_from_job_info(
_platform_name_from_job_info(
glbl_cfg().get(['platforms']),
task_job_section,
task_remote_section,
Expand Down Expand Up @@ -329,7 +329,7 @@ def get_platform_from_group(
return HOST_SELECTION_METHODS[method](platform_names)


def platform_name_from_job_info(
def _platform_name_from_job_info(
platforms: Union[dict, 'OrderedDictWithDefaults'],
job: Dict[str, Any],
remote: Dict[str, Any],
Expand Down Expand Up @@ -407,14 +407,14 @@ def platform_name_from_job_info(
... }
>>> job = {'batch system': 'slurm'}
>>> remote = {'host': 'localhost'}
>>> platform_name_from_job_info(platforms, job, remote)
>>> _platform_name_from_job_info(platforms, job, remote)
'sugar'
>>> remote = {}
>>> platform_name_from_job_info(platforms, job, remote)
>>> _platform_name_from_job_info(platforms, job, remote)
'sugar'
>>> remote ={'host': 'desktop92'}
>>> job = {}
>>> platform_name_from_job_info(platforms, job, remote)
>>> _platform_name_from_job_info(platforms, job, remote)
'desktop92'
"""

Expand Down Expand Up @@ -581,9 +581,9 @@ def fail_if_platform_and_host_conflict(
if 'platform' in task_conf and task_conf['platform']:
fail_items = [
f'\n * [{section}]{key}'
for section, keys in FORBIDDEN_WITH_PLATFORM.items()
for section, settings in FORBIDDEN_WITH_PLATFORM.items()
if section in task_conf
for key, _ in keys.items()
for key in settings.keys()
if (
key in task_conf[section] and
task_conf[section][key] is not None
Expand Down
43 changes: 18 additions & 25 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ def release_tasks_to_run(self) -> bool:
"""
pre_prep_tasks: Set['TaskProxy'] = set()
if (
self.stop_mode is None
not self.stop_mode
and self.auto_restart_time is None
and self.reload_pending is False
):
Expand All @@ -1350,36 +1350,29 @@ def release_tasks_to_run(self) -> bool:
pre_prep_tasks.update(self.pool.tasks_to_trigger_now)
self.pool.tasks_to_trigger_now = set()

if self.is_paused:
# finish processing preparing tasks
pre_prep_tasks.update({
itask for itask in self.pool.get_tasks()
if itask.waiting_on_job_prep
})
else:
if not self.is_paused:
# release queued tasks
pre_prep_tasks.update(self.pool.release_queued_tasks())

elif (
(
# Need to get preparing tasks to submit before auto restart
self.should_auto_restart_now()
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
) or (
# Need to get preparing tasks to submit before reload
self.reload_pending
)
if (
# Manually triggered tasks will be preparing and should
# be submitted even if paused (unless workflow is stopping).
self.is_paused and not self.stop_mode
) or (
# Need to get preparing tasks to submit before auto restart
self.should_auto_restart_now()
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
) or (
# Need to get preparing tasks to submit before reload
self.reload_pending
):
# finish processing preparing tasks first
pre_prep_tasks = {
itask for itask in self.pool.get_tasks()
if itask.state(TASK_STATUS_PREPARING)
}
pre_prep_tasks.update({
itask
for itask in self.pool.get_tasks()
if itask.waiting_on_job_prep
})

# Return, if no tasks to submit.
else:
return False

if not pre_prep_tasks:
return False

Expand Down
58 changes: 34 additions & 24 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
NoPlatformsError,
PlatformError,
PlatformLookupError,
WorkflowConfigError,
)
from cylc.flow.hostuserutil import (
get_host,
Expand All @@ -70,6 +69,7 @@
from cylc.flow.pathutil import get_remote_workflow_run_job_dir
from cylc.flow.platforms import (
FORBIDDEN_WITH_PLATFORM,
fail_if_platform_and_host_conflict,
get_host_from_platform,
get_install_target_from_platform,
get_localhost_install_target,
Expand Down Expand Up @@ -1168,15 +1168,7 @@ def _prep_submit_task_job(
# - host exists - eval host_n
# remove at:
# Cylc8.x
if (
rtconfig['platform'] is not None and
rtconfig['remote']['host'] is not None
):
raise WorkflowConfigError(
"A mixture of Cylc 7 (host) and Cylc 8 (platform) "
"logic should not be used. In this case for the task "
f"\"{itask.identity}\" the following are not compatible:\n"
)
fail_if_platform_and_host_conflict(rtconfig, itask.tdef.name)

host_name, platform_name = None, None
try:
Expand All @@ -1196,37 +1188,41 @@ def _prep_submit_task_job(
)

except PlatformError as exc:
itask.waiting_on_job_prep = False
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(itask)
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
itask, '(remote host select)', exc
)
self._prep_submit_task_job_platform_error(itask, rtconfig, exc)
return False
else:
# host/platform select not ready
if host_name is None and platform_name is None:
# host/platform select not ready
return None
elif (
host_name is None
and rtconfig['platform']
and rtconfig['platform'] != platform_name
):
LOG.debug(
msg = (
f"for task {itask.identity}: platform = "
f"{rtconfig['platform']} evaluated as {platform_name}"
f"{rtconfig['platform']} evaluated as '{platform_name}'"
)

if not platform_name:
self._prep_submit_task_job_platform_error(
itask, rtconfig, msg
)
return False
LOG.debug(msg)
elif (
platform_name is None
and rtconfig['remote']['host'] != host_name
):
LOG.debug(
msg = (
f"[{itask}] host = "
f"{rtconfig['remote']['host']} evaluated as {host_name}"
f"{rtconfig['remote']['host']} evaluated as '{host_name}'"
)
if not host_name:
self._prep_submit_task_job_platform_error(
itask, rtconfig, msg
)
return False
LOG.debug(msg)

try:
platform = cast(
Expand Down Expand Up @@ -1293,6 +1289,20 @@ def _prep_submit_task_job(
itask.local_job_file_path = local_job_file_path
return itask

def _prep_submit_task_job_platform_error(
    self, itask: 'TaskProxy', rtconfig: dict, exc: Exception | str
) -> None:
    """Handle a platform/host selection error in _prep_submit_task_job.

    Helper for self._prep_submit_task_job: clears the task's
    "waiting on job prep" flag, records an empty platform for the current
    submit number, sets up the job log path and retry timers, then hands
    the error on to self._prep_submit_task_job_error.

    Args:
        itask: The task whose platform or host selection failed.
        rtconfig: Config passed through to self._set_retry_timers
            (presumably the task's runtime config -- confirm against
            the caller).
        exc: The exception raised during selection, or a plain message
            string describing the problem.
    """
    # The task is no longer waiting on job preparation.
    itask.waiting_on_job_prep = False
    # Record an empty platform name against this submit number.
    itask.summary['platforms_used'][itask.submit_num] = ''
    # Retry delays, needed for the try_num
    self._create_job_log_path(itask)
    self._set_retry_timers(itask, rtconfig)
    # Pass the error on, labelled as a "(remote host select)" failure.
    self._prep_submit_task_job_error(
        itask, '(remote host select)', exc
    )

def _prep_submit_task_job_error(
self,
itask: 'TaskProxy',
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,8 +900,7 @@ def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None:
except KeyError:
pass
else:
with suppress(KeyError):
self.tasks_to_trigger_now.remove(itask)
self.tasks_to_trigger_now.discard(itask)
self.tasks_removed = True
self.active_tasks_changed = True
if not self.active_tasks[itask.point]:
Expand Down Expand Up @@ -1042,7 +1041,7 @@ def count_active_tasks(self):

return active_task_counter, pre_prep_tasks

def release_queued_tasks(self):
def release_queued_tasks(self) -> set['TaskProxy']:
"""Return list of queue-released tasks awaiting job prep.

Note:
Expand Down Expand Up @@ -1071,7 +1070,7 @@ def release_queued_tasks(self):
self.spawn_on_all_outputs(itask)

# Note: released and pre_prep_tasks can overlap
return list(set(released + pre_prep_tasks))
return set(released + pre_prep_tasks)

def get_min_point(self):
"""Return the minimum cycle point currently in the pool."""
Expand Down
9 changes: 4 additions & 5 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
Dict,
List,
NamedTuple,
Optional,
Set,
TYPE_CHECKING,
Tuple,
Expand Down Expand Up @@ -120,8 +119,8 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr, server):
self.server: WorkflowRuntimeServer = server

def _subshell_eval(
self, eval_str: str, command_pattern: re.Pattern
) -> Optional[str]:
self, eval_str: str | None, command_pattern: re.Pattern
) -> str | None:
"""Evaluate a platform or host from a possible subshell string.

Arguments:
Expand Down Expand Up @@ -175,7 +174,7 @@ def _subshell_eval(
# BACK COMPAT: references to "host"
# remove at:
# Cylc8.x
def eval_host(self, host_str: str) -> Optional[str]:
def eval_host(self, host_str: str | None) -> str | None:
"""Evaluate a host from a possible subshell string.

Args:
Expand All @@ -191,7 +190,7 @@ def eval_host(self, host_str: str) -> Optional[str]:
return 'localhost'
return host

def eval_platform(self, platform_str: str) -> Optional[str]:
def eval_platform(self, platform_str: str | None) -> str | None:
"""Evaluate a platform from a possible subshell string.

Args:
Expand Down
18 changes: 11 additions & 7 deletions tests/functional/job-submission/19-platform_select.t
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#-------------------------------------------------------------------------------
# Test recovery of a failed host select command for a group of tasks.
. "$(dirname "$0")/test_header"
set_test_number 7
set_test_number 8

install_workflow "${TEST_NAME_BASE}"

Expand All @@ -29,24 +29,28 @@ logfile="${WORKFLOW_RUN_DIR}/log/scheduler/log"

# Check that host = $(cmd) is correctly evaluated
grep_ok \
"1/host_subshell/01:.* evaluated as improbable host name$" \
"1/host_subshell/01:.* evaluated as 'improbable host name'" \
"${logfile}"
grep_ok \
"1/localhost_subshell/01:.* evaluated as localhost$" \
"1/localhost_subshell/01:.* evaluated as 'localhost'" \
"${logfile}"

# Check that host = `cmd` is correctly evaluated
grep_ok \
"1/host_subshell_backticks/01:.* evaluated as improbable host name$" \
"1/host_subshell_backticks/01:.* evaluated as 'improbable host name'" \
"${logfile}"

# Check that platform = $(cmd) correctly evaluated
grep_ok \
"1/platform_subshell:.* evaluated as improbable platform name$" \
"1/platform_subshell:.* evaluated as 'improbable platform name'" \
"${logfile}"

grep_ok \
"1/platform_subshell_suffix:.* evaluated as prefix-middle-suffix$" \
"1/platform_subshell_empty:.* evaluated as ''" \
"${logfile}"

purge
grep_ok \
"1/platform_subshell_suffix:.* evaluated as 'prefix-middle-suffix'" \
"${logfile}"

# purge
4 changes: 4 additions & 0 deletions tests/functional/job-submission/19-platform_select/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ purpose = """
localhost_subshell
platform_subshell:submit-fail?
platform_no_subshell:submit-fail?
platform_subshell_empty:submit-fail?
platform_subshell_suffix:submit-fail?
host_subshell:submit-fail?
host_subshell_backticks:submit-fail?
Expand All @@ -36,6 +37,9 @@ purpose = """
[[platform_subshell]]
platform = $(echo "improbable platform name")

[[platform_subshell_empty]]
platform = $(echo "")

[[platform_subshell_suffix]]
platform = prefix-$( echo middle )-suffix

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
1/host_subshell_backticks -triggered off [] in flow 1
1/localhost_subshell -triggered off [] in flow 1
1/platform_subshell_suffix -triggered off [] in flow 1
1/platform_subshell_empty -triggered off [] in flow 1
Loading
Loading