Skip to content

Commit 0d54622

Browse files
authored
Implement prioritization of tasks. (#52)
1 parent 42d0abe commit 0d54622

File tree

11 files changed

+362
-31
lines changed

11 files changed

+362
-31
lines changed

docs/changes.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ all releases are available on `Anaconda.org <https://anaconda.org/pytask/pytask>
1515
- :gh:`50` implements correct usage of singular and plural in collection logs.
1616
- :gh:`51` allows to invoke pytask through the Python interpreter with ``python -m
1717
pytask`` which will add the current path to ``sys.path``.
18+
- :gh:`52` allows to prioritize tasks with ``pytask.mark.try_last`` and
19+
``pytask.mark.try_first``.
1820
- :gh:`53` changes the theme of the documentation to furo.
1921

2022

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
How to influence the build order
2+
================================
3+
4+
.. important::
5+
6+
This guide shows how to influence the order in which tasks are executed. The feature
7+
should be treated with caution since it might make projects work whose dependencies
8+
and products are not fully specified.
9+
10+
You can influence the order in which tasks are executed by assigning preferences to
11+
tasks. Use ``pytask.mark.try_first`` to execute a task as early as possible and
12+
``pytask.mark.try_last`` to defer execution.
13+
14+
.. note::
15+
16+
For more background, tasks, dependencies and products form a directed acyclic graph
17+
(DAG). A `topological ordering <https://en.wikipedia.org/wiki/Topological_sorting>`_
18+
determines the order in which tasks are executed such that tasks are not run until
19+
their predecessors have been executed. The ordering is not unique and instead of a
20+
random ordering, an ordering is chosen which is compatible with the preferences.
21+
Among multiple tasks which should all be preferred or delayed, a random task is
22+
chosen.
23+
24+
As an example, here are two tasks where the decorator ensures that the output of the
25+
second task is always shown before the output of the first task.
26+
27+
.. code-block:: python
28+
29+
# Content of task_example.py
30+
31+
import pytask
32+
33+
34+
def task_first():
35+
print("I'm second.")
36+
37+
38+
@pytask.mark.try_first
39+
def task_second():
40+
print("I'm first.")
41+
42+
43+
The execution yields (use ``-s`` to make the output visible in the terminal)
44+
45+
.. code-block:: console
46+
47+
$ pytask -s task_example.py
48+
========================= Start pytask session =========================
49+
Platform: win32 -- Python 3.x.x, pytask 0.x.x, pluggy 0.x.x
50+
Root: x
51+
Collected 2 tasks.
52+
53+
I'm first.
54+
.I'm second.
55+
.
56+
========================= 2 succeeded in 0.04s =========================
57+
58+
Replacing ``pytask.mark.try_first`` with ``pytask.mark.try_last`` yields
59+
60+
.. code-block:: python
61+
62+
# Content of task_example.py
63+
64+
import pytask
65+
66+
67+
def task_first():
68+
print("I'm second.")
69+
70+
71+
@pytask.mark.try_last
72+
def task_second():
73+
print("I'm first.")
74+
75+
and
76+
77+
.. code-block:: console
78+
79+
$ pytask -s task_example.py
80+
========================= Start pytask session =========================
81+
Platform: win32 -- Python 3.x.x, pytask 0.x.x, pluggy 0.x.x
82+
Root: x
83+
Collected 2 tasks.
84+
85+
I'm second.
86+
.I'm first.
87+
.
88+
========================= 2 succeeded in 0.03s =========================

docs/how_to_guides/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ pytask.
1010

1111
how_to_write_a_plugin
1212
how_to_extend_parametrizations
13+
how_to_influence_build_order

src/_pytask/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ def pytask_configure(pm, config_from_cli):
9595
config["markers"] = {
9696
"depends_on": "Attach a dependency/dependencies to a task.",
9797
"produces": "Attach a product/products to a task.",
98+
"try_first": "Try to execute a task a early as possible.",
99+
"try_last": "Try to execute a task a late as possible.",
98100
}
99101

100102
pm.hook.pytask_parse_config(

src/_pytask/dag.py

Lines changed: 130 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,154 @@
11
"""Implement some capabilities to deal with the DAG."""
22
import itertools
3+
import pprint
4+
from typing import Dict
5+
from typing import Generator
6+
from typing import Iterable
7+
from typing import List
38

9+
import attr
410
import networkx as nx
11+
from _pytask.mark import get_specific_markers_from_task
12+
from _pytask.nodes import MetaTask
513

614

7-
def sort_tasks_topologically(dag):
8-
"""Sort tasks in topological ordering."""
9-
for node in nx.topological_sort(dag):
10-
if "task" in dag.nodes[node]:
11-
yield node
12-
13-
14-
def descending_tasks(task_name, dag):
15+
def descending_tasks(task_name: str, dag: nx.DiGraph) -> Generator[str, None, None]:
1516
"""Yield only descending tasks."""
1617
for descendant in nx.descendants(dag, task_name):
1718
if "task" in dag.nodes[descendant]:
1819
yield descendant
1920

2021

21-
def task_and_descending_tasks(task_name, dag):
22+
def task_and_descending_tasks(
23+
task_name: str, dag: nx.DiGraph
24+
) -> Generator[str, None, None]:
2225
"""Yield task and descending tasks."""
2326
yield task_name
2427
yield from descending_tasks(task_name, dag)
2528

2629

27-
def node_and_neighbors(dag, node):
30+
def node_and_neighbors(dag: nx.DiGraph, node: str) -> Generator[str, None, None]:
2831
"""Yield node and neighbors which are first degree predecessors and successors.
2932
3033
We cannot use ``dag.neighbors`` as it only considers successors as neighbors in a
3134
DAG.
3235
3336
"""
3437
return itertools.chain([node], dag.predecessors(node), dag.successors(node))
38+
39+
40+
@attr.s
41+
class TopologicalSorter:
42+
"""The topological sorter class.
43+
44+
This class allows to perform a topological sort
45+
46+
"""
47+
48+
dag = attr.ib(converter=nx.DiGraph)
49+
priorities = attr.ib(factory=dict)
50+
_dag_backup = attr.ib(default=None)
51+
_is_prepared = attr.ib(default=False, type=bool)
52+
_nodes_out = attr.ib(factory=set)
53+
54+
@classmethod
55+
def from_dag(cls, dag: nx.DiGraph) -> "TopologicalSorter":
56+
if not dag.is_directed():
57+
raise ValueError("Only directed graphs have a topological order.")
58+
59+
tasks = [
60+
dag.nodes[node]["task"] for node in dag.nodes if "task" in dag.nodes[node]
61+
]
62+
priorities = _extract_priorities_from_tasks(tasks)
63+
64+
task_names = {task.name for task in tasks}
65+
task_dict = {name: nx.ancestors(dag, name) & task_names for name in task_names}
66+
task_dag = nx.DiGraph(task_dict).reverse()
67+
68+
return cls(task_dag, priorities, task_dag.copy())
69+
70+
def prepare(self) -> None:
71+
"""Perform some checks before creating a topological ordering."""
72+
try:
73+
nx.algorithms.cycles.find_cycle(self.dag)
74+
except nx.NetworkXNoCycle:
75+
pass
76+
else:
77+
raise ValueError("The DAG contains cycles.")
78+
79+
self._is_prepared = True
80+
81+
def get_ready(self, n: int = 1):
82+
"""Get up to ``n`` tasks which are ready."""
83+
if not self._is_prepared:
84+
raise ValueError("The TopologicalSorter needs to be prepared.")
85+
if not isinstance(n, int) or n < 1:
86+
raise ValueError("'n' must be an integer greater or equal than 1.")
87+
88+
ready_nodes = {v for v, d in self.dag.in_degree() if d == 0} - self._nodes_out
89+
prioritized_nodes = sorted(
90+
ready_nodes, key=lambda x: self.priorities.get(x, 0)
91+
)[-n:]
92+
93+
self._nodes_out.update(prioritized_nodes)
94+
95+
return prioritized_nodes
96+
97+
def is_active(self) -> bool:
98+
"""Indicate whether there are still tasks left."""
99+
return bool(self.dag.nodes)
100+
101+
def done(self, *nodes: Iterable[str]) -> None:
102+
"""Mark some tasks as done."""
103+
self._nodes_out = self._nodes_out - set(nodes)
104+
self.dag.remove_nodes_from(nodes)
105+
106+
def reset(self) -> None:
107+
"""Reset an exhausted topological sorter."""
108+
self.dag = self._dag_backup.copy()
109+
self._is_prepared = False
110+
self._nodes_out = set()
111+
112+
def static_order(self) -> Generator[str, None, None]:
113+
"""Return a topological order of tasks as an iterable."""
114+
self.prepare()
115+
while self.is_active():
116+
new_task = self.get_ready()[0]
117+
yield new_task
118+
self.done(new_task)
119+
120+
121+
def _extract_priorities_from_tasks(tasks: List[MetaTask]) -> Dict[str, int]:
122+
"""Extract priorities from tasks.
123+
124+
Priorities are set via the ``pytask.mark.try_first`` and ``pytask.mark.try_last``
125+
markers. We recode these markers to numeric values to sort all available by
126+
priorities. ``try_first`` is assigned the highest value such that it has the
127+
rightmost position in the list. Then, we can simply call :meth:`list.pop` on the
128+
list which is far more efficient than ``list.pop(0)``.
129+
130+
"""
131+
priorities = {
132+
task.name: {
133+
"try_first": bool(get_specific_markers_from_task(task, "try_first")),
134+
"try_last": bool(get_specific_markers_from_task(task, "try_last")),
135+
}
136+
for task in tasks
137+
}
138+
tasks_w_mixed_priorities = [
139+
name for name, p in priorities.items() if p["try_first"] and p["try_last"]
140+
]
141+
if tasks_w_mixed_priorities:
142+
raise ValueError(
143+
"'try_first' and 'try_last' cannot be applied on the same task. See the "
144+
f"following tasks for errors:\n\n{pprint.pformat(tasks_w_mixed_priorities)}"
145+
)
146+
147+
# Recode to numeric values for sorting.
148+
numeric_mapping = {(True, False): 1, (False, False): 0, (False, True): -1}
149+
numeric_priorities = {
150+
name: numeric_mapping[(p["try_first"], p["try_last"])]
151+
for name, p in priorities.items()
152+
}
153+
154+
return numeric_priorities

src/_pytask/execute.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from _pytask.config import hookimpl
77
from _pytask.dag import descending_tasks
88
from _pytask.dag import node_and_neighbors
9-
from _pytask.dag import sort_tasks_topologically
9+
from _pytask.dag import TopologicalSorter
1010
from _pytask.database import create_or_update_state
1111
from _pytask.enums import ColorCode
1212
from _pytask.exceptions import ExecutionError
@@ -41,15 +41,16 @@ def pytask_execute_log_start(session):
4141
@hookimpl(trylast=True)
4242
def pytask_execute_create_scheduler(session):
4343
"""Create a scheduler based on topological sorting."""
44-
for node in sort_tasks_topologically(session.dag):
45-
task = session.dag.nodes[node]["task"]
46-
yield task
44+
scheduler = TopologicalSorter.from_dag(session.dag)
45+
scheduler.prepare()
46+
return scheduler
4747

4848

4949
@hookimpl
5050
def pytask_execute_build(session):
5151
"""Execute tasks."""
52-
for task in session.scheduler:
52+
for name in session.scheduler.static_order():
53+
task = session.dag.nodes[name]["task"]
5354
report = session.hook.pytask_execute_task_protocol(session=session, task=task)
5455
session.execution_reports.append(report)
5556
if session.should_stop:
@@ -98,7 +99,7 @@ def pytask_execute_task_setup(session, task):
9899
for product in session.dag.successors(task.name):
99100
node = session.dag.nodes[product]["node"]
100101
if isinstance(node, FilePathNode):
101-
node.value.parent.mkdir(parents=True, exist_ok=True)
102+
node.path.parent.mkdir(parents=True, exist_ok=True)
102103

103104

104105
@hookimpl
@@ -140,8 +141,8 @@ def pytask_execute_task_process_report(session, report):
140141
)
141142
)
142143

