Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Sylvain Leclerc <[email protected]>
  • Loading branch information
sylvlecl committed Jan 22, 2025
1 parent f3dadab commit ca2b53f
Show file tree
Hide file tree
Showing 14 changed files with 371 additions and 260 deletions.
157 changes: 98 additions & 59 deletions antarest/study/storage/rawstudy/ini_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,24 @@
import typing as t
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, Optional

from typing_extensions import override

from antarest.core.model import JSON

PrimitiveType = t.Union[str, int, float, bool]
ValueParser = Callable[[str], PrimitiveType]

def convert_value(value: str) -> t.Union[str, int, float, bool]:

def _lower_case(input: str) -> str:
return input.lower()


LOWER_CASE_PARSER: ValueParser = _lower_case


def _convert_value(value: str) -> PrimitiveType:
"""Convert value to the appropriate type for JSON."""

try:
Expand All @@ -38,8 +49,19 @@ def convert_value(value: str) -> t.Union[str, int, float, bool]:
return value


@dataclasses.dataclass
class IniFilter:
@dataclasses.dataclass(frozen=True)
class OptionKey:
"""
Defines a location in INI file data.
A None section means all sections.
"""

section: Optional[str]
key: Optional[str]


@dataclasses.dataclass(frozen=True)
class ReaderOptions:
"""
Filter sections and options in an INI file based on regular expressions.
Expand All @@ -51,41 +73,6 @@ class IniFilter:
section_regex: t.Optional[t.Pattern[str]] = None
option_regex: t.Optional[t.Pattern[str]] = None

@classmethod
def from_kwargs(
cls,
section: str = "",
option: str = "",
section_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None,
option_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None,
**_unused: t.Any, # ignore unknown options
) -> "IniFilter":
"""
Create an instance from given filtering parameters.
When using `section` or `option` parameters, an exact match is done.
Alternatively, one can use `section_regex` or `option_regex` to perform a full match using a regex.
Args:
section: The section name to match (by default, all sections are matched)
option: The option name to match (by default, all options are matched)
section_regex: The regex for matching section names.
option_regex: The regex for matching option names.
_unused: Placeholder for any unknown options.
Returns:
The newly created instance
"""
if section:
section_regex = re.compile(re.escape(section), re.IGNORECASE)
if option:
option_regex = re.compile(re.escape(option), re.IGNORECASE)
if isinstance(section_regex, str):
section_regex = re.compile(section_regex, re.IGNORECASE) if section_regex else None
if isinstance(option_regex, str):
option_regex = re.compile(option_regex, re.IGNORECASE) if option_regex else None
return cls(section_regex=section_regex, option_regex=option_regex)

def select_section_option(self, section: str, option: str = "") -> bool:
"""
Check if a given section and option match the regular expressions.
Expand All @@ -104,13 +91,46 @@ def select_section_option(self, section: str, option: str = "") -> bool:
return True


def ini_reader_options(
section: str = "",
option: str = "",
section_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None,
option_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None,
) -> ReaderOptions:
"""
Create an instance from given filtering parameters.
When using `section` or `option` parameters, an exact match is done.
Alternatively, one can use `section_regex` or `option_regex` to perform a full match using a regex.
Args:
section: The section name to match (by default, all sections are matched)
option: The option name to match (by default, all options are matched)
section_regex: The regex for matching section names.
option_regex: The regex for matching option names.
_unused: Placeholder for any unknown options.
Returns:
The newly created instance
"""
if section:
section_regex = re.compile(re.escape(section), re.IGNORECASE)
if option:
option_regex = re.compile(re.escape(option), re.IGNORECASE)
if isinstance(section_regex, str):
section_regex = re.compile(section_regex, re.IGNORECASE) if section_regex else None
if isinstance(option_regex, str):
option_regex = re.compile(option_regex, re.IGNORECASE) if option_regex else None
return ReaderOptions(section_regex=section_regex, option_regex=option_regex)


class IReader(ABC):
"""
File reader interface.
"""

