Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(synthesis): parse area configurations, avoiding errors when some INI files are missing or empty
Browse files Browse the repository at this point in the history
laurent-laporte-pro committed Apr 17, 2024
1 parent 594459f commit aef1828
Showing 3 changed files with 213 additions and 139 deletions.
118 changes: 75 additions & 43 deletions antarest/study/storage/rawstudy/model/filesystem/config/files.py
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
SimulationParsingError,
XpansionParsingError,
)
from antarest.study.storage.rawstudy.model.filesystem.config.field_validators import extract_filtering
from antarest.study.storage.rawstudy.model.filesystem.config.model import (
Area,
DistrictSet,
@@ -83,6 +84,48 @@ def build(study_path: Path, study_id: str, output_path: t.Optional[Path] = None)
)


def _extract_text_from_zip(root: Path, posix_path: str) -> t.Sequence[str]:
"""
Extracts text from a file inside a ZIP archive and returns it as a list of lines.
Args:
root: The path to the ZIP archive.
posix_path: The relative path to the file inside the ZIP archive.
Returns:
A list of lines in the file. If the file is not found, an empty list is returned.
"""
with zipfile.ZipFile(root) as zf:
try:
with zf.open(posix_path) as f:
text = f.read().decode("utf-8")
return text.splitlines(keepends=False)
except KeyError:
return []


def _extract_ini_from_zip(root: Path, posix_path: str, multi_ini_keys: t.Sequence[str] = ()) -> t.Mapping[str, t.Any]:
    """
    Read an INI file stored inside a ZIP archive and return its parsed content.

    Args:
        root: Path to the ZIP archive on disk.
        posix_path: POSIX-style path of the member file inside the archive.
        multi_ini_keys: Section names that may occur several times in the INI file.

    Returns:
        A mapping of the INI keys/values.
        An empty dictionary is returned when the member is not in the archive.
    """
    with zipfile.ZipFile(root) as archive:
        try:
            raw = archive.read(posix_path)
        except KeyError:
            # Member not found in the ZIP archive: behave like an empty INI file.
            return {}
    return IniReader(multi_ini_keys).read(io.StringIO(raw.decode("utf-8")))