143-
session.n_tests_failed += 1
144-
if session.n_tests_failed >= session.config["max_failures"]:
144+
session.n_tasks_failed += 1
145+
if session.n_tasks_failed >= session.config["max_failures"]:
145146
session.should_stop = True
146147

147148
return True

src/_pytask/resolve_dependencies.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@
66
import networkx as nx
77
from _pytask.config import hookimpl
88
from _pytask.dag import node_and_neighbors
9-
from _pytask.dag import sort_tasks_topologically
109
from _pytask.dag import task_and_descending_tasks
10+
from _pytask.dag import TopologicalSorter
1111
from _pytask.database import State
1212
from _pytask.exceptions import NodeNotFoundError
1313
from _pytask.exceptions import ResolvingDependenciesError
1414
from _pytask.mark import Mark
1515
from _pytask.nodes import reduce_node_name
1616
from _pytask.report import ResolvingDependenciesReport
17-
from _pytask.traceback import remove_traceback_from_exc_info
1817
from pony import orm
1918

2019

@@ -72,14 +71,14 @@ def pytask_resolve_dependencies_create_dag(tasks):
7271
@hookimpl
7372
def pytask_resolve_dependencies_select_execution_dag(dag):
7473
"""Select the tasks which need to be executed."""
75-
tasks = list(sort_tasks_topologically(dag))
74+
scheduler = TopologicalSorter.from_dag(dag)
7675
visited_nodes = []
77-
for task_name in tasks:
76+
77+
for task_name in scheduler.static_order():
7878
if task_name not in visited_nodes:
7979
have_changed = _have_task_or_neighbors_changed(task_name, dag)
8080
if have_changed:
81-
for name in task_and_descending_tasks(task_name, dag):
82-
visited_nodes.append(name)
81+
visited_nodes += list(task_and_descending_tasks(task_name, dag))
8382
else:
8483
dag.nodes[task_name]["task"].markers.append(
8584
Mark("skip_unchanged", (), {})
@@ -191,7 +190,7 @@ def pytask_resolve_dependencies_log(session, report):
191190

192191
click.echo("")
193192

194-
traceback.print_exception(*remove_traceback_from_exc_info(report.exc_info))
193+
traceback.print_exception(*report.exc_info)
195194

196195
click.echo("")
197196
click.echo("=" * tm_width)

src/_pytask/session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class Session:
3535
execution_start = attr.ib(default=None)
3636
execution_end = attr.ib(default=None)
3737

38-
n_tests_failed = attr.ib(default=0)
38+
n_tasks_failed = attr.ib(default=0)
3939
"""Optional[int]: Number of tests which have failed."""
4040
should_stop = attr.ib(default=False)
4141
"""Optional[bool]: Indicates whether the session should be stopped."""

tests/test_capture.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ def task_capturing_error():
255255
tmp_path.joinpath("task_capturing_outerr.py").write_text(textwrap.dedent(source))
256256

257257
result = runner.invoke(cli, [tmp_path.as_posix()])
258+
assert "F." in result.output or ".F" in result.output
258259
for content in [
259-
"F.",
260260
"==== Failures ====",
261261
"task_capturing_error failed",
262262
"---------",

0 commit comments

Comments
 (0)