@abstractmethod
def read(self, path: t.Any, **kwargs: t.Any) -> JSON:
def read(self, path: t.Any, options: Optional[ReaderOptions] = None) -> JSON:
"""
Parse `.ini` file to json object.
Expand Down Expand Up @@ -152,11 +172,17 @@ class IniReader(IReader):
This class is not compatible with standard `.ini` readers.
"""

def __init__(self, special_keys: t.Sequence[str] = (), section_name: str = "settings") -> None:
def __init__(
self,
special_keys: t.Sequence[str] = (),
section_name: str = "settings",
value_parsers: t.Dict[OptionKey, ValueParser] = None,
) -> None:
super().__init__()

# Default section name to use if `.ini` file has no section.
self._special_keys = set(special_keys)
self._value_parsers = value_parsers or {}

# List of keys which should be parsed as list.
self._section_name = section_name
Expand All @@ -180,30 +206,31 @@ def __repr__(self) -> str: # pragma: no cover
return f"{cls}(special_keys={special_keys!r}, section_name={section_name!r})"

@override
def read(self, path: t.Any, **kwargs: t.Any) -> JSON:
def read(self, path: t.Any, options: Optional[ReaderOptions] = None) -> JSON:
options = options or ReaderOptions()
if isinstance(path, (Path, str)):
try:
with open(path, mode="r", encoding="utf-8") as f:
sections = self._parse_ini_file(f, **kwargs)
sections = self._parse_ini_file(f, options)
except UnicodeDecodeError:
# On windows, `.ini` files may use "cp1252" encoding
with open(path, mode="r", encoding="cp1252") as f:
sections = self._parse_ini_file(f, **kwargs)
sections = self._parse_ini_file(f, options)
except FileNotFoundError:
# If the file is missing, an empty dictionary is returned.
# This is required to mimic the behavior of `configparser.ConfigParser`.
return {}

elif hasattr(path, "read"):
with path:
sections = self._parse_ini_file(path, **kwargs)
sections = self._parse_ini_file(path, options)

else: # pragma: no cover
raise TypeError(repr(type(path)))

return t.cast(JSON, sections)

def _parse_ini_file(self, ini_file: t.TextIO, **kwargs: t.Any) -> JSON:
def _parse_ini_file(self, ini_file: t.TextIO, options: ReaderOptions) -> JSON:
"""
Parse `.ini` file to JSON object.
Expand Down Expand Up @@ -242,8 +269,6 @@ def _parse_ini_file(self, ini_file: t.TextIO, **kwargs: t.Any) -> JSON:
Returns:
Dictionary of parsed `.ini` file which can be converted to JSON.
"""
ini_filter = IniFilter.from_kwargs(**kwargs)

# NOTE: This algorithm is 1.93x faster than configparser.ConfigParser
section_name = self._section_name

Expand All @@ -258,10 +283,10 @@ def _parse_ini_file(self, ini_file: t.TextIO, **kwargs: t.Any) -> JSON:
continue
elif line.startswith("["):
section_name = line[1:-1]
stop = self._handle_section(ini_filter, section_name)
stop = self._handle_section(options, section_name)
elif "=" in line:
key, value = map(str.strip, line.split("=", 1))
stop = self._handle_option(ini_filter, section_name, key, value)
stop = self._handle_option(options, section_name, key, value)
else:
raise ValueError(f"☠☠☠ Invalid line: {line!r}")

Expand All @@ -271,7 +296,7 @@ def _parse_ini_file(self, ini_file: t.TextIO, **kwargs: t.Any) -> JSON:

return self._curr_sections

def _handle_section(self, ini_filter: IniFilter, section: str) -> bool:
def _handle_section(self, ini_filter: ReaderOptions, section: str) -> bool:
# state: a new section is found
match = ini_filter.select_section_option(section)

Expand All @@ -294,29 +319,43 @@ def _append_section(self, section: str) -> None:
self._curr_section = section
self._curr_option = ""

def _handle_option(self, ini_filter: IniFilter, section: str, key: str, value: str) -> bool:
def _handle_option(self, options: ReaderOptions, section: str, key: str, value: str) -> bool:
# state: a new option is found (which may be a duplicate)
match = ini_filter.select_section_option(section, key)
match = options.select_section_option(section, key)