def _extract_data_from_file(
root: Path,
inside_root_path: Path,
@@ -110,14 +153,7 @@ def _extract_data_from_file(
if file_type == FileType.TXT:
# Parse the file as a list of lines, return an empty list if missing.
if is_zip_file:
with zipfile.ZipFile(root) as zf:
try:
with zf.open(posix_path) as f:
text = f.read().decode("utf-8")
return text.splitlines(keepends=False)
except KeyError:
# File not found in the ZIP archive
return []
return _extract_text_from_zip(root, posix_path)
else:
output_data_path = root / inside_root_path
try:
@@ -127,19 +163,12 @@ def _extract_data_from_file(

elif file_type in {FileType.MULTI_INI, FileType.SIMPLE_INI}:
# Parse the file as a dictionary of keys/values, return an empty dictionary if missing.
reader = IniReader(multi_ini_keys)
if is_zip_file:
with zipfile.ZipFile(root) as zf:
try:
with zf.open(posix_path) as f:
buffer = io.StringIO(f.read().decode("utf-8"))
return reader.read(buffer)
except KeyError:
# File not found in the ZIP archive
return {}
return _extract_ini_from_zip(root, posix_path, multi_ini_keys=multi_ini_keys)
else:
output_data_path = root / inside_root_path
try:
reader = IniReader(multi_ini_keys)
return reader.read(output_data_path)
except FileNotFoundError:
return {}
@@ -294,7 +323,7 @@ def _parse_xpansion_version(path: Path) -> str:
raise XpansionParsingError(xpansion_json, f"key '{exc}' not found in JSON object") from exc


_regex_eco_adq = re.compile("^([0-9]{8}-[0-9]{4})(eco|adq)-?(.*)")
_regex_eco_adq = re.compile(r"^(\d{8}-\d{4})(eco|adq)-?(.*)")
match_eco_adq = _regex_eco_adq.match


@@ -359,14 +388,36 @@ def get_playlist(config: JSON) -> t.Optional[t.Dict[int, float]]:


def parse_area(root: Path, area: str) -> "Area":
"""
Parse an area configuration and extract its filtering configuration.
Args:
root: The root directory of the study.
area: The name of the area to parse.
Returns:
The area configuration.
"""
area_id = transform_name_to_id(area)

# Parse the optimization INI file to extract the filtering configuration.
# The file is optional, so we use a default value to avoid a parsing error.
optimization = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/areas/{area_id}/optimization.ini"),
file_type=FileType.SIMPLE_INI,
)
filtering = optimization.get("filtering", {})
filter_synthesis = extract_filtering(filtering.get("filter-synthesis", ""))
filter_year_by_year = extract_filtering(filtering.get("filter-year-by-year", ""))

return Area(
name=area,
links=_parse_links(root, area_id),
links=_parse_links_filtering(root, area_id),
thermals=_parse_thermal(root, area_id),
renewables=_parse_renewables(root, area_id),
filters_synthesis=_parse_filters_synthesis(root, area_id),
filters_year=_parse_filters_year(root, area_id),
filters_synthesis=filter_synthesis,
filters_year=filter_year_by_year,
st_storages=_parse_st_storage(root, area_id),
)

@@ -444,33 +495,14 @@ def _parse_st_storage(root: Path, area: str) -> t.List[STStorageConfigType]:
return config_list


def _parse_links(root: Path, area: str) -> t.Dict[str, Link]:
def _parse_links_filtering(root: Path, area: str) -> t.Dict[str, Link]:
properties_ini = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/links/{area}/properties.ini"),
file_type=FileType.SIMPLE_INI,
)
return {link: Link.from_json(properties_ini[link]) for link in list(properties_ini.keys())}


def _parse_filters_synthesis(root: Path, area: str) -> t.List[str]:
optimization = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/areas/{area}/optimization.ini"),
file_type=FileType.SIMPLE_INI,
)
filters: str = optimization["filtering"]["filter-synthesis"]
return Link.split(filters)


def _parse_filters_year(root: Path, area: str) -> t.List[str]:
optimization = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/areas/{area}/optimization.ini"),
file_type=FileType.SIMPLE_INI,
)
filters: str = optimization["filtering"]["filter-year-by-year"]
return Link.split(filters)
links_by_ids = {link_id: Link(**obj) for link_id, obj in properties_ini.items()}
return links_by_ids


def _check_build_on_solver_tests(test_dir: Path) -> None:
114 changes: 58 additions & 56 deletions antarest/study/storage/rawstudy/model/filesystem/config/model.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import re
import typing as t
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional

from pydantic import Extra
from pydantic import Field, root_validator
from pydantic.main import BaseModel

from antarest.core.model import JSON
from antarest.core.utils.utils import DTO

from .binding_constraint import BindingConstraintDTO
from .field_validators import extract_filtering
from .renewable import RenewableConfigType
from .st_storage import STStorageConfigType
from .thermal import ThermalConfigType
@@ -20,42 +20,44 @@ class ENR_MODELLING(Enum):
CLUSTERS = "clusters"


