Skip to content

Commit

Permalink
feat(synthesis): parse area configurations avoiding errors when some …
Browse files Browse the repository at this point in the history
…INI files are missing or empty
  • Loading branch information
laurent-laporte-pro committed Apr 19, 2024
1 parent 590d688 commit 59c14b9
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 139 deletions.
118 changes: 75 additions & 43 deletions antarest/study/storage/rawstudy/model/filesystem/config/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
SimulationParsingError,
XpansionParsingError,
)
from antarest.study.storage.rawstudy.model.filesystem.config.field_validators import extract_filtering
from antarest.study.storage.rawstudy.model.filesystem.config.model import (
Area,
DistrictSet,
Expand Down Expand Up @@ -83,6 +84,48 @@ def build(study_path: Path, study_id: str, output_path: t.Optional[Path] = None)
)


def _extract_text_from_zip(root: Path, posix_path: str) -> t.Sequence[str]:
"""
Extracts text from a file inside a ZIP archive and returns it as a list of lines.
Args:
root: The path to the ZIP archive.
posix_path: The relative path to the file inside the ZIP archive.
Returns:
A list of lines in the file. If the file is not found, an empty list is returned.
"""
with zipfile.ZipFile(root) as zf:
try:
with zf.open(posix_path) as f:
text = f.read().decode("utf-8")
return text.splitlines(keepends=False)
except KeyError:
return []


def _extract_ini_from_zip(root: Path, posix_path: str, multi_ini_keys: t.Sequence[str] = ()) -> t.Mapping[str, t.Any]:
"""
Extracts data from an INI file inside a ZIP archive and returns it as a dictionary.
Args:
root: The path to the ZIP archive.
posix_path: The relative path to the file inside the ZIP archive.
multi_ini_keys: List of keys to use for multi INI files.
Returns:
A dictionary of keys/values in the INI file. If the file is not found, an empty dictionary is returned.
"""
reader = IniReader(multi_ini_keys)
with zipfile.ZipFile(root) as zf:
try:
with zf.open(posix_path) as f:
buffer = io.StringIO(f.read().decode("utf-8"))
return reader.read(buffer)
except KeyError:
return {}


def _extract_data_from_file(
root: Path,
inside_root_path: Path,
Expand Down Expand Up @@ -110,14 +153,7 @@ def _extract_data_from_file(
if file_type == FileType.TXT:
# Parse the file as a list of lines, return an empty list if missing.
if is_zip_file:
with zipfile.ZipFile(root) as zf:
try:
with zf.open(posix_path) as f:
text = f.read().decode("utf-8")
return text.splitlines(keepends=False)
except KeyError:
# File not found in the ZIP archive
return []
return _extract_text_from_zip(root, posix_path)
else:
output_data_path = root / inside_root_path
try:
Expand All @@ -127,19 +163,12 @@ def _extract_data_from_file(

elif file_type in {FileType.MULTI_INI, FileType.SIMPLE_INI}:
# Parse the file as a dictionary of keys/values, return an empty dictionary if missing.
reader = IniReader(multi_ini_keys)
if is_zip_file:
with zipfile.ZipFile(root) as zf:
try:
with zf.open(posix_path) as f:
buffer = io.StringIO(f.read().decode("utf-8"))
return reader.read(buffer)
except KeyError:
# File not found in the ZIP archive
return {}
return _extract_ini_from_zip(root, posix_path, multi_ini_keys=multi_ini_keys)
else:
output_data_path = root / inside_root_path
try:
reader = IniReader(multi_ini_keys)
return reader.read(output_data_path)
except FileNotFoundError:
return {}
Expand Down Expand Up @@ -294,7 +323,7 @@ def _parse_xpansion_version(path: Path) -> str:
raise XpansionParsingError(xpansion_json, f"key '{exc}' not found in JSON object") from exc


_regex_eco_adq = re.compile("^([0-9]{8}-[0-9]{4})(eco|adq)-?(.*)")
_regex_eco_adq = re.compile(r"^(\d{8}-\d{4})(eco|adq)-?(.*)")
match_eco_adq = _regex_eco_adq.match


Expand Down Expand Up @@ -359,14 +388,36 @@ def get_playlist(config: JSON) -> t.Optional[t.Dict[int, float]]:


def parse_area(root: Path, area: str) -> "Area":
"""
Parse an area configuration and extract its filtering configuration.
Args:
root: The root directory of the study.
area: The name of the area to parse.
Returns:
The area configuration.
"""
area_id = transform_name_to_id(area)

# Parse the optimization INI file to extract the filtering configuration.
# The file is optional, so we use a default value to avoid a parsing error.
optimization = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/areas/{area_id}/optimization.ini"),
file_type=FileType.SIMPLE_INI,
)
filtering = optimization.get("filtering", {})
filter_synthesis = extract_filtering(filtering.get("filter-synthesis", ""))
filter_year_by_year = extract_filtering(filtering.get("filter-year-by-year", ""))

return Area(
name=area,
links=_parse_links(root, area_id),
links=_parse_links_filtering(root, area_id),
thermals=_parse_thermal(root, area_id),
renewables=_parse_renewables(root, area_id),
filters_synthesis=_parse_filters_synthesis(root, area_id),
filters_year=_parse_filters_year(root, area_id),
filters_synthesis=filter_synthesis,
filters_year=filter_year_by_year,
st_storages=_parse_st_storage(root, area_id),
)

Expand Down Expand Up @@ -444,33 +495,14 @@ def _parse_st_storage(root: Path, area: str) -> t.List[STStorageConfigType]:
return config_list


def _parse_links(root: Path, area: str) -> t.Dict[str, Link]:
def _parse_links_filtering(root: Path, area: str) -> t.Dict[str, Link]:
properties_ini = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/links/{area}/properties.ini"),
file_type=FileType.SIMPLE_INI,
)
return {link: Link.from_json(properties_ini[link]) for link in list(properties_ini.keys())}


def _parse_filters_synthesis(root: Path, area: str) -> t.List[str]:
optimization = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/areas/{area}/optimization.ini"),
file_type=FileType.SIMPLE_INI,
)
filters: str = optimization["filtering"]["filter-synthesis"]
return Link.split(filters)


def _parse_filters_year(root: Path, area: str) -> t.List[str]:
optimization = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/areas/{area}/optimization.ini"),
file_type=FileType.SIMPLE_INI,
)
filters: str = optimization["filtering"]["filter-year-by-year"]
return Link.split(filters)
links_by_ids = {link_id: Link(**obj) for link_id, obj in properties_ini.items()}
return links_by_ids


def _check_build_on_solver_tests(test_dir: Path) -> None:
Expand Down
Loading

0 comments on commit 59c14b9

Please sign in to comment.