if self._curr_option:
if match:
self._append_option(section, key, value)
self._append_option(section, key, value, options)
return False
# prematurely stop parsing if the filter don't match
return not ini_filter.select_section_option(section)
return not options.select_section_option(section)

if match:
self._append_option(section, key, value)
self._append_option(section, key, value, options)
# continue parsing to the next option
return False

def _append_option(self, section: str, key: str, value: str) -> None:
def _get_parser(self, section: str, key: str) -> ValueParser:
if not self._value_parsers:
return _convert_value
possible_keys = [
OptionKey(section=section, key=key),
OptionKey(section=None, key=key),
]
for k in possible_keys:
if parser := self._value_parsers.get(k, None):
return parser
return _convert_value

def _append_option(self, section: str, key: str, value: str, options: ReaderOptions) -> None:
self._curr_sections.setdefault(section, {})
values = self._curr_sections[section]
parser = self._get_parser(section, key)
parsed = parser(value)
if key in self._special_keys:
values.setdefault(key, []).append(convert_value(value))
values.setdefault(key, []).append(parsed)
else:
values[key] = convert_value(value)
values[key] = parsed
self._curr_option = key


Expand All @@ -326,7 +365,7 @@ class SimpleKeyValueReader(IniReader):
"""

@override
def read(self, path: t.Any, **kwargs: t.Any) -> JSON:
def read(self, path: t.Any, options: Optional[ReaderOptions] = None) -> JSON:
"""
Parse `.ini` file which has no section to JSON object.
Expand Down
42 changes: 39 additions & 3 deletions antarest/study/storage/rawstudy/ini_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,52 @@

import ast
import configparser
import typing as t
from pathlib import Path
from typing import Callable, Dict, List, Optional, Union

from typing_extensions import override

from antarest.core.model import JSON
from antarest.study.storage.rawstudy.ini_reader import OptionKey

PrimitiveType = Union[str, int, float, bool]
ValueSerializer = Callable[[str], PrimitiveType]


def _lower_case(input: str) -> str:
return input.lower()


LOWER_CASE_SERIALIZER: ValueSerializer = _lower_case


class IniConfigParser(configparser.RawConfigParser):
def __init__(self, special_keys: t.Optional[t.List[str]] = None) -> None:
def __init__(
self,
special_keys: Optional[List[str]] = None,
value_serializers: Optional[Dict[OptionKey, ValueSerializer]] = None,
) -> None:
super().__init__()
self.special_keys = special_keys
self._value_serializers = value_serializers or {}

# noinspection SpellCheckingInspection
@override
def optionxform(self, optionstr: str) -> str:
return optionstr

def _get_serializer(self, section: str, key: str) -> Optional[ValueSerializer]:
if not self._value_serializers:
return None
possible_keys = [
OptionKey(section=section, key=key),
OptionKey(section=None, key=key),
]
for k in possible_keys:
if parser := self._value_serializers.get(k, None):
return parser
return None

def _write_line( # type:ignore
self,
delimiter,
Expand All @@ -41,6 +69,9 @@ def _write_line( # type:ignore
value = self._interpolation.before_write( # type:ignore
self, section_name, key, value
)
if self._value_serializers:
if serializer := self._get_serializer(key):
value = serializer(value)
if value is not None or not self._allow_no_value: # type:ignore
value = delimiter + str(value).replace("\n", "\n\t")
else:
Expand Down Expand Up @@ -70,8 +101,13 @@ class IniWriter:
Standard INI writer.
"""

def __init__(self, special_keys: t.Optional[t.List[str]] = None):
def __init__(
self,
special_keys: Optional[List[str]] = None,
value_serializers: Optional[Dict[OptionKey, ValueSerializer]] = None,
):
self.special_keys = special_keys
self._value_serializers = value_serializers or {}

def write(self, data: JSON, path: Path) -> None:
"""
Expand Down
Loading

0 comments on commit ca2b53f

Please sign in to comment.