class Link(BaseModel):
class Link(BaseModel, extra="ignore"):
"""
Object linked to /input/links/<link>/properties.ini information
"""
filters_synthesis: List[str]
filters_year: List[str]
Attributes:
filters_synthesis: list of filters for synthesis data
filters_year: list of filters for year-by-year data
@staticmethod
def from_json(properties: JSON) -> "Link":
return Link(
filters_year=Link.split(properties["filter-year-by-year"]),
filters_synthesis=Link.split(properties["filter-synthesis"]),
)
Notes:
Ignore extra fields, because we only need `filter-synthesis` and `filter-year-by-year`.
"""

@staticmethod
def split(line: str) -> List[str]:
return [token.strip() for token in line.split(",") if token.strip() != ""]
filters_synthesis: t.List[str] = Field(default_factory=list)
filters_year: t.List[str] = Field(default_factory=list)

@root_validator(pre=True)
def validation(cls, values: t.MutableMapping[str, t.Any]) -> t.MutableMapping[str, t.Any]:
# note: field names are in kebab-case in the INI file
filters_synthesis = values.pop("filter-synthesis", values.pop("filters_synthesis", ""))
filters_year = values.pop("filter-year-by-year", values.pop("filters_year", ""))
values["filters_synthesis"] = extract_filtering(filters_synthesis)
values["filters_year"] = extract_filtering(filters_year)
return values


class Area(BaseModel):
class Area(BaseModel, extra="forbid"):
"""
Object linked to /input/<area>/optimization.ini information
"""

class Config:
extra = Extra.forbid

name: str
links: Dict[str, Link]
thermals: List[ThermalConfigType]
renewables: List[RenewableConfigType]
filters_synthesis: List[str]
filters_year: List[str]
links: t.Dict[str, Link]
thermals: t.List[ThermalConfigType]
renewables: t.List[RenewableConfigType]
filters_synthesis: t.List[str]
filters_year: t.List[str]
# since v8.6
st_storages: List[STStorageConfigType] = []
st_storages: t.List[STStorageConfigType] = []


class DistrictSet(BaseModel):
@@ -64,14 +66,14 @@ class DistrictSet(BaseModel):
"""

ALL = ["hourly", "daily", "weekly", "monthly", "annual"]
name: Optional[str] = None
name: t.Optional[str] = None
inverted_set: bool = False
areas: Optional[List[str]] = None
areas: t.Optional[t.List[str]] = None
output: bool = True
filters_synthesis: List[str] = ALL
filters_year: List[str] = ALL
filters_synthesis: t.List[str] = ALL
filters_year: t.List[str] = ALL

def get_areas(self, all_areas: List[str]) -> List[str]:
def get_areas(self, all_areas: t.List[str]) -> t.List[str]:
if self.inverted_set:
return list(set(all_areas).difference(set(self.areas or [])))
return self.areas or []
@@ -89,7 +91,7 @@ class Simulation(BaseModel):
synthesis: bool
by_year: bool
error: bool
playlist: Optional[List[int]]
playlist: t.Optional[t.List[int]]
archived: bool = False
xpansion: str

@@ -110,16 +112,16 @@ def __init__(
path: Path,
study_id: str,
version: int,
output_path: Optional[Path] = None,
areas: Optional[Dict[str, Area]] = None,
sets: Optional[Dict[str, DistrictSet]] = None,
outputs: Optional[Dict[str, Simulation]] = None,
bindings: Optional[List[BindingConstraintDTO]] = None,
output_path: t.Optional[Path] = None,
areas: t.Optional[t.Dict[str, Area]] = None,
sets: t.Optional[t.Dict[str, DistrictSet]] = None,
outputs: t.Optional[t.Dict[str, Simulation]] = None,
bindings: t.Optional[t.List[BindingConstraintDTO]] = None,
store_new_set: bool = False,
archive_input_series: Optional[List[str]] = None,
archive_input_series: t.Optional[t.List[str]] = None,
enr_modelling: str = ENR_MODELLING.AGGREGATED.value,
cache: Optional[Dict[str, List[str]]] = None,
zip_path: Optional[Path] = None,
cache: t.Optional[t.Dict[str, t.List[str]]] = None,
zip_path: t.Optional[Path] = None,
):
self.study_path = study_path
self.path = path
@@ -138,7 +140,7 @@ def __init__(

def next_file(self, name: str, is_output: bool = False) -> "FileStudyTreeConfig":
if is_output and name in self.outputs and self.outputs[name].archived:
zip_path: Optional[Path] = self.path / f"{name}.zip"
zip_path: t.Optional[Path] = self.path / f"{name}.zip"
else:
zip_path = self.zip_path

@@ -176,43 +178,43 @@ def at_file(self, filepath: Path) -> "FileStudyTreeConfig":
cache=self.cache,
)

