Skip to content

Commit 53097ce

Browse files
authored
Merge branch 'main' into pending-status
2 parents ba20e22 + e99e530 commit 53097ce

File tree

4 files changed

+26
-5
lines changed

4 files changed

+26
-5
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ repos:
2323
- id: python-use-type-annotations
2424
- id: text-unicode-replacement-char
2525
- repo: https://github.com/astral-sh/ruff-pre-commit
26-
rev: v0.4.2
26+
rev: v0.4.4
2727
hooks:
2828
- id: ruff
2929
- id: ruff-format

docs/source/changes.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
2626
- {pull}`101` adds syncing for local paths as dependencies or products in remote
2727
environments with the same OS.
2828
- {pull}`102` implements a pending status for scheduled but not started tasks.
29+
- {pull}`106` fixes {pull}`99` such that only when there are coiled functions, all ready
30+
tasks are submitted.
31+
- {pull}`107` removes status from `pytask_execute_task_log_start` hook call.
2932

3033
## 0.4.1 - 2024-01-12
3134

src/pytask_parallel/execute.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444

4545

4646
@hookimpl
47-
def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR0915
47+
def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR0912, PLR0915
4848
"""Execute tasks with a parallel backend.
4949
5050
There are three phases while the scheduler has tasks which need to be executed.
@@ -75,6 +75,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
7575

7676
# Get the live execution manager from the registry if it exists.
7777
live_execution = session.config["pm"].get_plugin("live_execution")
78+
any_coiled_task = any(is_coiled_function(task) for task in session.tasks)
7879

7980
# The executor can only be created after the collection to give users the
8081
# possibility to inject their own executors.
@@ -93,7 +94,26 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
9394
while session.scheduler.is_active():
9495
try:
9596
newly_collected_reports = []
96-
ready_tasks = list(session.scheduler.get_ready(10_000))
97+
98+
# If there is any coiled function, the user probably wants to exploit
99+
# adaptive scaling. Thus, we need to submit all ready tasks.
100+
# Unfortunately, all submitted tasks are shown as running although some
101+
# are pending.
102+
#
103+
# Without coiled functions, we submit as many tasks as there are
104+
# available workers since we cannot reliably detect a pending status.
105+
#
106+
# See #98 for more information.
107+
if any_coiled_task:
108+
n_new_tasks = 10_000
109+
else:
110+
n_new_tasks = session.config["n_workers"] - len(running_tasks)
111+
112+
ready_tasks = (
113+
list(session.scheduler.get_ready(n_new_tasks))
114+
if n_new_tasks >= 1
115+
else []
116+
)
97117

98118
for task_signature in ready_tasks:
99119
task = session.dag.nodes[task_signature]["task"]

tests/test_execute.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ def task_2(path: Annotated[Path, Product] = Path("out_2.txt")):
7878

7979

8080
@pytest.mark.end_to_end()
81-
@pytest.mark.skip(reason="See #98")
8281
@pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS)
8382
def test_stop_execution_when_max_failures_is_reached(tmp_path, parallel_backend):
8483
source = """
@@ -107,7 +106,6 @@ def task_3(): time.sleep(3)
107106

108107

109108
@pytest.mark.end_to_end()
110-
@pytest.mark.skip(reason="See #98")
111109
@pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS)
112110
def test_task_priorities(tmp_path, parallel_backend):
113111
source = """

0 commit comments

Comments
 (0)