Skip to content

Commit

Permalink
feat(commands): add ST-Storage commands (#1630)
Browse files Browse the repository at this point in the history
laurent-laporte-pro authored Jul 18, 2023
1 parent c1145de commit e88d4ee
Showing 24 changed files with 2,327 additions and 113 deletions.
48 changes: 25 additions & 23 deletions antarest/study/storage/rawstudy/model/filesystem/config/files.py
Original file line number Diff line number Diff line change
@@ -26,7 +26,9 @@
Link,
Simulation,
transform_name_to_id,
Storage,
)
from antarest.study.storage.rawstudy.model.filesystem.config.st_storage import (
STStorageConfig,
)
from antarest.study.storage.rawstudy.model.filesystem.root.settings.generaldata import (
DUPLICATE_KEYS,
@@ -49,35 +51,38 @@ def build(
study_path: Path, study_id: str, output_path: Optional[Path] = None
) -> "FileStudyTreeConfig":
"""
Extract data from filesystem to build config study.
Args:
study_path: study_path with files inside.
study_id: uuid of the study
output_path: output_path if not in study_path/output
Extracts data from the filesystem to build a study config.
Returns: study config fill with data
Args:
study_path: Path to the study directory or ZIP file containing the study.
study_id: UUID of the study.
output_path: Optional path for the output directory.
If not provided, it will be set to `{study_path}/output`.
Returns:
An instance of `FileStudyTreeConfig` filled with the study data.
"""
(sns, asi, enr_modelling) = _parse_parameters(study_path)
is_zip_file = study_path.suffix.lower() == ".zip"

study_path_without_zip_extension = study_path.parent / (
study_path.stem if study_path.suffix == ".zip" else study_path.name
)
# Study directory to use if the study is compressed
study_dir = study_path.with_suffix("") if is_zip_file else study_path
(sns, asi, enr_modelling) = _parse_parameters(study_path)

outputs_dir: Path = output_path or study_path / "output"
return FileStudyTreeConfig(
study_path=study_path,
output_path=output_path or study_path / "output",
path=study_path_without_zip_extension,
output_path=outputs_dir,
path=study_dir,
study_id=study_id,
version=_parse_version(study_path),
areas=_parse_areas(study_path),
sets=_parse_sets(study_path),
outputs=_parse_outputs(output_path or study_path / "output"),
outputs=_parse_outputs(outputs_dir),
bindings=_parse_bindings(study_path),
store_new_set=sns,
archive_input_series=asi,
enr_modelling=enr_modelling,
zip_path=study_path if study_path.suffix == ".zip" else None,
zip_path=study_path if is_zip_file else None,
)


@@ -359,7 +364,7 @@ def parse_area(root: Path, area: str) -> "Area":
renewables=_parse_renewables(root, area_id),
filters_synthesis=_parse_filters_synthesis(root, area_id),
filters_year=_parse_filters_year(root, area_id),
st_storage=_parse_st_storage(root, area_id),
st_storages=_parse_st_storage(root, area_id),
)


@@ -379,21 +384,18 @@ def _parse_thermal(root: Path, area: str) -> List[Cluster]:
]


def _parse_st_storage(root: Path, area: str) -> List[Storage]:
def _parse_st_storage(root: Path, area: str) -> List[STStorageConfig]:
"""
Parse the short-term storage INI file, return an empty list if missing.
"""
list_ini: Dict[str, Any] = _extract_data_from_file(
config_dict: Dict[str, Any] = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/st-storage/clusters/{area}/list.ini"),
file_type=FileType.SIMPLE_INI,
)
return [
Storage(
id=transform_name_to_id(key),
name=values.get("name", key),
)
for key, values in list_ini.items()
STStorageConfig(**dict(values, id=storage_id))
for storage_id, values in config_dict.items()
]


33 changes: 15 additions & 18 deletions antarest/study/storage/rawstudy/model/filesystem/config/model.py
Original file line number Diff line number Diff line change
@@ -3,10 +3,14 @@
from pathlib import Path
from typing import Dict, List, Optional, Set

from pydantic import Extra

from antarest.core.model import JSON
from antarest.core.utils.utils import DTO
from pydantic.main import BaseModel

from .st_storage import STStorageConfig


class ENR_MODELLING(Enum):
AGGREGATED = "aggregated"
@@ -23,15 +27,6 @@ class Cluster(BaseModel):
enabled: bool = True


class Storage(BaseModel):
"""
Short-term storage model used in Area creation
"""

id: str
name: str


class Link(BaseModel):
"""
Object linked to /input/links/<link>/properties.ini information
@@ -59,14 +54,17 @@ class Area(BaseModel):
Object linked to /input/<area>/optimization.ini information
"""

class Config:
extra = Extra.forbid

name: str
links: Dict[str, Link]
thermals: List[Cluster]
renewables: List[Cluster]
filters_synthesis: List[str]
filters_year: List[str]
# since v8.6
storages: List[Storage] = []
st_storages: List[STStorageConfig] = []