def area_names(self) -> List[str]:
def area_names(self) -> t.List[str]:
return self.cache.get("%areas", list(self.areas.keys()))

def set_names(self, only_output: bool = True) -> List[str]:
def set_names(self, only_output: bool = True) -> t.List[str]:
return self.cache.get(
f"%districts%{only_output}",
[k for k, v in self.sets.items() if v.output or not only_output],
)

def get_thermal_ids(self, area: str) -> List[str]:
def get_thermal_ids(self, area: str) -> t.List[str]:
"""
Returns a list of thermal cluster IDs for a given area.
Note that IDs may not be in lower case (but series IDs are).
"""
return self.cache.get(f"%thermal%{area}%{area}", [th.id for th in self.areas[area].thermals])

def get_renewable_ids(self, area: str) -> List[str]:
def get_renewable_ids(self, area: str) -> t.List[str]:
"""
Returns a list of renewable cluster IDs for a given area.
Note that IDs may not be in lower case (but series IDs are).
"""
return self.cache.get(f"%renewable%{area}", [r.id for r in self.areas[area].renewables])

def get_st_storage_ids(self, area: str) -> List[str]:
def get_st_storage_ids(self, area: str) -> t.List[str]:
return self.cache.get(f"%st-storage%{area}", [s.id for s in self.areas[area].st_storages])

def get_links(self, area: str) -> List[str]:
def get_links(self, area: str) -> t.List[str]:
return self.cache.get(f"%links%{area}", list(self.areas[area].links.keys()))

def get_filters_synthesis(self, area: str, link: Optional[str] = None) -> List[str]:
def get_filters_synthesis(self, area: str, link: t.Optional[str] = None) -> t.List[str]:
if link:
return self.areas[area].links[link].filters_synthesis
if area in self.sets and self.sets[area].output:
return self.sets[area].filters_synthesis
return self.areas[area].filters_synthesis

def get_filters_year(self, area: str, link: Optional[str] = None) -> List[str]:
def get_filters_year(self, area: str, link: t.Optional[str] = None) -> t.List[str]:
if link:
return self.areas[area].links[link].filters_year
if area in self.sets and self.sets[area].output:
@@ -245,15 +247,15 @@ class FileStudyTreeConfigDTO(BaseModel):
path: Path
study_id: str
version: int
output_path: Optional[Path] = None
areas: Dict[str, Area] = dict()
sets: Dict[str, DistrictSet] = dict()
outputs: Dict[str, Simulation] = dict()
bindings: List[BindingConstraintDTO] = list()
output_path: t.Optional[Path] = None
areas: t.Dict[str, Area] = dict()
sets: t.Dict[str, DistrictSet] = dict()
outputs: t.Dict[str, Simulation] = dict()
bindings: t.List[BindingConstraintDTO] = list()
store_new_set: bool = False
archive_input_series: List[str] = list()
archive_input_series: t.List[str] = list()
enr_modelling: str = ENR_MODELLING.AGGREGATED.value
zip_path: Optional[Path] = None
zip_path: t.Optional[Path] = None

