Skip to content

Schedule all ready tasks only when there are coiled functions. #106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
- {pull}`100` adds project management with rye.
- {pull}`101` adds syncing for local paths as dependencies or products in remote
environments with the same OS.
- {pull}`106` fixes {pull}`99` such that all ready tasks are submitted only when
  there are coiled functions.

## 0.4.1 - 2024-01-12

Expand Down
25 changes: 23 additions & 2 deletions src/pytask_parallel/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pytask import PTask
from pytask import PythonNode
from pytask import Session
from pytask import TaskExecutionStatus
from pytask import console
from pytask import get_marks
from pytask import hookimpl
Expand Down Expand Up @@ -52,6 +53,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
__tracebackhide__ = True
reports = session.execution_reports
running_tasks: dict[str, Future[Any]] = {}
any_coiled_task = any(is_coiled_function(task) for task in session.tasks)

# The executor can only be created after the collection to give users the
# possibility to inject their own executors.
Expand All @@ -66,12 +68,31 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
while session.scheduler.is_active():
try:
newly_collected_reports = []
ready_tasks = list(session.scheduler.get_ready(10_000))

# If there is any coiled function, the user probably wants to exploit
# adaptive scaling. Thus, we need to submit all ready tasks.
# Unfortunately, all submitted tasks are shown as running although some
# are pending.
#
# Without coiled functions, we submit as many tasks as there are
# available workers since we cannot reliably detect a pending status.
#
# See #98 for more information.
if any_coiled_task:
n_new_tasks = 10_000
else:
n_new_tasks = session.config["n_workers"] - len(running_tasks)

ready_tasks = (
list(session.scheduler.get_ready(n_new_tasks))
if n_new_tasks >= 1
else []
)

for task_name in ready_tasks:
task = session.dag.nodes[task_name]["task"]
session.hook.pytask_execute_task_log_start(
session=session, task=task
session=session, task=task, status=TaskExecutionStatus.RUNNING
)
try:
session.hook.pytask_execute_task_setup(
Expand Down
2 changes: 0 additions & 2 deletions tests/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ def task_2(path: Annotated[Path, Product] = Path("out_2.txt")):


@pytest.mark.end_to_end()
@pytest.mark.skip(reason="See #98")
@pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS)
def test_stop_execution_when_max_failures_is_reached(tmp_path, parallel_backend):
source = """
Expand Down Expand Up @@ -107,7 +106,6 @@ def task_3(): time.sleep(3)


@pytest.mark.end_to_end()
@pytest.mark.skip(reason="See #98")
@pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS)
def test_task_priorities(tmp_path, parallel_backend):
source = """
Expand Down
Loading