class DistrictSet(BaseModel):
@@ -143,14 +141,14 @@ def __init__(
self.study_id = study_id
self.version = version
self.output_path = output_path
self.areas = areas or dict()
self.sets = sets or dict()
self.outputs = outputs or dict()
self.bindings = bindings or list()
self.areas = areas or {}
self.sets = sets or {}
self.outputs = outputs or {}
self.bindings = bindings or []
self.store_new_set = store_new_set
self.archive_input_series = archive_input_series or list()
self.archive_input_series = archive_input_series or []
self.enr_modelling = enr_modelling
self.cache = cache or dict()
self.cache = cache or {}
self.zip_path = zip_path

def next_file(
@@ -218,8 +216,7 @@ def get_thermal_names(

def get_st_storage_names(self, area: str) -> List[str]:
return self.cache.get(
f"%st-storage%{area}",
[storage.id for storage in self.areas[area].storages],
f"%st-storage%{area}", [s.id for s in self.areas[area].st_storages]
)

def get_renewable_names(
115 changes: 115 additions & 0 deletions antarest/study/storage/rawstudy/model/filesystem/config/st_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from typing import Dict, Any

from pydantic import BaseModel, Extra, Field, root_validator

from antarest.study.business.enum_ignore_case import EnumIgnoreCase


class STStorageGroup(EnumIgnoreCase):
"""
This class defines the specific energy storage systems.
Enum values:
- PSP_OPEN: Represents an open pumped storage plant.
- PSP_CLOSED: Represents a closed pumped storage plant.
- PONDAGE: Represents a pondage storage system (reservoir storage system).
- BATTERY: Represents a battery storage system.
- OTHER: Represents other energy storage systems.
"""

PSP_OPEN = "PSP_open"
PSP_CLOSED = "PSP_closed"
PONDAGE = "Pondage"
BATTERY = "Battery"
OTHER = "Other"


# noinspection SpellCheckingInspection
class STStorageConfig(BaseModel):
"""
Manage the configuration files in the context of Short-Term Storage.
It provides a convenient way to read and write configuration data from/to an INI file format.
"""

class Config:
extra = Extra.forbid
allow_population_by_field_name = True

# The `id` field is a calculated field that is excluded when converting
# the model to a dictionary or JSON format (`model_dump`).
id: str = Field(
description="Short-term storage ID",
regex=r"[a-zA-Z0-9_(),& -]+",
exclude=True,
)
name: str = Field(
description="Short-term storage name",
regex=r"[a-zA-Z0-9_(),& -]+",
)
group: STStorageGroup = Field(
...,
description="Energy storage system group (mandatory)",
)
injection_nominal_capacity: float = Field(
0,
description="Injection nominal capacity (MW)",
ge=0,
alias="injectionnominalcapacity",
)
withdrawal_nominal_capacity: float = Field(
0,
description="Withdrawal nominal capacity (MW)",
ge=0,
alias="withdrawalnominalcapacity",
)
reservoir_capacity: float = Field(
0,
description="Reservoir capacity (MWh)",
ge=0,
alias="reservoircapacity",
)
efficiency: float = Field(
1,
description="Efficiency of the storage system",
ge=0,
le=1,
)
initial_level: float = Field(
0,
description="Initial level of the storage system",
ge=0,
alias="initiallevel",
)
initial_level_optim: bool = Field(
False,
description="Flag indicating if the initial level is optimized",
alias="initialleveloptim",
)

@root_validator(pre=True)
def calculate_storage_id(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""
Calculate the short-term storage ID based on the storage name, if not provided.
Args:
values: values used to construct the object.
Returns:
The updated values.
"""
# Avoid circular imports
from antarest.study.storage.rawstudy.model.filesystem.config.model import (
transform_name_to_id,
)

if values.get("id") or not values.get("name"):
return values
storage_name = values["name"]
if storage_id := transform_name_to_id(storage_name):
values["id"] = storage_id
else:
raise ValueError(
f"Invalid short term storage name '{storage_name}'."
)
return values
Original file line number Diff line number Diff line change
@@ -13,12 +13,12 @@
class InputSTStorageAreaStorage(FolderNode):
def build(self) -> TREE:
children: TREE = {
"PMAX-injection": InputSeriesMatrix(
"pmax_injection": InputSeriesMatrix(
self.context,
self.config.next_file("PMAX-injection.txt"),
default_empty=series.pmax_injection,
),
"PMAX-withdrawal": InputSeriesMatrix(
"pmax_withdrawal": InputSeriesMatrix(
self.context,
self.config.next_file("PMAX-withdrawal.txt"),
default_empty=series.pmax_withdrawal,
@@ -28,12 +28,12 @@ def build(self) -> TREE:
self.config.next_file("inflows.txt"),
default_empty=series.inflows,
),
"lower-rule-curve": InputSeriesMatrix(
"lower_rule_curve": InputSeriesMatrix(
self.context,
self.config.next_file("lower-rule-curve.txt"),
default_empty=series.lower_rule_curve,
),
"upper-rule-curve": InputSeriesMatrix(
"upper_rule_curve": InputSeriesMatrix(
self.context,
self.config.next_file("upper-rule-curve.txt"),
default_empty=series.upper_rule_curve,
31 changes: 31 additions & 0 deletions antarest/study/storage/variantstudy/business/command_reverter.py
Original file line number Diff line number Diff line change
@@ -33,6 +33,9 @@
from antarest.study.storage.variantstudy.model.command.create_renewables_cluster import (
CreateRenewablesCluster,
)
from antarest.study.storage.variantstudy.model.command.create_st_storage import (
CreateSTStorage,
)
from antarest.study.storage.variantstudy.model.command.icommand import ICommand
from antarest.study.storage.variantstudy.model.command.remove_area import (
RemoveArea,
@@ -52,6 +55,9 @@
from antarest.study.storage.variantstudy.model.command.remove_renewables_cluster import (
RemoveRenewablesCluster,
)
from antarest.study.storage.variantstudy.model.command.remove_st_storage import (
RemoveSTStorage,
)
from antarest.study.storage.variantstudy.model.command.replace_matrix import (
ReplaceMatrix,
)
@@ -267,6 +273,31 @@ def _revert_remove_renewables_cluster(
"The revert function for RemoveRenewablesCluster is not available"
)

@staticmethod
def _revert_create_st_storage(
base_command: CreateSTStorage,
history: List["ICommand"],
base: FileStudy,
) -> List[ICommand]:
storage_id = base_command.parameters.id
return [
RemoveSTStorage(
area_id=base_command.area_id,
storage_id=storage_id,
command_context=base_command.command_context,
)
]

@staticmethod
def _revert_remove_st_storage(
base_command: RemoveSTStorage,
history: List["ICommand"],
base: FileStudy,
) -> List[ICommand]:
raise NotImplementedError(
"The revert function for RemoveSTStorage is not available"
)

@staticmethod
def _revert_replace_matrix(
base_command: ReplaceMatrix, history: List["ICommand"], base: FileStudy
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from . import hydro, prepro, thermals, link
from . import hydro, prepro, thermals, link, st_storage
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import series
Original file line number Diff line number Diff line change
@@ -2,25 +2,23 @@
from pathlib import Path
from typing import Dict

from filelock import FileLock

from antarest.matrixstore.service import ISimpleMatrixService
from antarest.study.storage.variantstudy.business import matrix_constants
from antarest.study.storage.variantstudy.business.matrix_constants.common import (
NULL_MATRIX,
NULL_SCENARIO_MATRIX,
FIXED_4_COLUMNS,
FIXED_8_COLUMNS,
NULL_MATRIX,
NULL_SCENARIO_MATRIX,
)
from filelock import FileLock

# TODO: put index into variable
# fmt: off
HYDRO_COMMON_CAPACITY_MAX_POWER_V7 = "hydro/common/capacity/max_power/v7"
HYDRO_COMMON_CAPACITY_RESERVOIR_V7 = "hydro/common/capacity/reservoir/v7"
HYDRO_COMMON_CAPACITY_RESERVOIR_V6 = "hydro/common/capacity/reservoir/v6"
HYDRO_COMMON_CAPACITY_INFLOW_PATTERN = "hydro/common/capacity/inflow_pattern"
HYDRO_COMMON_CAPACITY_CREDIT_MODULATION = (
"hydro/common/capacity/credit_modulations"
)
HYDRO_COMMON_CAPACITY_CREDIT_MODULATION = "hydro/common/capacity/credit_modulations"
RESERVES_TS = "reserves"
MISCGEN_TS = "miscgen"
PREPRO_CONVERSION = "prepro/conversion"
@@ -33,9 +31,20 @@
LINK_INDIRECT = "link_indirect"
NULL_MATRIX_NAME = "null_matrix"
EMPTY_SCENARIO_MATRIX = "empty_scenario_matrix"
ONES_SCENARIO_MATRIX = "ones_scenario_matrix"
# fmt: on

# Short-term storage aliases
ST_STORAGE_PMAX_INJECTION = ONES_SCENARIO_MATRIX
ST_STORAGE_PMAX_WITHDRAWAL = ONES_SCENARIO_MATRIX
ST_STORAGE_LOWER_RULE_CURVE = EMPTY_SCENARIO_MATRIX
ST_STORAGE_UPPER_RULE_CURVE = ONES_SCENARIO_MATRIX
ST_STORAGE_INFLOWS = EMPTY_SCENARIO_MATRIX

MATRIX_PROTOCOL_PREFIX = "matrix://"


# noinspection SpellCheckingInspection
class GeneratorMatrixConstants:
def __init__(self, matrix_service: ISimpleMatrixService) -> None:
self.hashes: Dict[str, str] = {}
@@ -98,6 +107,11 @@ def _init(self) -> None:
self.hashes[RESERVES_TS] = self.matrix_service.create(FIXED_4_COLUMNS)
self.hashes[MISCGEN_TS] = self.matrix_service.create(FIXED_8_COLUMNS)

# Some short-term storage matrices use np.ones((8760, 1))
self.hashes[ONES_SCENARIO_MATRIX] = self.matrix_service.create(
matrix_constants.st_storage.series.pmax_injection
)

def get_hydro_max_power(self, version: int) -> str:
if version > 650:
return (
@@ -164,3 +178,25 @@ def get_default_reserves(self) -> str:

def get_default_miscgen(self) -> str:
return MATRIX_PROTOCOL_PREFIX + self.hashes[MISCGEN_TS]

# fmt: off
def get_st_storage_pmax_injection(self) -> str:
"""2D-matrix of shape (8760, 1), filled-in with ones."""
return MATRIX_PROTOCOL_PREFIX + self.hashes[ST_STORAGE_PMAX_INJECTION]

def get_st_storage_pmax_withdrawal(self) -> str:
"""2D-matrix of shape (8760, 1), filled-in with ones."""
return MATRIX_PROTOCOL_PREFIX + self.hashes[ST_STORAGE_PMAX_WITHDRAWAL]

def get_st_storage_lower_rule_curve(self) -> str:
"""2D-matrix of shape (8760, 1), filled-in with zeros."""
return MATRIX_PROTOCOL_PREFIX + self.hashes[ST_STORAGE_LOWER_RULE_CURVE]

def get_st_storage_upper_rule_curve(self) -> str:
"""2D-matrix of shape (8760, 1), filled-in with ones."""
return MATRIX_PROTOCOL_PREFIX + self.hashes[ST_STORAGE_UPPER_RULE_CURVE]

def get_st_storage_inflows(self) -> str:
"""2D-matrix of shape (8760, 1), filled-in with zeros."""
return MATRIX_PROTOCOL_PREFIX + self.hashes[ST_STORAGE_INFLOWS]
# fmt: on
10 changes: 9 additions & 1 deletion antarest/study/storage/variantstudy/command_factory.py
Original file line number Diff line number Diff line change
@@ -27,6 +27,9 @@
from antarest.study.storage.variantstudy.model.command.create_renewables_cluster import (
CreateRenewablesCluster,
)
from antarest.study.storage.variantstudy.model.command.create_st_storage import (
CreateSTStorage,
)
from antarest.study.storage.variantstudy.model.command.icommand import ICommand
from antarest.study.storage.variantstudy.model.command.remove_area import (
RemoveArea,
@@ -46,6 +49,9 @@
from antarest.study.storage.variantstudy.model.command.remove_renewables_cluster import (
RemoveRenewablesCluster,
)
from antarest.study.storage.variantstudy.model.command.remove_st_storage import (
RemoveSTStorage,
)
from antarest.study.storage.variantstudy.model.command.replace_matrix import (
ReplaceMatrix,
)
@@ -89,6 +95,8 @@
CommandName.REMOVE_THERMAL_CLUSTER.value: RemoveCluster,
CommandName.CREATE_RENEWABLES_CLUSTER.value: CreateRenewablesCluster,
CommandName.REMOVE_RENEWABLES_CLUSTER.value: RemoveRenewablesCluster,
CommandName.CREATE_ST_STORAGE.value: CreateSTStorage,
CommandName.REMOVE_ST_STORAGE.value: RemoveSTStorage,
CommandName.REPLACE_MATRIX.value: ReplaceMatrix,
CommandName.UPDATE_CONFIG.value: UpdateConfig,
CommandName.UPDATE_COMMENTS.value: UpdateComments,
@@ -154,7 +162,7 @@ def to_commands(self, cmd_dto_list: List[CommandDTO]) -> List[ICommand]:
Convert a list of CommandDTO to a list of ICommand.
Args:
cmd_dto_list: The CommandDTO objetcs to convert.
cmd_dto_list: The CommandDTO objects to convert.
Returns:
List: A list of ICommand instances.
2 changes: 2 additions & 0 deletions antarest/study/storage/variantstudy/model/command/common.py
Original file line number Diff line number Diff line change
@@ -40,6 +40,8 @@ class CommandName(Enum):
REMOVE_THERMAL_CLUSTER = "remove_cluster"
CREATE_RENEWABLES_CLUSTER = "create_renewables_cluster"
REMOVE_RENEWABLES_CLUSTER = "remove_renewables_cluster"
CREATE_ST_STORAGE = "create_st_storage"
REMOVE_ST_STORAGE = "remove_st_storage"
REPLACE_MATRIX = "replace_matrix"
UPDATE_CONFIG = "update_config"
UPDATE_COMMENTS = "update_comments"
Original file line number Diff line number Diff line change
@@ -115,6 +115,8 @@ def _apply(self, study_data: FileStudy) -> CommandOutput:
cluster_list_config = study_data.tree.get(
["input", "thermal", "clusters", self.area_id, "list"]
)
# fixme: rigorously, the section name in the INI file is the cluster ID, not the cluster name
# cluster_list_config[transform_name_to_id(self.cluster_name)] = self.parameters
cluster_list_config[self.cluster_name] = self.parameters

self.parameters["name"] = self.cluster_name
Original file line number Diff line number Diff line change
@@ -96,6 +96,8 @@ def _apply(self, study_data: FileStudy) -> CommandOutput:
# default values
if "ts-interpretation" not in self.parameters:
self.parameters["ts-interpretation"] = "power-generation"
# fixme: rigorously, the section name in the INI file is the cluster ID, not the cluster name
# cluster_list_config[transform_name_to_id(self.cluster_name)] = self.parameters
cluster_list_config[self.cluster_name] = self.parameters

self.parameters["name"] = self.cluster_name
381 changes: 381 additions & 0 deletions antarest/study/storage/variantstudy/model/command/create_st_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,381 @@
import json
from typing import Any, Dict, List, Optional, Tuple, Union, cast

import numpy as np
from antarest.core.model import JSON
from antarest.matrixstore.model import MatrixData
from antarest.study.storage.rawstudy.model.filesystem.config.model import (
Area,
FileStudyTreeConfig,
)
from antarest.study.storage.rawstudy.model.filesystem.config.st_storage import (
STStorageConfig,
)
from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy
from antarest.study.storage.variantstudy.business.matrix_constants_generator import (
GeneratorMatrixConstants,
)
from antarest.study.storage.variantstudy.business.utils import (
strip_matrix_protocol,
validate_matrix,
)
from antarest.study.storage.variantstudy.model.command.common import (
CommandName,
CommandOutput,
)
from antarest.study.storage.variantstudy.model.command.icommand import (
MATCH_SIGNATURE_SEPARATOR,
ICommand,
)
from antarest.study.storage.variantstudy.model.model import CommandDTO
from pydantic import Field, validator, Extra
from pydantic.fields import ModelField

# noinspection SpellCheckingInspection
_MATRIX_NAMES = (
"pmax_injection",
"pmax_withdrawal",
"lower_rule_curve",
"upper_rule_curve",
"inflows",
)

# Minimum required version.
REQUIRED_VERSION = 860

MatrixType = List[List[MatrixData]]


# noinspection SpellCheckingInspection
class CreateSTStorage(ICommand):
"""
Command used to create a short-terme storage in an area.
"""

class Config:
extra = Extra.forbid

# Overloaded parameters
# =====================

command_name = CommandName.CREATE_ST_STORAGE
version = 1

# Command parameters
# ==================

area_id: str = Field(description="Area ID", regex=r"[a-z0-9_(),& -]+")
parameters: STStorageConfig
pmax_injection: Optional[Union[MatrixType, str]] = Field(
None,
description="Charge capacity (modulation)",
)
pmax_withdrawal: Optional[Union[MatrixType, str]] = Field(
None,
description="Discharge capacity (modulation)",
)
lower_rule_curve: Optional[Union[MatrixType, str]] = Field(
None,
description="Lower rule curve (coefficient)",
)
upper_rule_curve: Optional[Union[MatrixType, str]] = Field(
None,
description="Upper rule curve (coefficient)",
)
inflows: Optional[Union[MatrixType, str]] = Field(
None,
description="Inflows (MW)",
)

@property
def storage_id(self) -> str:
"""The normalized version of the storage's name used as the ID."""
return self.parameters.id

@property
def storage_name(self) -> str:
"""The label representing the name of the storage for the user."""
return self.parameters.name

@validator(*_MATRIX_NAMES, always=True)
def register_matrix(
cls,
v: Optional[Union[MatrixType, str]],
values: Dict[str, Any],
field: ModelField,
) -> Optional[Union[MatrixType, str]]:
"""
Validates a matrix array or link, and store the matrix array in the matrix repository.
This method is used to validate the matrix array or link provided as input.
- If the input is `None`, it retrieves a default matrix from the
generator matrix constants.
- If the input is a string, it validates the matrix link.
- If the input is a list of lists, it validates the matrix values
and creates the corresponding matrix link.
Args:
v: The matrix array or link to be validated and registered.
values: A dictionary containing additional values used for validation.
field: The field being validated.
Returns:
The ID of the validated and stored matrix prefixed by "matrix://".
Raises:
ValueError: If the matrix has an invalid shape, contains NaN values,
or violates specific constraints.
TypeError: If the input datatype is not supported.
"""
if v is None:
# use an already-registered default matrix
constants: GeneratorMatrixConstants
constants = values["command_context"].generator_matrix_constants
# Directly access the methods instead of using `getattr` for maintainability
methods = {
"pmax_injection": constants.get_st_storage_pmax_injection,
"pmax_withdrawal": constants.get_st_storage_pmax_withdrawal,
"lower_rule_curve": constants.get_st_storage_lower_rule_curve,
"upper_rule_curve": constants.get_st_storage_upper_rule_curve,
"inflows": constants.get_st_storage_inflows,
}
method = methods[field.name]
return method()
if isinstance(v, str):
# Check the matrix link
return validate_matrix(v, values)
if isinstance(v, list):
# Check the matrix values and create the corresponding matrix link
array = np.array(v, dtype=np.float64)
if array.shape != (8760, 1):
raise ValueError(
f"Invalid matrix shape {array.shape}, expected (8760, 1)"
)
if np.isnan(array).any():
raise ValueError("Matrix values cannot contain NaN")
# All matrices except "inflows" are constrained between 0 and 1
constrained = set(_MATRIX_NAMES) - {"inflows"}
if field.name in constrained and (
np.any(array < 0) or np.any(array > 1)
):
raise ValueError("Matrix values should be between 0 and 1")
v = cast(MatrixType, array.tolist())
return validate_matrix(v, values)
# Invalid datatype
# pragma: no cover
raise TypeError(repr(v))

def _apply_config(
self, study_data: FileStudyTreeConfig
) -> Tuple[CommandOutput, Dict[str, Any]]:
"""
Applies configuration changes to the study data: add the short-term storage in the storages list.
Args:
study_data: The study data configuration.
Returns:
A tuple containing the command output and a dictionary of extra data.
On success, the dictionary of extra data is `{"storage_id": storage_id}`.
"""

# Check if the study version is above the minimum required version.
version = study_data.version
if version < REQUIRED_VERSION:
return (
CommandOutput(
status=False,
message=(
f"Invalid study version {version},"
f" at least version {REQUIRED_VERSION} is required."
),
),
{},
)

# Search the Area in the configuration
if self.area_id not in study_data.areas:
return (
CommandOutput(
status=False,
message=f"Area '{self.area_id}' does not exist in the study configuration.",
),
{},
)
area: Area = study_data.areas[self.area_id]

# Check if the short-term storage already exists in the area
if any(s.id == self.storage_id for s in area.st_storages):
return (
CommandOutput(
status=False,
message=(
f"Short-term storage '{self.storage_name}' already exists"
f" in the area '{self.area_id}'."
),
),
{},
)

# Create a new short-term storage and add it to the area
area.st_storages.append(self.parameters)

return (
CommandOutput(
status=True,
message=(
f"Short-term st_storage '{self.storage_name}' successfully added"
f" to area '{self.area_id}'."
),
),
{"storage_id": self.storage_id},
)

def _apply(self, study_data: FileStudy) -> CommandOutput:
"""
Applies the study data to update storage configurations and saves the changes.
Saves the changes made to the storage configurations.
Args:
study_data: The study data to be applied.
Returns:
The output of the command execution.
"""
output, data = self._apply_config(study_data.config)
if not output.status:
return output

# Fill-in the "list.ini" file with the parameters
config = study_data.tree.get(
["input", "st-storage", "clusters", self.area_id, "list"]
)
config[self.storage_id] = json.loads(
self.parameters.json(by_alias=True)
)

new_data: JSON = {
"input": {
"st-storage": {
"clusters": {self.area_id: {"list": config}},
"series": {
self.area_id: {
self.storage_id: {
attr: getattr(self, attr)
for attr in _MATRIX_NAMES
}
}
},
}
}
}
study_data.tree.save(new_data)

return output

def to_dto(self) -> CommandDTO:
"""
Converts the current object to a Data Transfer Object (DTO)
which is stored in the `CommandBlock` in the database.
Returns:
The DTO object representing the current command.
"""
parameters = json.loads(self.parameters.json(by_alias=True))
return CommandDTO(
action=self.command_name.value,
args={
"area_id": self.area_id,
"parameters": parameters,
**{
attr: strip_matrix_protocol(getattr(self, attr))
for attr in _MATRIX_NAMES
},
},
)

def match_signature(self) -> str:
"""Returns the command signature."""
return str(
self.command_name.value
+ MATCH_SIGNATURE_SEPARATOR
+ self.area_id
+ MATCH_SIGNATURE_SEPARATOR
+ self.storage_id
)

def match(self, other: "ICommand", equal: bool = False) -> bool:
"""
Checks if the current instance matches another `ICommand` object.
Args:
other: Another `ICommand` object to compare against.
equal: Flag indicating whether to perform a deep comparison.
Returns:
bool: `True` if the current instance matches the other object, `False` otherwise.
"""
if not isinstance(other, CreateSTStorage):
return False
if equal:
# Deep comparison
return self.__eq__(other)
else:
return (
self.area_id == other.area_id
and self.storage_id == other.storage_id
)

def _create_diff(self, other: "ICommand") -> List["ICommand"]:
"""
Creates a list of commands representing the differences between
the current instance and another `ICommand` object.
Args:
other: Another ICommand object to compare against.
Returns:
A list of commands representing the differences between
the two `ICommand` objects.
"""
from antarest.study.storage.variantstudy.model.command.replace_matrix import (
ReplaceMatrix,
)
from antarest.study.storage.variantstudy.model.command.update_config import (
UpdateConfig,
)

other = cast(CreateSTStorage, other)
commands: List[ICommand] = [
ReplaceMatrix(
target=f"input/st-storage/series/{self.area_id}/{self.storage_id}/{attr}",
matrix=strip_matrix_protocol(getattr(other, attr)),
command_context=self.command_context,
)
for attr in _MATRIX_NAMES
if getattr(self, attr) != getattr(other, attr)
]
if self.parameters != other.parameters:
data: Dict[str, Any] = json.loads(
other.parameters.json(by_alias=True)
)
commands.append(
UpdateConfig(
target=f"input/st-storage/clusters/{self.area_id}/list/{self.storage_id}",
data=data,
command_context=self.command_context,
)
)
return commands

def get_inner_matrices(self) -> List[str]:
"""
Retrieves the list of matrix IDs.
"""
matrices: List[str] = [
strip_matrix_protocol(getattr(self, attr))
for attr in _MATRIX_NAMES
]
return matrices
169 changes: 169 additions & 0 deletions antarest/study/storage/variantstudy/model/command/remove_st_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
from typing import Any, Dict, Tuple, List

from antarest.study.storage.rawstudy.model.filesystem.config.model import (
Area,
FileStudyTreeConfig,
)
from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy
from antarest.study.storage.variantstudy.model.command.common import (
CommandName,
CommandOutput,
)
from antarest.study.storage.variantstudy.model.command.icommand import (
ICommand,
MATCH_SIGNATURE_SEPARATOR,
)
from pydantic import Field

from antarest.study.storage.variantstudy.model.model import CommandDTO

# minimum required version.
REQUIRED_VERSION = 860


class RemoveSTStorage(ICommand):
"""
Command used to remove a short-terme storage from an area.
"""

area_id: str = Field(description="Area ID", regex=r"[a-z0-9_(),& -]+")
storage_id: str = Field(
description="Short term storage ID",
regex=r"[a-z0-9_(),& -]+",
)

def __init__(self, **data: Any) -> None:
super().__init__(
command_name=CommandName.REMOVE_ST_STORAGE, version=1, **data
)

def _apply_config(
self, study_data: FileStudyTreeConfig
) -> Tuple[CommandOutput, Dict[str, Any]]:
"""
Applies configuration changes to the study data: remove the storage from the storages list.
Args:
study_data: The study data configuration.
Returns:
A tuple containing the command output and a dictionary of extra data.
On success, the dictionary is empty.
"""
# Check if the study version is above the minimum required version.
version = study_data.version
if version < REQUIRED_VERSION:
return (
CommandOutput(
status=False,
message=(
f"Invalid study version {version},"
f" at least version {REQUIRED_VERSION} is required."
),
),
{},
)

# Search the Area in the configuration
if self.area_id not in study_data.areas:
return (
CommandOutput(
status=False,
message=(
f"Area '{self.area_id}' does not exist"
f" in the study configuration."
),
),
{},
)
area: Area = study_data.areas[self.area_id]

# Search the Short term storage in the area
for st_storage in area.st_storages:
if st_storage.id == self.storage_id:
break
else:
return (
CommandOutput(
status=False,
message=(
f"Short term storage '{self.storage_id}' does not exist"
f" in the area '{self.area_id}'."
),
),
{},
)

# Remove the Short term storage from the configuration
area.st_storages.remove(st_storage)

return (
CommandOutput(
status=True,
message=(
f"Short term storage '{self.storage_id}' removed"
f" from the area '{self.area_id}'."
),
),
{},
)

def _apply(self, study_data: FileStudy) -> CommandOutput:
"""
Applies the study data to update storage configurations and saves the changes:
remove the storage from the configuration and remove the attached time series.
Args:
study_data: The study data to be applied.
Returns:
The output of the command execution.
"""
# It is required to delete the files and folders that correspond to the short-term storage
# BEFORE updating the configuration, as we need the configuration to do so.
# Specifically, deleting the time series uses the list of short-term storages from the configuration.
# fmt: off
paths = [
["input", "st-storage", "clusters", self.area_id, "list", self.storage_id],
["input", "st-storage", "series", self.area_id, self.storage_id],
]
# fmt: on
for path in paths:
study_data.tree.delete(path)
# Deleting the short-term storage in the configuration must be done AFTER
# deleting the files and folders.
return self._apply_config(study_data.config)[0]

def to_dto(self) -> CommandDTO:
"""
Converts the current object to a Data Transfer Object (DTO)
which is stored in the `CommandBlock` in the database.
Returns:
The DTO object representing the current command.
"""
return CommandDTO(
action=self.command_name.value,
args={"area_id": self.area_id, "storage_id": self.storage_id},
)

def match_signature(self) -> str:
"""Returns the command signature."""
return str(
self.command_name.value
+ MATCH_SIGNATURE_SEPARATOR
+ self.area_id
+ MATCH_SIGNATURE_SEPARATOR
+ self.storage_id
)

def match(self, other: "ICommand", equal: bool = False) -> bool:
# always perform a deep comparison, as there are no parameters
# or matrices, so that shallow and deep comparisons are identical.
return self.__eq__(other)

def _create_diff(self, other: "ICommand") -> List["ICommand"]:
return []

def get_inner_matrices(self) -> List[str]:
return []
315 changes: 285 additions & 30 deletions docs/user-guide/2-variant_manager.md

Large diffs are not rendered by default.

Empty file.
240 changes: 240 additions & 0 deletions tests/integration/variant_blueprint/test_st_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
import http
from unittest.mock import ANY

import numpy as np
import pytest
from starlette.testclient import TestClient

from antarest.core.tasks.model import TaskStatus
from antarest.study.storage.rawstudy.model.filesystem.config.model import (
transform_name_to_id,
)
from tests.integration.utils import wait_task_completion


@pytest.mark.integration_test
class TestSTStorage:
"""
This unit test is designed to demonstrate the creation, modification of properties and
updating of matrices, and the deletion of one or more short-term storages.
"""

# noinspection SpellCheckingInspection
def test_lifecycle(
self,
client: TestClient,
user_access_token: str,
study_id: str,
):
# =======================
# Study version upgrade
# =======================

# We have an "old" study that we need to upgrade to version 860
min_study_version = 860
res = client.put(
f"/v1/studies/{study_id}/upgrade",
headers={"Authorization": f"Bearer {user_access_token}"},
params={"target_version": min_study_version},
)
res.raise_for_status()
task_id = res.json()
task = wait_task_completion(client, user_access_token, task_id)
assert task.status == TaskStatus.COMPLETED, task

# We can check that the study is upgraded to the required version
res = client.get(
f"/v1/studies/{study_id}",
headers={"Authorization": f"Bearer {user_access_token}"},
)
res.raise_for_status()
assert res.json() == {
"id": study_id,
"name": "STA-mini",
"version": min_study_version,
"created": ANY, # ISO8601 Date/time
"updated": ANY, # ISO8601 Date/time
"type": "rawstudy",
"owner": {"id": None, "name": ANY},
"groups": [],
"public_mode": "FULL",
"workspace": "ext",
"managed": False,
"archived": False,
"horizon": "2030",
"scenario": None,
"status": None,
"doc": None,
"folder": "STA-mini",
"tags": [],
}

# Here is the list of available areas
res = client.get(
f"/v1/studies/{study_id}/areas",
headers={"Authorization": f"Bearer {user_access_token}"},
)
res.raise_for_status()
areas = res.json()
area_ids = {a["id"] for a in areas if a["type"] == "AREA"}
assert area_ids == {"es", "it", "de", "fr"}

# =============================================
# Short-Term Storage Creation w/o Time Series
# =============================================

# First, we will define a short-term storage in the geographical
# area "FR" called "Siemens Battery" with the bellow arguments.
# We will use the default values for the time series:
# - `pmax_injection`: Charge capacity,
# - `pmax_withdrawal`: Discharge capacity,
# - `lower_rule_curve`: Lower rule curve,
# - `upper_rule_curve`: Upper rule curve,
# - `inflows`: Inflows
area_id = transform_name_to_id("FR")
siemens_battery = "Siemens Battery"
args = {
"area_id": area_id,
"parameters": {
"name": siemens_battery,
"group": "Battery",
"injection_nominal_capacity": 150,
"withdrawal_nominal_capacity": 150,
"reservoir_capacity": 600,
"efficiency": 0.94,
"initial_level_optim": True,
},
}
res = client.post(
f"/v1/studies/{study_id}/commands",
headers={"Authorization": f"Bearer {user_access_token}"},
json=[{"action": "create_st_storage", "args": args}],
)
res.raise_for_status()

# =======================================
# Short-Term Storage Time Series Update
# =======================================

# Then, it is possible to update a time series.
# For instance, we want to initialize the `inflows` time series
# with random values (for this demo).
# To do that, we can use the `replace_matrix` command like bellow:
siemens_battery_id = transform_name_to_id(siemens_battery)
inflows = np.random.randint(0, 1001, size=(8760, 1))
args1 = {
"target": f"input/st-storage/series/{area_id}/{siemens_battery_id}/inflows",
"matrix": inflows.tolist(),
}
pmax_injection = np.random.rand(8760, 1)
args2 = {
"target": f"input/st-storage/series/{area_id}/{siemens_battery_id}/pmax_injection",
"matrix": pmax_injection.tolist(),
}
res = client.post(
f"/v1/studies/{study_id}/commands",
headers={"Authorization": f"Bearer {user_access_token}"},
json=[
{"action": "replace_matrix", "args": args1},
{"action": "replace_matrix", "args": args2},
],
)
res.raise_for_status()

# ==============================================
# Short-Term Storage Creation with Time Series
# ==============================================

# Another way to create a Short-Term Storage is by providing
# both the parameters and the time series arrays.
# Here is an example where we populate some arrays with random values.
pmax_injection = np.random.rand(8760, 1)
pmax_withdrawal = np.random.rand(8760, 1)
inflows = np.random.randint(0, 1001, size=(8760, 1))
grand_maison = "Grand'Maison"
args = {
"area_id": area_id,
"parameters": {
"name": grand_maison,
"group": "PSP_closed",
"injectionnominalcapacity": 1500,
"withdrawalnominalcapacity": 1800,
"reservoircapacity": 20000,
"efficiency": 0.78,
"initiallevel": 10000,
},
"pmax_injection": pmax_injection.tolist(),
"pmax_withdrawal": pmax_withdrawal.tolist(),
"inflows": inflows.tolist(),
}
res = client.post(
f"/v1/studies/{study_id}/commands",
headers={"Authorization": f"Bearer {user_access_token}"},
json=[{"action": "create_st_storage", "args": args}],
)
res.raise_for_status()

# ============================
# Short-Term Storage Removal
# ============================

# The `remove_st_storage` command allows you to delete a Short-Term Storage.
args = {"area_id": area_id, "storage_id": siemens_battery_id}
res = client.post(
f"/v1/studies/{study_id}/commands",
headers={"Authorization": f"Bearer {user_access_token}"},
json=[{"action": "remove_st_storage", "args": args}],
)
res.raise_for_status()

# =======================================
# Parameters and Time Series Validation
# =======================================

# When creating a Short-Term Storage, both the validity of the parameters
# (value type and valid range) and the validity of the time series
# (value range) are checked.
# In the example below, multiple parameters are invalid, and one matrix contains
# values outside the valid range. Upon executing the request, an HTTP 422
# error occurs, and a response specifies the invalid values.
pmax_injection = np.random.rand(8760, 1)
pmax_withdrawal = np.random.rand(8760, 1) * 10 # Oops!
inflows = np.random.randint(0, 1001, size=(8760, 1))
args = {
"area_id": area_id,
"parameters": {
"name": "Bad Storage",
"group": "Wonderland", # Oops!
"injection_nominal_capacity": -2000, # Oops!
"withdrawal_nominal_capacity": 1500,
"reservoir_capacity": 20000,
"efficiency": 0.78,
"initial_level": 10000,
"initial_level_optim": "BlurBool", # Oops!
},
"pmax_injection": pmax_injection.tolist(),
"pmax_withdrawal": pmax_withdrawal.tolist(),
"inflows": inflows.tolist(),
}
res = client.post(
f"/v1/studies/{study_id}/commands",
headers={"Authorization": f"Bearer {user_access_token}"},
json=[{"action": "create_st_storage", "args": args}],
)
assert res.status_code == http.HTTPStatus.UNPROCESSABLE_ENTITY
description = res.json()["description"]
"""
4 validation errors for CreateSTStorage
parameters -> group
value is not a valid enumeration member […]
parameters -> injectionnominalcapacity
ensure this value is greater than or equal to 0 (type=value_error.number.not_ge; limit_value=0)
parameters -> initialleveloptim
value could not be parsed to a boolean (type=type_error.bool)
pmax_withdrawal
Matrix values should be between 0 and 1 (type=value_error)
"""
assert "parameters -> group" in description
assert "parameters -> injectionnominalcapacity" in description
assert "parameters -> initialleveloptim" in description
assert "pmax_withdrawal" in description
75 changes: 54 additions & 21 deletions tests/storage/repository/filesystem/config/test_config_files.py
Original file line number Diff line number Diff line change
@@ -20,7 +20,10 @@
DistrictSet,
Cluster,
BindingConstraintDTO,
Storage,
)
from antarest.study.storage.rawstudy.model.filesystem.config.st_storage import (
STStorageConfig,
STStorageGroup,
)
from tests.storage.business.assets import ASSETS_DIR

@@ -283,29 +286,59 @@ def test_parse_thermal(tmp_path: Path) -> None:
]


def test_parse_st_storage(tmp_path: Path) -> None:
study_path = build_empty_files(tmp_path)
study_path.joinpath("input", "st-storage", "clusters", "fr").mkdir(
parents=True
)
content = """
[t1]
name = t1
[t2]
name = t2
# noinspection SpellCheckingInspection
ST_STORAGE_LIST_INI = """\
[siemens battery]
name = Siemens Battery
group = Battery
injectionnominalcapacity = 150.0
withdrawalnominalcapacity = 150.0
reservoircapacity = 600.0
efficiency = 0.94
initiallevel = 0
initialleveloptim = True
[grand maison]
name = Grand'Maison
group = PSP_closed
injectionnominalcapacity = 1500.0
withdrawalnominalcapacity = 1800.0
reservoircapacity = 20000.0
efficiency = 0.78
initiallevel = 10000.0
initialleveloptim = False
"""

[t3]
name = t3
"""
study_path.joinpath(
"input", "st-storage", "clusters", "fr", "list.ini"
).write_text(content)

def test_parse_st_storage(tmp_path: Path) -> None:
study_path = build_empty_files(tmp_path)
config_dir = study_path.joinpath("input", "st-storage", "clusters", "fr")
config_dir.mkdir(parents=True)
config_dir.joinpath("list.ini").write_text(ST_STORAGE_LIST_INI)
# noinspection SpellCheckingInspection
assert _parse_st_storage(study_path, "fr") == [
Storage(id="t1", name="t1"),
Storage(id="t2", name="t2"),
Storage(id="t3", name="t3"),
STStorageConfig(
id="siemens battery",
name="Siemens Battery",
group=STStorageGroup.BATTERY,
injection_nominal_capacity=150.0,
withdrawal_nominal_capacity=150.0,
reservoir_capacity=600.0,
efficiency=0.94,
initial_level=0.0,
initial_level_optim=True,
),
STStorageConfig(
id="grand maison",
name="Grand'Maison",
group=STStorageGroup.PSP_CLOSED,
injection_nominal_capacity=1500.0,
withdrawal_nominal_capacity=1800.0,
reservoir_capacity=20000.0,
efficiency=0.78,
initial_level=10000.0,
initial_level_optim=False,
),
]


Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import numpy as np
from antarest.matrixstore.service import SimpleMatrixService
from antarest.study.storage.variantstudy.business import matrix_constants
from antarest.study.storage.variantstudy.business.matrix_constants_generator import (
MATRIX_PROTOCOL_PREFIX,
GeneratorMatrixConstants,
)


class TestGeneratorMatrixConstants:
def test_get_st_storage(self, tmp_path):
generator = GeneratorMatrixConstants(
matrix_service=SimpleMatrixService(bucket_dir=tmp_path)
)

ref1 = generator.get_st_storage_pmax_injection()
matrix_id1 = ref1.split(MATRIX_PROTOCOL_PREFIX)[1]
matrix_dto1 = generator.matrix_service.get(matrix_id1)
assert (
np.array(matrix_dto1.data).all()
== matrix_constants.st_storage.series.pmax_injection.all()
)

ref2 = generator.get_st_storage_pmax_withdrawal()
matrix_id2 = ref2.split(MATRIX_PROTOCOL_PREFIX)[1]
matrix_dto2 = generator.matrix_service.get(matrix_id2)
assert (
np.array(matrix_dto2.data).all()
== matrix_constants.st_storage.series.pmax_withdrawal.all()
)

ref3 = generator.get_st_storage_lower_rule_curve()
matrix_id3 = ref3.split(MATRIX_PROTOCOL_PREFIX)[1]
matrix_dto3 = generator.matrix_service.get(matrix_id3)
assert (
np.array(matrix_dto3.data).all()
== matrix_constants.st_storage.series.lower_rule_curve.all()
)

ref4 = generator.get_st_storage_upper_rule_curve()
matrix_id4 = ref4.split(MATRIX_PROTOCOL_PREFIX)[1]
matrix_dto4 = generator.matrix_service.get(matrix_id4)
assert (
np.array(matrix_dto4.data).all()
== matrix_constants.st_storage.series.upper_rule_curve.all()
)

ref5 = generator.get_st_storage_inflows()
matrix_id5 = ref5.split(MATRIX_PROTOCOL_PREFIX)[1]
matrix_dto5 = generator.matrix_service.get(matrix_id5)
assert (
np.array(matrix_dto5.data).all()
== matrix_constants.st_storage.series.inflows.all()
)
534 changes: 534 additions & 0 deletions tests/variantstudy/model/command/test_create_st_storage.py

Large diffs are not rendered by default.

259 changes: 259 additions & 0 deletions tests/variantstudy/model/command/test_remove_st_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
import re

import pytest
from antarest.study.storage.rawstudy.model.filesystem.config.model import (
transform_name_to_id,
)
from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy
from antarest.study.storage.study_upgrader import upgrade_study
from antarest.study.storage.variantstudy.model.command.common import (
CommandName,
)
from antarest.study.storage.variantstudy.model.command.create_area import (
CreateArea,
)
from antarest.study.storage.variantstudy.model.command.create_st_storage import (
CreateSTStorage,
)
from antarest.study.storage.variantstudy.model.command.remove_st_storage import (
REQUIRED_VERSION,
RemoveSTStorage,
)
from antarest.study.storage.variantstudy.model.command_context import (
CommandContext,
)
from antarest.study.storage.variantstudy.model.model import CommandDTO
from pydantic import ValidationError


@pytest.fixture(name="recent_study")
def recent_study_fixture(empty_study: FileStudy) -> FileStudy:
"""
Fixture for creating a recent version of the FileStudy object.
Args:
empty_study: The empty FileStudy object used as model.
Returns:
FileStudy: The FileStudy object upgraded to the required version.
"""
upgrade_study(empty_study.config.study_path, str(REQUIRED_VERSION))
empty_study.config.version = REQUIRED_VERSION
return empty_study


# The parameter names to be used are those in the INI file.
# Non-string values are automatically converted into strings.
# noinspection SpellCheckingInspection
PARAMETERS = {
"name": "Storage1",
"group": "Battery",
"injectionnominalcapacity": 1500,
"withdrawalnominalcapacity": 1500,
"reservoircapacity": 20000,
"efficiency": 0.94,
"initialleveloptim": True,
}


class TestRemoveSTStorage:
# noinspection SpellCheckingInspection
def test_init(self, command_context: CommandContext):
cmd = RemoveSTStorage(
command_context=command_context,
area_id="area_fr",
storage_id="storage_1",
)

# Check the attribues
assert cmd.command_name == CommandName.REMOVE_ST_STORAGE
assert cmd.version == 1
assert cmd.command_context == command_context
assert cmd.area_id == "area_fr"
assert cmd.storage_id == "storage_1"

def test_init__invalid_storage_id(
self, recent_study: FileStudy, command_context: CommandContext
):
# When we apply the config for a new ST Storage with a bad name
with pytest.raises(ValidationError) as ctx:
RemoveSTStorage(
command_context=command_context,
area_id="dummy",
storage_id="?%$$", # bad name
)
assert ctx.value.errors() == [
{
"ctx": {"pattern": "[a-z0-9_(),& -]+"},
"loc": ("storage_id",),
"msg": 'string does not match regex "[a-z0-9_(),& -]+"',
"type": "value_error.str.regex",
}
]

def test_apply_config__invalid_version(
self, empty_study: FileStudy, command_context: CommandContext
):
# Given an old study in version 720
# When we apply the config to add a new ST Storage
remove_st_storage = RemoveSTStorage(
command_context=command_context,
area_id="foo",
storage_id="bar",
)
command_output = remove_st_storage.apply_config(empty_study.config)

# Then, the output should be an error
assert command_output.status is False
assert re.search(
rf"Invalid.*version {empty_study.config.version}",
command_output.message,
flags=re.IGNORECASE,
)

def test_apply_config__missing_area(
self, recent_study: FileStudy, command_context: CommandContext
):
# Given a study without "unknown area" area
# When we apply the config to add a new ST Storage
remove_st_storage = RemoveSTStorage(
command_context=command_context,
area_id="unknown area", # bad ID
storage_id="storage_1",
)
command_output = remove_st_storage.apply_config(recent_study.config)

# Then, the output should be an error
assert command_output.status is False
assert re.search(
rf"'{re.escape(remove_st_storage.area_id)}'.*does not exist",
command_output.message,
flags=re.IGNORECASE,
)

def test_apply_config__missing_storage(
self, recent_study: FileStudy, command_context: CommandContext
):
# First, prepare a new Area
create_area = CreateArea(
command_context=command_context,
area_name="Area FR",
)
create_area.apply(recent_study)

# Then, apply the config for a new ST Storage
remove_st_storage = RemoveSTStorage(
command_context=command_context,
area_id=transform_name_to_id(create_area.area_name),
storage_id="storage 1",
)
command_output = remove_st_storage.apply_config(recent_study.config)

# Then, the output should be an error
assert command_output.status is False
assert re.search(
rf"'{re.escape(remove_st_storage.storage_id)}'.*does not exist",
command_output.message,
flags=re.IGNORECASE,
)

def test_apply_config__nominal_case(
self, recent_study: FileStudy, command_context: CommandContext
):
# First, prepare a new Area
create_area = CreateArea(
area_name="Area FR",
command_context=command_context,
)
create_area.apply(recent_study)

# Then, prepare a new Storage
create_st_storage = CreateSTStorage(
command_context=command_context,
area_id=transform_name_to_id(create_area.area_name),
parameters=PARAMETERS, # type: ignore
)
create_st_storage.apply(recent_study)

# Then, apply the config for a new ST Storage
remove_st_storage = RemoveSTStorage(
command_context=command_context,
area_id=transform_name_to_id(create_area.area_name),
storage_id=create_st_storage.storage_id,
)
command_output = remove_st_storage.apply_config(recent_study.config)

# Check the command output and extra dict
assert command_output.status is True
assert re.search(
rf"'{re.escape(remove_st_storage.storage_id)}'.*removed",
command_output.message,
flags=re.IGNORECASE,
)

def test_to_dto(self, command_context: CommandContext):
cmd = RemoveSTStorage(
command_context=command_context,
area_id="area_fr",
storage_id="storage_1",
)
actual = cmd.to_dto()

# noinspection SpellCheckingInspection
assert actual == CommandDTO(
action=CommandName.REMOVE_ST_STORAGE.value,
args={"area_id": "area_fr", "storage_id": "storage_1"},
)

def test_match_signature(self, command_context: CommandContext):
cmd = RemoveSTStorage(
command_context=command_context,
area_id="area_fr",
storage_id="storage_1",
)
assert cmd.match_signature() == "remove_st_storage%area_fr%storage_1"

@pytest.mark.parametrize("area_id", ["area_fr", "area_en"])
@pytest.mark.parametrize("storage_id", ["storage_1", "storage_2"])
def test_match(
self,
command_context: CommandContext,
area_id,
storage_id,
):
cmd1 = RemoveSTStorage(
command_context=command_context,
area_id="area_fr",
storage_id="storage_1",
)
cmd2 = RemoveSTStorage(
command_context=command_context,
area_id=area_id,
storage_id=storage_id,
)
is_equal = area_id == cmd1.area_id and storage_id == cmd1.storage_id
assert cmd1.match(cmd2, equal=False) == is_equal
assert cmd1.match(cmd2, equal=True) == is_equal

def test_create_diff(self, command_context: CommandContext):
cmd = RemoveSTStorage(
command_context=command_context,
area_id="area_fr",
storage_id="storage_1",
)
other = RemoveSTStorage(
command_context=command_context,
area_id=cmd.area_id,
storage_id=cmd.storage_id,
)
actual = cmd.create_diff(other)
assert not actual

def test_get_inner_matrices(self, command_context: CommandContext):
cmd = RemoveSTStorage(
command_context=command_context,
area_id="area_fr",
storage_id="storage_1",
)
actual = cmd.get_inner_matrices()
assert actual == []
109 changes: 101 additions & 8 deletions tests/variantstudy/test_command_factory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import importlib
import itertools
import pkgutil
from unittest.mock import Mock

@@ -8,7 +9,6 @@
from antarest.study.storage.variantstudy.business.matrix_constants_generator import (
GeneratorMatrixConstants,
)
from antarest.study.storage.variantstudy.business.utils import remove_none_args
from antarest.study.storage.variantstudy.command_factory import CommandFactory
from antarest.study.storage.variantstudy.model.command.common import (
CommandName,
@@ -320,6 +320,88 @@ def setup_class(self):
}
},
),
CommandDTO(
action=CommandName.CREATE_ST_STORAGE.value,
args={
"area_id": "area 1",
"parameters": {
"name": "Storage 1",
"group": "Battery",
"injectionnominalcapacity": 0,
"withdrawalnominalcapacity": 0,
"reservoircapacity": 0,
"efficiency": 1,
"initiallevel": 0,
"initialleveloptim": False,
},
"pmax_injection": "matrix://59ea6c83-6348-466d-9530-c35c51ca4c37",
"pmax_withdrawal": "matrix://5f988548-dadc-4bbb-8ce8-87a544dbf756",
"lower_rule_curve": "matrix://8ce4fcea-cc97-4d2c-b641-a27a53454612",
"upper_rule_curve": "matrix://8ce614c8-c687-41af-8b24-df8a49cc52af",
"inflows": "matrix://df9b25e1-e3f7-4a57-8182-0ff9791439e5",
},
),
CommandDTO(
action=CommandName.CREATE_ST_STORAGE.value,
args=[
{
"area_id": "area 1",
"parameters": {
"efficiency": 1,
"group": "Battery",
"initiallevel": 0,
"initialleveloptim": False,
"injectionnominalcapacity": 0,
"name": "Storage 1",
"reservoircapacity": 0,
"withdrawalnominalcapacity": 0,
},
"pmax_injection": "matrix://59ea6c83-6348-466d-9530-c35c51ca4c37",
"pmax_withdrawal": "matrix://5f988548-dadc-4bbb-8ce8-87a544dbf756",
"lower_rule_curve": "matrix://8ce4fcea-cc97-4d2c-b641-a27a53454612",
"upper_rule_curve": "matrix://8ce614c8-c687-41af-8b24-df8a49cc52af",
"inflows": "matrix://df9b25e1-e3f7-4a57-8182-0ff9791439e5",
},
{
"area_id": "area 1",
"parameters": {
"efficiency": 0.94,
"group": "Battery",
"initiallevel": 0,
"initialleveloptim": False,
"injectionnominalcapacity": 0,
"name": "Storage 2",
"reservoircapacity": 0,
"withdrawalnominalcapacity": 0,
},
"pmax_injection": "matrix://3f5b3746-3995-49b7-a6da-622633472e05",
"pmax_withdrawal": "matrix://4b64a31f-927b-4887-b4cd-adcddd39bdcd",
"lower_rule_curve": "matrix://16c7c3ae-9824-4ef2-aa68-51145884b025",
"upper_rule_curve": "matrix://9a6104e9-990a-415f-a6e2-57507e13b58c",
"inflows": "matrix://e8923768-9bdd-40c2-a6ea-2da2523be727",
},
],
),
CommandDTO(
action=CommandName.REMOVE_ST_STORAGE.value,
args={
"area_id": "area 1",
"storage_id": "storage 1",
},
),
CommandDTO(
action=CommandName.REMOVE_ST_STORAGE.value,
args=[
{
"area_id": "area 1",
"storage_id": "storage 1",
},
{
"area_id": "area 1",
"storage_id": "storage 2",
},
],
),
],
)
@pytest.mark.unit_test
@@ -329,17 +411,28 @@ def test_command_factory(self, command_dto: CommandDTO):
matrix_service=Mock(spec=MatrixService),
patch_service=Mock(spec=PatchService),
)
command_list = command_factory.to_command(command_dto=command_dto)
commands = command_factory.to_command(command_dto=command_dto)

# fmt: off
if isinstance(args := command_dto.args, dict):
assert len(command_list) == 1
assert remove_none_args(command_list[0].to_dto()) == command_dto
exp_action_args_list = [(command_dto.action, command_dto.args)]
else:
assert len(command_list) == len(args)
exp_action_args_list = [(command_dto.action, args) for args in command_dto.args]
# fmt: on

for command in command_list:
assert command.command_name.value == command_dto.action
# fmt: off
actual_cmd: ICommand
for actual_cmd, exp_action_args in itertools.zip_longest(commands, exp_action_args_list):
assert actual_cmd is not None, f"Missing action/args for {exp_action_args=}"
assert exp_action_args is not None, f"Missing command for {actual_cmd=}"
expected_action, expected_args = exp_action_args
actual_dto = actual_cmd.to_dto()
actual_args = {k:v for k,v in actual_dto.args.items() if v is not None}
assert actual_dto.action == expected_action
assert actual_args == expected_args
# fmt: on

self.command_class_set.discard(type(command_list[0]).__name__)
self.command_class_set.discard(type(commands[0]).__name__)

def teardown_class(self):
# Check that all command classes have been tested

0 comments on commit e88d4ee

Please sign in to comment.