@staticmethod
def from_build_config(
120 changes: 80 additions & 40 deletions tests/storage/repository/filesystem/config/test_config_files.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import textwrap
from pathlib import Path
from typing import Any, Dict
from zipfile import ZipFile
@@ -10,7 +11,7 @@
BindingConstraintFrequency,
)
from antarest.study.storage.rawstudy.model.filesystem.config.files import (
_parse_links,
_parse_links_filtering,
_parse_renewables,
_parse_sets,
_parse_st_storage,
@@ -31,8 +32,12 @@
from tests.storage.business.assets import ASSETS_DIR


def build_empty_files(tmp: Path) -> Path:
study_path = tmp / "my-study"
@pytest.fixture(name="study_path")
def study_path_fixture(tmp_path: Path) -> Path:
"""
Create a study directory with the minimal structure required to build the configuration.
"""
study_path = tmp_path / "my-study"
(study_path / "input/bindingconstraints/").mkdir(parents=True)
(study_path / "input/bindingconstraints/bindingconstraints.ini").touch()

@@ -49,39 +54,37 @@ def build_empty_files(tmp: Path) -> Path:
return study_path


def test_parse_output_parameters(tmp_path: Path) -> None:
study = build_empty_files(tmp_path)
def test_parse_output_parameters(study_path: Path) -> None:
content = """
[output]
synthesis = true
storenewset = true
archives =
"""
(study / "settings/generaldata.ini").write_text(content)
(study_path / "settings/generaldata.ini").write_text(content)

config = FileStudyTreeConfig(
study_path=study,
path=study,
study_path=study_path,
path=study_path,
version=-1,
store_new_set=True,
study_id="id",
output_path=study / "output",
output_path=study_path / "output",
)
assert build(study, "id") == config
assert build(study_path, "id") == config


def test_parse_bindings(tmp_path: Path) -> None:
def test_parse_bindings(study_path: Path) -> None:
# Setup files
study_path = build_empty_files(tmp_path)
content = """
content = """\
[bindA]
id = bindA
[bindB]
id = bindB
type = weekly
"""
(study_path / "input/bindingconstraints/bindingconstraints.ini").write_text(content)
(study_path / "input/bindingconstraints/bindingconstraints.ini").write_text(textwrap.dedent(content))

config = FileStudyTreeConfig(
study_path=study_path,
@@ -108,14 +111,13 @@ def test_parse_bindings(tmp_path: Path) -> None:
assert build(study_path, "id") == config


def test_parse_outputs(tmp_path: Path) -> None:
study_path = build_empty_files(tmp_path)
def test_parse_outputs(study_path: Path) -> None:
output_path = study_path / "output/20201220-1456eco-hello/"
output_path.mkdir(parents=True)

(output_path / "about-the-study").mkdir()
file = output_path / "about-the-study/parameters.ini"
content = """
content = """\
[general]
nbyears = 1
year-by-year = true
@@ -127,7 +129,7 @@ def test_parse_outputs(tmp_path: Path) -> None:
[playlist]
playlist_year + = 0
"""
file.write_text(content)
file.write_text(textwrap.dedent(content))

(output_path / "checkIntegrity.txt").touch()

@@ -226,21 +228,19 @@ def test_parse_outputs__nominal(tmp_path: Path, assets_name: str, expected: Dict
assert actual == expected


def test_parse_sets(tmp_path: Path) -> None:
study_path = build_empty_files(tmp_path)
content = """
[hello]
output = true
+ = a
+ = b
"""
(study_path / "input/areas/sets.ini").write_text(content)
def test_parse_sets(study_path: Path) -> None:
content = """\
[hello]
output = true
+ = a
+ = b
"""
(study_path / "input/areas/sets.ini").write_text(textwrap.dedent(content))

assert _parse_sets(study_path) == {"hello": DistrictSet(areas=["a", "b"], output=True, inverted_set=False)}


def test_parse_area(tmp_path: Path) -> None:
study_path = build_empty_files(tmp_path)
def test_parse_area(study_path: Path) -> None:
(study_path / "input/areas/list.txt").write_text("FR\n")
(study_path / "input/areas/fr").mkdir(parents=True)
content = """
@@ -270,6 +270,51 @@ def test_parse_area(tmp_path: Path) -> None:
assert build(study_path, "id") == config


def test_parse_area__extra_area(study_path: Path) -> None:
    """
    Test the case where an extra area is present in the `list.txt` file.

    The extra area should be taken into account with default values to avoid any parsing error.
    """
    (study_path / "input/areas/list.txt").write_text("FR\nDE\n")
    # Only "FR" gets an optimization.ini file; "DE" must fall back to default values.
    (study_path / "input/areas/fr").mkdir(parents=True)
    content = """
[filtering]
filter-synthesis = daily, monthly
filter-year-by-year = hourly, weekly, annual
"""
    (study_path / "input/areas/fr/optimization.ini").write_text(content)

    expected_areas = {
        "fr": Area(
            name="FR",
            thermals=[],
            renewables=[],
            links={},
            filters_year=["hourly", "weekly", "annual"],
            filters_synthesis=["daily", "monthly"],
        ),
        "de": Area(
            name="DE",
            links={},
            thermals=[],
            renewables=[],
            filters_synthesis=[],
            filters_year=[],
            st_storages=[],
        ),
    }
    expected = FileStudyTreeConfig(
        study_path=study_path,
        path=study_path,
        study_id="id",
        version=-1,
        output_path=study_path / "output",
        areas=expected_areas,
    )
    assert build(study_path, "id") == expected


# noinspection SpellCheckingInspection
THERMAL_LIST_INI = """\
[t1]
@@ -286,8 +331,7 @@ def test_parse_area(tmp_path: Path) -> None:
"""


def test_parse_thermal(tmp_path: Path) -> None:
study_path = build_empty_files(tmp_path)
def test_parse_thermal(study_path: Path) -> None:
study_path.joinpath("study.antares").write_text("[antares] \n version = 700")
ini_path = study_path.joinpath("input/thermal/clusters/fr/list.ini")

@@ -325,8 +369,7 @@ def test_parse_thermal(tmp_path: Path) -> None:


@pytest.mark.parametrize("version", [850, 860, 870])
def test_parse_thermal_860(tmp_path: Path, version, caplog) -> None:
study_path = build_empty_files(tmp_path)
def test_parse_thermal_860(study_path: Path, version, caplog) -> None:
study_path.joinpath("study.antares").write_text(f"[antares] \n version = {version}")
ini_path = study_path.joinpath("input/thermal/clusters/fr/list.ini")
ini_path.parent.mkdir(parents=True)
@@ -361,8 +404,7 @@ def test_parse_thermal_860(tmp_path: Path, version, caplog) -> None:
"""


def test_parse_renewables(tmp_path: Path) -> None:
study_path = build_empty_files(tmp_path)
def test_parse_renewables(study_path: Path) -> None:
study_path.joinpath("study.antares").write_text("[antares] \n version = 810")
ini_path = study_path.joinpath("input/renewables/clusters/fr/list.ini")

@@ -411,8 +453,7 @@ def test_parse_renewables(tmp_path: Path) -> None:
"""


def test_parse_st_storage(tmp_path: Path) -> None:
study_path = build_empty_files(tmp_path)
def test_parse_st_storage(study_path: Path) -> None:
study_path.joinpath("study.antares").write_text("[antares] \n version = 860")
config_dir = study_path.joinpath("input", "st-storage", "clusters", "fr")
config_dir.mkdir(parents=True)
@@ -452,8 +493,7 @@ def test_parse_st_storage_with_no_file(tmp_path: Path) -> None:
assert _parse_st_storage(tmp_path, "") == []


def test_parse_links(tmp_path: Path) -> None:
study_path = build_empty_files(tmp_path)
def test_parse_links(study_path: Path) -> None:
(study_path / "input/links/fr").mkdir(parents=True)
content = """
[l1]
@@ -463,4 +503,4 @@ def test_parse_links(tmp_path: Path) -> None:
(study_path / "input/links/fr/properties.ini").write_text(content)

link = Link(filters_synthesis=["annual"], filters_year=["hourly"])
assert _parse_links(study_path, "fr") == {"l1": link}
assert _parse_links_filtering(study_path, "fr") == {"l1": link}

0 comments on commit aef1828

Please sign in to comment.