
Commit 7809ca5

avoid circular import
1 parent: f635528

File tree: 3 files changed, +87 -34 lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import dataclasses
2+
from typing import Any, Mapping
3+
import pandas as pd
4+
5+
@dataclasses.dataclass(frozen=True)
6+
class _ColumnProperties:
7+
"""Expected/required properties of a column in the job manager related dataframes"""
8+
9+
dtype: str = "object"
10+
default: Any = None
11+
12+
# Expected columns in the job DB dataframes.
13+
# TODO: make this part of public API when settled?
14+
# TODO: move non official statuses to seperate column (not_started, queued_for_start)
15+
_COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
16+
"id": _ColumnProperties(dtype="str"),
17+
"backend_name": _ColumnProperties(dtype="str"),
18+
"status": _ColumnProperties(dtype="str", default="not_started"),
19+
# TODO: use proper date/time dtype instead of legacy str for start times?
20+
"start_time": _ColumnProperties(dtype="str"),
21+
"running_start_time": _ColumnProperties(dtype="str"),
22+
# TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
23+
# but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
24+
# Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
25+
"cpu": _ColumnProperties(dtype="str"),
26+
"memory": _ColumnProperties(dtype="str"),
27+
"duration": _ColumnProperties(dtype="str"),
28+
"costs": _ColumnProperties(dtype="float64"),
29+
}
30+
31+
def _normalize(df: pd.DataFrame) -> pd.DataFrame:
32+
"""
33+
Normalize given pandas dataframe (creating a new one):
34+
ensure we have the required columns.
35+
36+
:param df: The dataframe to normalize.
37+
:return: a new dataframe that is normalized.
38+
"""
39+
new_columns = {col: req.default for (col, req) in _COLUMN_REQUIREMENTS.items() if col not in df.columns}
40+
df = df.assign(**new_columns)
41+
42+
return df
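This new module gives the job dataframe schema a single home. A minimal usage sketch of what _normalize does to a user-supplied dataframe; the input data is made up for illustration, and the import path follows the new module introduced by this commit:

    import pandas as pd
    from openeo.extra.job_management._df_schema import _normalize

    # Hypothetical input: a dataframe that only has an "id" column.
    df = pd.DataFrame({"id": ["job-1", "job-2"]})

    # _normalize adds each missing required column with its default:
    # "status" gets "not_started", the other columns get None.
    normalized = _normalize(df)
    print(list(normalized.columns))
    # ['id', 'backend_name', 'status', 'start_time', 'running_start_time',
    #  'cpu', 'memory', 'duration', 'costs']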

openeo/extra/job_management/_job_db.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import pandas as pd
88

99
from openeo.extra.job_management._interface import JobDatabaseInterface
10-
from openeo.extra.job_management._manager import MultiBackendJobManager
10+
from openeo.extra.job_management._df_schema import _normalize, _COLUMN_REQUIREMENTS
1111

1212
_log = logging.getLogger(__name__)
1313

@@ -40,7 +40,7 @@ def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
4040
else:
4141
# TODO handle other on_exists modes: e.g. overwrite, merge, ...
4242
raise ValueError(f"Invalid on_exists={on_exists!r}")
43-
df = MultiBackendJobManager._normalize_df(df)
43+
df = _normalize(df)
4444
self.persist(df)
4545
# Return self to allow chaining with constructor.
4646
return self
@@ -133,7 +133,7 @@ def read(self) -> pd.DataFrame:
133133
df = pd.read_csv(
134134
self.path,
135135
# TODO: possible to avoid hidden coupling with MultiBackendJobManager here?
136-
dtype={c: r.dtype for (c, r) in MultiBackendJobManager._COLUMN_REQUIREMENTS.items()},
136+
dtype={c: r.dtype for (c, r) in _COLUMN_REQUIREMENTS.items()},
137137
)
138138
if (
139139
"geometry" in df.columns
@@ -203,3 +203,42 @@ def persist(self, df: pd.DataFrame):
         self.df.to_parquet(self.path, index=False)


+def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
+    """
+    Factory to get a job database at a given path,
+    guessing the database type from filename extension.
+
+    :param path: path to job database file.
+
+    .. versionadded:: 0.33.0
+    """
+    path = Path(path)
+    if path.suffix.lower() in {".csv"}:
+        job_db = CsvJobDatabase(path=path)
+    elif path.suffix.lower() in {".parquet", ".geoparquet"}:
+        job_db = ParquetJobDatabase(path=path)
+    else:
+        raise ValueError(f"Could not guess job database type from {path!r}")
+    return job_db
+
+
+def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"):
+    """
+    Factory to create a job database at given path,
+    initialized from a given dataframe,
+    and its database type guessed from filename extension.
+
+    :param path: Path to the job database file.
+    :param df: DataFrame to store in the job database.
+    :param on_exists: What to do when the job database already exists:
+        - "error": (default) raise an exception
+        - "skip": work with existing database, ignore given dataframe and skip any initialization
+
+    .. versionadded:: 0.33.0
+    """
+    job_db = get_job_db(path)
+    if isinstance(job_db, FullDataFrameJobDatabase):
+        job_db.initialize_from_df(df=df, on_exists=on_exists)
+    else:
+        raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
+    return job_db
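A short sketch of how these two factories might be used together; the file name and dataframe columns are made up for illustration:

    import pandas as pd
    from openeo.extra.job_management._job_db import create_job_db, get_job_db

    # Hypothetical dataframe of jobs to run, one row per job.
    df = pd.DataFrame({"year": [2021, 2022, 2023]})

    # ".csv" dispatches to CsvJobDatabase; ".parquet"/".geoparquet"
    # would dispatch to ParquetJobDatabase.
    job_db = create_job_db("jobs.csv", df=df, on_exists="error")

    # Reopening later: get_job_db only guesses the type from the
    # extension, without (re)initializing the database.
    job_db = get_job_db("jobs.csv")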

openeo/extra/job_management/_manager.py

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
Callable,
1414
Dict,
1515
List,
16-
Mapping,
1716
NamedTuple,
1817
Optional,
1918
Tuple,
@@ -31,7 +30,8 @@
3130
_JobStartTask,
3231
)
3332
from openeo.extra.job_management._interface import JobDatabaseInterface
34-
#from openeo.extra.job_management._job_db import get_job_db #TODO circular import
33+
from openeo.extra.job_management._job_db import get_job_db
34+
from openeo.extra.job_management._df_schema import _normalize
3535

3636
from openeo.rest import OpenEoApiError
3737
from openeo.rest.auth.auth import BearerAuth
@@ -55,13 +55,6 @@ class _Backend(NamedTuple):
5555
# Maximum number of jobs to allow in parallel on a backend
5656
parallel_jobs: int
5757

58-
@dataclasses.dataclass(frozen=True)
59-
class _ColumnProperties:
60-
"""Expected/required properties of a column in the job manager related dataframes"""
61-
62-
dtype: str = "object"
63-
default: Any = None
64-
6558

6659

6760

@@ -132,24 +125,6 @@ def start_job(
132125
Added ``cancel_running_job_after`` parameter.
133126
"""
134127

135-
# Expected columns in the job DB dataframes.
136-
# TODO: make this part of public API when settled?
137-
# TODO: move non official statuses to seperate column (not_started, queued_for_start)
138-
_COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
139-
"id": _ColumnProperties(dtype="str"),
140-
"backend_name": _ColumnProperties(dtype="str"),
141-
"status": _ColumnProperties(dtype="str", default="not_started"),
142-
# TODO: use proper date/time dtype instead of legacy str for start times?
143-
"start_time": _ColumnProperties(dtype="str"),
144-
"running_start_time": _ColumnProperties(dtype="str"),
145-
# TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
146-
# but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
147-
# Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
148-
"cpu": _ColumnProperties(dtype="str"),
149-
"memory": _ColumnProperties(dtype="str"),
150-
"duration": _ColumnProperties(dtype="str"),
151-
"costs": _ColumnProperties(dtype="float64"),
152-
}
153128

154129
def __init__(
155130
self,
@@ -259,10 +234,7 @@ def _normalize_df(cls, df: pd.DataFrame) -> pd.DataFrame:
259234
:param df: The dataframe to normalize.
260235
:return: a new dataframe that is normalized.
261236
"""
262-
new_columns = {col: req.default for (col, req) in cls._COLUMN_REQUIREMENTS.items() if col not in df.columns}
263-
df = df.assign(**new_columns)
264-
265-
return df
237+
return _normalize(df)
266238

267239
def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface):
268240
"""
