3
3
4
4
import itertools
5
5
import sys
6
- from typing import Sequence
7
6
from typing import TYPE_CHECKING
8
7
9
8
import networkx as nx
10
9
from _pytask .config import hookimpl
11
- from _pytask .config import IS_FILE_SYSTEM_CASE_SENSITIVE
12
10
from _pytask .console import ARROW_DOWN_ICON
13
11
from _pytask .console import console
14
12
from _pytask .console import FILE_ICON
23
21
from _pytask .database_utils import State
24
22
from _pytask .exceptions import ResolvingDependenciesError
25
23
from _pytask .mark import Mark
26
- from _pytask .mark_utils import get_marks
27
- from _pytask .mark_utils import has_mark
28
24
from _pytask .node_protocols import PNode
29
25
from _pytask .node_protocols import PTask
30
26
from _pytask .nodes import PythonNode
@@ -48,13 +44,12 @@ def pytask_dag(session: Session) -> bool | None:
48
44
session = session , tasks = session .tasks
49
45
)
50
46
session .hook .pytask_dag_modify_dag (session = session , dag = session .dag )
51
- session .hook .pytask_dag_validate_dag (session = session , dag = session .dag )
52
47
session .hook .pytask_dag_select_execution_dag (session = session , dag = session .dag )
53
48
54
49
except Exception : # noqa: BLE001
55
50
report = DagReport .from_exception (sys .exc_info ())
56
51
session .hook .pytask_dag_log (session = session , report = report )
57
- session .dag_reports = report
52
+ session .dag_report = report
58
53
59
54
raise ResolvingDependenciesError from None
60
55
@@ -63,7 +58,7 @@ def pytask_dag(session: Session) -> bool | None:
63
58
64
59
65
60
@hookimpl
66
- def pytask_dag_create_dag (tasks : list [PTask ]) -> nx .DiGraph :
61
+ def pytask_dag_create_dag (session : Session , tasks : list [PTask ]) -> nx .DiGraph :
67
62
"""Create the DAG from tasks, dependencies and products."""
68
63
69
64
def _add_dependency (dag : nx .DiGraph , task : PTask , node : PNode ) -> None :
@@ -101,6 +96,7 @@ def _add_product(dag: nx.DiGraph, task: PTask, node: PNode) -> None:
101
96
)
102
97
103
98
_check_if_dag_has_cycles (dag )
99
+ _check_if_tasks_have_the_same_products (dag , session .config ["paths" ])
104
100
105
101
return dag
106
102
@@ -123,13 +119,6 @@ def pytask_dag_select_execution_dag(session: Session, dag: nx.DiGraph) -> None:
123
119
)
124
120
125
121
126
- @hookimpl
127
- def pytask_dag_validate_dag (session : Session , dag : nx .DiGraph ) -> None :
128
- """Validate the DAG."""
129
- _check_if_root_nodes_are_available (dag , session .config ["paths" ])
130
- _check_if_tasks_have_the_same_products (dag , session .config ["paths" ])
131
-
132
-
133
122
def _have_task_or_neighbors_changed (
134
123
session : Session , dag : nx .DiGraph , task : PTask
135
124
) -> bool :
@@ -198,98 +187,6 @@ def _format_cycles(dag: nx.DiGraph, cycles: list[tuple[str, ...]]) -> str:
198
187
return "\n " .join (lines [:- 1 ])
199
188
200
189
201
- _TEMPLATE_ERROR : str = (
202
- "Some dependencies do not exist or are not produced by any task. See the following "
203
- "tree which shows which dependencies are missing for which tasks.\n \n {}"
204
- )
205
- if IS_FILE_SYSTEM_CASE_SENSITIVE :
206
- _TEMPLATE_ERROR += (
207
- "\n \n (Hint: Your file-system is case-sensitive. Check the paths' "
208
- "capitalization carefully.)"
209
- )
210
-
211
-
212
- def _check_if_root_nodes_are_available (dag : nx .DiGraph , paths : Sequence [Path ]) -> None :
213
- __tracebackhide__ = True
214
-
215
- missing_root_nodes = []
216
- is_task_skipped : dict [str , bool ] = {}
217
-
218
- for node in dag .nodes :
219
- is_node = "node" in dag .nodes [node ]
220
- is_without_parents = len (list (dag .predecessors (node ))) == 0
221
- if is_node and is_without_parents :
222
- are_all_tasks_skipped , is_task_skipped = _check_if_tasks_are_skipped (
223
- node , dag , is_task_skipped
224
- )
225
- if not are_all_tasks_skipped :
226
- try :
227
- node_exists = dag .nodes [node ]["node" ].state ()
228
- except Exception as e : # noqa: BLE001
229
- msg = _format_exception_from_failed_node_state (node , dag , paths )
230
- raise ResolvingDependenciesError (msg ) from e
231
- if not node_exists :
232
- missing_root_nodes .append (node )
233
-
234
- if missing_root_nodes :
235
- dictionary = {}
236
- for node in missing_root_nodes :
237
- short_node_name = format_node_name (dag .nodes [node ]["node" ], paths ).plain
238
- not_skipped_successors = [
239
- task for task in dag .successors (node ) if not is_task_skipped [task ]
240
- ]
241
- short_successors = reduce_names_of_multiple_nodes (
242
- not_skipped_successors , dag , paths
243
- )
244
- dictionary [short_node_name ] = short_successors
245
-
246
- text = _format_dictionary_to_tree (dictionary , "Missing dependencies:" )
247
- raise ResolvingDependenciesError (_TEMPLATE_ERROR .format (text )) from None
248
-
249
-
250
- def _format_exception_from_failed_node_state (
251
- node_signature : str , dag : nx .DiGraph , paths : Sequence [Path ]
252
- ) -> str :
253
- """Format message when ``node.state()`` threw an exception."""
254
- tasks = [dag .nodes [i ]["task" ] for i in dag .successors (node_signature )]
255
- names = [task .name for task in tasks ]
256
- successors = ", " .join ([f"{ name !r} " for name in names ])
257
- node_name = format_node_name (dag .nodes [node_signature ]["node" ], paths ).plain
258
- return (
259
- f"While checking whether dependency { node_name !r} from task(s) "
260
- f"{ successors } exists, an error was raised."
261
- )
262
-
263
-
264
- def _check_if_tasks_are_skipped (
265
- node : PNode , dag : nx .DiGraph , is_task_skipped : dict [str , bool ]
266
- ) -> tuple [bool , dict [str , bool ]]:
267
- """Check for a given node whether it is only used by skipped tasks."""
268
- are_all_tasks_skipped = []
269
- for successor in dag .successors (node ):
270
- if successor not in is_task_skipped :
271
- is_task_skipped [successor ] = _check_if_task_is_skipped (successor , dag )
272
- are_all_tasks_skipped .append (is_task_skipped [successor ])
273
-
274
- return all (are_all_tasks_skipped ), is_task_skipped
275
-
276
-
277
- def _check_if_task_is_skipped (task_name : str , dag : nx .DiGraph ) -> bool :
278
- task = dag .nodes [task_name ]["task" ]
279
- is_skipped = has_mark (task , "skip" )
280
-
281
- if is_skipped :
282
- return True
283
-
284
- skip_if_markers = get_marks (task , "skipif" )
285
- return any (_skipif (* marker .args , ** marker .kwargs )[0 ] for marker in skip_if_markers )
286
-
287
-
288
- def _skipif (condition : bool , * , reason : str ) -> tuple [bool , str ]:
289
- """Shameless copy to circumvent circular imports."""
290
- return condition , reason
291
-
292
-
293
190
def _format_dictionary_to_tree (dict_ : dict [str , list [str ]], title : str ) -> str :
294
191
"""Format missing root nodes."""
295
192
tree = Tree (title )
0 commit comments