diff --git a/antarest/core/filesystem_blueprint.py b/antarest/core/filesystem_blueprint.py index 10145d744e..2016cd47cf 100644 --- a/antarest/core/filesystem_blueprint.py +++ b/antarest/core/filesystem_blueprint.py @@ -203,7 +203,7 @@ async def from_path(cls, full_path: Path, *, details: bool = False) -> "FileInfo return obj -async def _calc_details(full_path: t.Union[str, Path]) -> t.Tuple[int, int]: +async def _calc_details(full_path: str | Path) -> t.Tuple[int, int]: """Calculate the number of files and the total size of a directory recursively.""" full_path = Path(full_path) diff --git a/antarest/core/permissions.py b/antarest/core/permissions.py index 9bf073e809..b26c2c51b4 100644 --- a/antarest/core/permissions.py +++ b/antarest/core/permissions.py @@ -20,7 +20,7 @@ logger = logging.getLogger(__name__) -permission_matrix: t.Dict[str, t.Dict[str, t.Sequence[t.Union[RoleType, PublicMode]]]] = { +permission_matrix: t.Dict[str, t.Dict[str, t.Sequence[RoleType | PublicMode]]] = { StudyPermissionType.READ.value: { "roles": [ RoleType.ADMIN, diff --git a/antarest/core/serialization/__init__.py b/antarest/core/serialization/__init__.py index 5591290ce8..4cc9ed550a 100644 --- a/antarest/core/serialization/__init__.py +++ b/antarest/core/serialization/__init__.py @@ -22,7 +22,7 @@ # Since pydantic v2 is written in RUST it's way faster. -def from_json(data: t.Union[str, bytes, bytearray]) -> t.Dict[str, t.Any]: +def from_json(data: str | bytes | bytearray) -> t.Dict[str, t.Any]: return ADAPTER.validate_json(data) # type: ignore diff --git a/antarest/core/tasks/service.py b/antarest/core/tasks/service.py index 4ada902c58..22c70e0d42 100644 --- a/antarest/core/tasks/service.py +++ b/antarest/core/tasks/service.py @@ -69,7 +69,7 @@ def add_worker_task( self, task_type: TaskType, task_queue: str, - task_args: t.Dict[str, t.Union[int, float, bool, str]], + task_args: t.Dict[str, int | float | bool | str], name: t.Optional[str], ref_id: t.Optional[str], request_params: RequestParameters, @@ -178,7 +178,7 @@ def _create_worker_task( self, task_id: str, task_type: str, - task_args: t.Dict[str, t.Union[int, float, bool, str]], + task_args: t.Dict[str, int | float | bool | str], ) -> Task: task_result_wrapper: t.List[TaskResult] = [] @@ -227,7 +227,7 @@ def add_worker_task( self, task_type: TaskType, task_queue: str, - task_args: t.Dict[str, t.Union[int, float, bool, str]], + task_args: t.Dict[str, int | float | bool | str], name: t.Optional[str], ref_id: t.Optional[str], request_params: RequestParameters, diff --git a/antarest/launcher/model.py b/antarest/launcher/model.py index df5c9cb903..25c12f711c 100644 --- a/antarest/launcher/model.py +++ b/antarest/launcher/model.py @@ -39,7 +39,7 @@ class LauncherParametersDTO(AntaresBaseModel): nb_cpu: t.Optional[int] = None post_processing: bool = False time_limit: int = 240 * 3600 # Default value set to 240 hours (in seconds) - xpansion: t.Union[XpansionParametersDTO, bool, None] = None + xpansion: XpansionParametersDTO | bool | None = None xpansion_r_version: bool = False archive_output: bool = True auto_unzip: bool = True diff --git a/antarest/matrixstore/model.py b/antarest/matrixstore/model.py index b117e12e1f..e63dda439a 100644 --- a/antarest/matrixstore/model.py +++ b/antarest/matrixstore/model.py @@ -237,8 +237,8 @@ class MatrixContent(AntaresBaseModel): """ data: t.List[t.List[MatrixData]] - index: t.List[t.Union[int, str]] - columns: t.List[t.Union[int, str]] + index: t.List[int | str] + columns: t.List[int | str] class 
MatrixDataSetUpdateDTO(AntaresBaseModel): diff --git a/antarest/matrixstore/repository.py b/antarest/matrixstore/repository.py index 4b8aa7840e..853b812908 100644 --- a/antarest/matrixstore/repository.py +++ b/antarest/matrixstore/repository.py @@ -181,7 +181,7 @@ def exists(self, matrix_hash: str) -> bool: matrix_file = self.bucket_dir.joinpath(f"{matrix_hash}.tsv") return matrix_file.exists() - def save(self, content: t.Union[t.List[t.List[MatrixData]], npt.NDArray[np.float64]]) -> str: + def save(self, content: t.List[t.List[MatrixData]] | npt.NDArray[np.float64]) -> str: """ Saves the content of a matrix as a TSV file in the bucket directory and returns its SHA256 hash. diff --git a/antarest/matrixstore/service.py b/antarest/matrixstore/service.py index ca69f40fa2..50e5d254c7 100644 --- a/antarest/matrixstore/service.py +++ b/antarest/matrixstore/service.py @@ -72,7 +72,7 @@ def __init__(self, matrix_content_repository: MatrixContentRepository) -> None: self.matrix_content_repository = matrix_content_repository @abstractmethod - def create(self, data: t.Union[t.List[t.List[MatrixData]], npt.NDArray[np.float64]]) -> str: + def create(self, data: t.List[t.List[MatrixData]] | npt.NDArray[np.float64]) -> str: raise NotImplementedError() @abstractmethod @@ -87,7 +87,7 @@ def exists(self, matrix_id: str) -> bool: def delete(self, matrix_id: str) -> None: raise NotImplementedError() - def get_matrix_id(self, matrix: t.Union[t.List[t.List[float]], str]) -> str: + def get_matrix_id(self, matrix: t.List[t.List[float]] | str) -> str: """ Get the matrix ID from a matrix or a matrix link. @@ -114,7 +114,7 @@ def __init__(self, matrix_content_repository: MatrixContentRepository): super().__init__(matrix_content_repository=matrix_content_repository) @override - def create(self, data: t.Union[t.List[t.List[MatrixData]], npt.NDArray[np.float64]]) -> str: + def create(self, data: t.List[t.List[MatrixData]] | npt.NDArray[np.float64]) -> str: return self.matrix_content_repository.save(data) @override @@ -171,7 +171,7 @@ def _from_dto(dto: MatrixDTO) -> t.Tuple[Matrix, MatrixContent]: return matrix, content @override - def create(self, data: t.Union[t.List[t.List[MatrixData]], npt.NDArray[np.float64]]) -> str: + def create(self, data: t.List[t.List[MatrixData]] | npt.NDArray[np.float64]) -> str: """ Creates a new matrix object with the specified data. 
diff --git a/antarest/study/business/aggregator_management.py b/antarest/study/business/aggregator_management.py index 6ee055bf2f..83a44ddc8f 100644 --- a/antarest/study/business/aggregator_management.py +++ b/antarest/study/business/aggregator_management.py @@ -146,7 +146,7 @@ def __init__( self, study_path: Path, output_id: str, - query_file: t.Union[MCIndAreasQueryFile, MCAllAreasQueryFile, MCIndLinksQueryFile, MCAllLinksQueryFile], + query_file: MCIndAreasQueryFile | MCAllAreasQueryFile | MCIndLinksQueryFile | MCAllLinksQueryFile, frequency: MatrixFrequency, ids_to_consider: t.Sequence[str], columns_names: t.Sequence[str], diff --git a/antarest/study/business/areas/st_storage_management.py b/antarest/study/business/areas/st_storage_management.py index dceeda6956..12769da8b5 100644 --- a/antarest/study/business/areas/st_storage_management.py +++ b/antarest/study/business/areas/st_storage_management.py @@ -584,7 +584,7 @@ def duplicate_cluster(self, study: Study, area_id: str, source_id: str, new_clus ] # Prepare and execute commands - commands: t.List[t.Union[CreateSTStorage, ReplaceMatrix]] = [create_cluster_cmd] + commands: t.List[CreateSTStorage | ReplaceMatrix] = [create_cluster_cmd] storage_service = self.storage_service.get_storage(study) command_context = self.storage_service.variant_study_service.command_factory.command_context for source_path, new_path in zip(source_paths, new_paths): diff --git a/antarest/study/business/areas/thermal_management.py b/antarest/study/business/areas/thermal_management.py index c78ab6217f..811f5765d0 100644 --- a/antarest/study/business/areas/thermal_management.py +++ b/antarest/study/business/areas/thermal_management.py @@ -464,7 +464,7 @@ def duplicate_cluster( new_paths.append(f"input/thermal/series/{area_id}/{lower_new_id}/fuelCost") # Prepare and execute commands - commands: t.List[t.Union[CreateCluster, ReplaceMatrix]] = [create_cluster_cmd] + commands: t.List[CreateCluster | ReplaceMatrix] = [create_cluster_cmd] storage_service = self.storage_service.get_storage(study) command_context = self.storage_service.variant_study_service.command_factory.command_context for source_path, new_path in zip(source_paths, new_paths): diff --git a/antarest/study/business/binding_constraint_management.py b/antarest/study/business/binding_constraint_management.py index e18fcc0564..6c66f04001 100644 --- a/antarest/study/business/binding_constraint_management.py +++ b/antarest/study/business/binding_constraint_management.py @@ -145,7 +145,7 @@ class ConstraintTerm(AntaresBaseModel): id: t.Optional[str] = None weight: t.Optional[float] = None offset: t.Optional[int] = None - data: t.Optional[t.Union[LinkTerm, ClusterTerm]] = None + data: t.Optional[LinkTerm | ClusterTerm] = None @field_validator("id") def id_to_lower(cls, v: t.Optional[str]) -> t.Optional[str]: @@ -348,7 +348,7 @@ class ConstraintOutput870(ConstraintOutput830): # WARNING: Do not change the order of the following line, it is used to determine # the type of the output constraint in the FastAPI endpoint. 
-ConstraintOutput = t.Union[ConstraintOutputBase, ConstraintOutput830, ConstraintOutput870] +ConstraintOutput = ConstraintOutputBase | ConstraintOutput830 | ConstraintOutput870 OPERATOR_MATRIX_FILE_MAP = { BindingConstraintOperator.EQUAL: ["{bc_id}_eq"], @@ -1177,7 +1177,7 @@ def _replace_matrices_according_to_frequency_and_version( def check_attributes_coherence( - data: t.Union[ConstraintCreation, ConstraintInput], + data: ConstraintCreation | ConstraintInput, study_version: StudyVersion, operator: BindingConstraintOperator, ) -> None: diff --git a/antarest/study/business/scenario_builder_management.py b/antarest/study/business/scenario_builder_management.py index 0ea15dd6d8..8ce3b9345d 100644 --- a/antarest/study/business/scenario_builder_management.py +++ b/antarest/study/business/scenario_builder_management.py @@ -31,7 +31,7 @@ _HYDRO_LEVEL_PERCENT = 100 -_Section: te.TypeAlias = t.MutableMapping[str, t.Union[int, float]] +_Section: te.TypeAlias = t.MutableMapping[str, int | float] _Sections: te.TypeAlias = t.MutableMapping[str, _Section] Ruleset: te.TypeAlias = t.MutableMapping[str, t.Any] @@ -92,11 +92,11 @@ def _get_ruleset_config( file_study: FileStudy, ruleset_name: str, symbol: str = "", -) -> t.Dict[str, t.Union[int, float]]: +) -> t.Dict[str, int | float]: try: suffix = f"/{symbol}" if symbol else "" url = f"settings/scenariobuilder/{ruleset_name}{suffix}".split("/") - ruleset_cfg = t.cast(t.Dict[str, t.Union[int, float]], file_study.tree.get(url)) + ruleset_cfg = t.cast(t.Dict[str, int | float], file_study.tree.get(url)) except KeyError: ruleset_cfg = {} return ruleset_cfg diff --git a/antarest/study/business/xpansion_management.py b/antarest/study/business/xpansion_management.py index 7464de5ec5..4450db44b7 100644 --- a/antarest/study/business/xpansion_management.py +++ b/antarest/study/business/xpansion_management.py @@ -703,7 +703,7 @@ def get_resource_content( study: Study, resource_type: XpansionResourceFileType, filename: str, - ) -> t.Union[JSON, bytes]: + ) -> JSON | bytes: logger.info(f"Getting xpansion {resource_type} resource file '{filename}' from study '{study.id}'") file_study = self.study_storage_service.get_storage(study).get_raw(study) return file_study.tree.get(self._raw_file_dir(resource_type) + [filename]) diff --git a/antarest/study/common/studystorage.py b/antarest/study/common/studystorage.py index 221857c339..3bffa67f0e 100644 --- a/antarest/study/common/studystorage.py +++ b/antarest/study/common/studystorage.py @@ -116,7 +116,7 @@ def patch_update_study_metadata(self, study: T, metadata: StudyMetadataPatchDTO) def import_output( self, study: T, - output: t.Union[t.BinaryIO, Path], + output: t.BinaryIO | Path, output_name: t.Optional[str] = None, ) -> t.Optional[str]: """ diff --git a/antarest/study/model.py b/antarest/study/model.py index d2ff8b97b9..4dbacf2ec9 100644 --- a/antarest/study/model.py +++ b/antarest/study/model.py @@ -431,7 +431,7 @@ class StudyMetadataDTO(AntaresBaseModel): tags: t.List[str] = [] @field_validator("horizon", mode="before") - def transform_horizon_to_str(cls, val: t.Union[str, int, None]) -> t.Optional[str]: + def transform_horizon_to_str(cls, val: str | int | None) -> t.Optional[str]: # horizon can be an int. 
return str(val) if val else val # type: ignore diff --git a/antarest/study/repository.py b/antarest/study/repository.py index a904124d4d..1cb08da490 100644 --- a/antarest/study/repository.py +++ b/antarest/study/repository.py @@ -59,7 +59,7 @@ class AccessPermissions(AntaresBaseModel, frozen=True, extra="forbid"): user_groups: t.Sequence[str] = () @classmethod - def from_params(cls, params: t.Union[RequestParameters, JWTUser]) -> "AccessPermissions": + def from_params(cls, params: RequestParameters | JWTUser) -> "AccessPermissions": """ This function makes it easier to pass on user ids and groups into the repository filtering function by extracting the associated `AccessPermissions` object. diff --git a/antarest/study/service.py b/antarest/study/service.py index 31bb80fa1f..0aff0459ae 100644 --- a/antarest/study/service.py +++ b/antarest/study/service.py @@ -181,7 +181,7 @@ MAX_MISSING_STUDY_TIMEOUT = 2 # days -def get_disk_usage(path: t.Union[str, Path]) -> int: +def get_disk_usage(path: str | Path) -> int: """Calculate the total disk usage (in bytes) of a study in a compressed file or directory.""" path = Path(path) if is_archive_format(path.suffix.lower()): @@ -213,7 +213,7 @@ def _imports_matrix_from_bytes(data: bytes) -> npt.NDArray[np.float64]: def _get_path_inside_user_folder( - path: str, exception_class: t.Type[t.Union[FolderCreationNotAllowed, ResourceDeletionNotAllowed]] + path: str, exception_class: t.Type[FolderCreationNotAllowed | ResourceDeletionNotAllowed] ) -> str: """ Retrieves the path inside the `user` folder for a given user path @@ -498,7 +498,7 @@ def aggregate_output_data( self, uuid: str, output_id: str, - query_file: t.Union[MCIndAreasQueryFile, MCAllAreasQueryFile, MCIndLinksQueryFile, MCAllLinksQueryFile], + query_file: MCIndAreasQueryFile | MCAllAreasQueryFile | MCIndLinksQueryFile | MCAllLinksQueryFile, frequency: MatrixFrequency, columns_names: t.Sequence[str], ids_to_consider: t.Sequence[str], @@ -594,7 +594,7 @@ def save_logs( ) stopwatch.log_elapsed(lambda d: logger.info(f"Saved logs for job {job_id} in {d}s")) - def get_comments(self, study_id: str, params: RequestParameters) -> t.Union[str, JSON]: + def get_comments(self, study_id: str, params: RequestParameters) -> str | JSON: """ Get the comments of a study. 
@@ -1360,7 +1360,7 @@ def download_outputs( filetype: ExportFormat, params: RequestParameters, tmp_export_file: t.Optional[Path] = None, - ) -> t.Union[Response, FileDownloadTaskDTO, FileResponse]: + ) -> Response | FileDownloadTaskDTO | FileResponse: """ Download outputs Args: @@ -1551,7 +1551,7 @@ def import_study( def import_output( self, uuid: str, - output: t.Union[t.BinaryIO, Path], + output: t.BinaryIO | Path, params: RequestParameters, output_name_suffix: t.Optional[str] = None, auto_unzip: bool = True, @@ -1896,7 +1896,7 @@ def get_all_areas( area_type: t.Optional[AreaType], ui: bool, params: RequestParameters, - ) -> t.Union[t.List[AreaInfoDTO], t.Dict[str, t.Any]]: + ) -> t.List[AreaInfoDTO] | t.Dict[str, t.Any]: study = self.get_study(uuid) assert_permission(params.user, study, StudyPermissionType.READ) return self.areas.get_all_areas_ui_info(study) if ui else self.areas.get_all_areas(study, area_type) @@ -2817,9 +2817,9 @@ def create_user_folder(self, study_id: str, path: str, current_user: JWTUser) -> def _alter_user_folder( self, study_id: str, - command_data: t.Union[CreateUserResourceData, RemoveUserResourceData], - command_class: t.Type[t.Union[CreateUserResource, RemoveUserResource]], - exception_class: t.Type[t.Union[FolderCreationNotAllowed, ResourceDeletionNotAllowed]], + command_data: CreateUserResourceData | RemoveUserResourceData, + command_class: t.Type[CreateUserResource | RemoveUserResource], + exception_class: t.Type[FolderCreationNotAllowed | ResourceDeletionNotAllowed], current_user: JWTUser, ) -> None: study = self.get_study(study_id) diff --git a/antarest/study/storage/abstract_storage_service.py b/antarest/study/storage/abstract_storage_service.py index 69d9ba6a27..d5937600d2 100644 --- a/antarest/study/storage/abstract_storage_service.py +++ b/antarest/study/storage/abstract_storage_service.py @@ -254,7 +254,7 @@ def get_study_sim_result( def import_output( self, metadata: T, - output: t.Union[t.BinaryIO, Path], + output: t.BinaryIO | Path, output_name: t.Optional[str] = None, ) -> t.Optional[str]: """ diff --git a/antarest/study/storage/df_download.py b/antarest/study/storage/df_download.py index acb9471802..e0f2c25f75 100644 --- a/antarest/study/storage/df_download.py +++ b/antarest/study/storage/df_download.py @@ -71,7 +71,7 @@ def suffix(self) -> str: def export_table( self, df: pd.DataFrame, - export_path: t.Union[str, Path], + export_path: str | Path, *, with_index: bool = True, with_header: bool = True, diff --git a/antarest/study/storage/patch_service.py b/antarest/study/storage/patch_service.py index f98870e240..146c285ccb 100644 --- a/antarest/study/storage/patch_service.py +++ b/antarest/study/storage/patch_service.py @@ -30,7 +30,7 @@ class PatchService: def __init__(self, repository: t.Optional[StudyMetadataRepository] = None): self.repository = repository - def get(self, study: t.Union[RawStudy, VariantStudy], get_from_file: bool = False) -> Patch: + def get(self, study: RawStudy | VariantStudy, get_from_file: bool = False) -> Patch: if not get_from_file and study.additional_data is not None: # the `study.additional_data.patch` field is optional if study.additional_data.patch: @@ -52,7 +52,7 @@ def get_from_filestudy(self, file_study: FileStudy) -> Patch: def set_reference_output( self, - study: t.Union[RawStudy, VariantStudy], + study: RawStudy | VariantStudy, output_id: str, status: bool = True, ) -> None: @@ -63,7 +63,7 @@ def set_reference_output( patch.outputs = PatchOutputs(reference=output_id) self.save(study, patch) - def save(self, 
study: t.Union[RawStudy, VariantStudy], patch: Patch) -> None: + def save(self, study: RawStudy | VariantStudy, patch: Patch) -> None: if self.repository: study.additional_data = study.additional_data or StudyAdditionalData() study.additional_data.patch = patch.model_dump_json() diff --git a/antarest/study/storage/rawstudy/ini_reader.py b/antarest/study/storage/rawstudy/ini_reader.py index e003878e7c..5fb613b80a 100644 --- a/antarest/study/storage/rawstudy/ini_reader.py +++ b/antarest/study/storage/rawstudy/ini_reader.py @@ -21,13 +21,13 @@ from antarest.core.model import JSON -def convert_value(value: str) -> t.Union[str, int, float, bool]: +def convert_value(value: str) -> str | int | float | bool: """Convert value to the appropriate type for JSON.""" try: # Infinity values are not supported by JSON, so we use a string instead. mapping = {"true": True, "false": False, "+inf": "+Inf", "-inf": "-Inf", "inf": "+Inf"} - return t.cast(t.Union[str, int, float, bool], mapping[value.lower()]) + return t.cast(str | int | float | bool, mapping[value.lower()]) except KeyError: try: return int(value) @@ -56,8 +56,8 @@ def from_kwargs( cls, section: str = "", option: str = "", - section_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None, - option_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None, + section_regex: t.Optional[str | t.Pattern[str]] = None, + option_regex: t.Optional[str | t.Pattern[str]] = None, **_unused: t.Any, # ignore unknown options ) -> "IniFilter": """ diff --git a/antarest/study/storage/rawstudy/model/filesystem/config/ruleset_matrices.py b/antarest/study/storage/rawstudy/model/filesystem/config/ruleset_matrices.py index 47a7568142..7cce665366 100644 --- a/antarest/study/storage/rawstudy/model/filesystem/config/ruleset_matrices.py +++ b/antarest/study/storage/rawstudy/model/filesystem/config/ruleset_matrices.py @@ -31,15 +31,15 @@ "hgp": "hydro-generation-power", } -_Value: te.TypeAlias = t.Union[int, float] +_Value: te.TypeAlias = int | float _SimpleScenario: te.TypeAlias = pd.DataFrame _ClusterScenario: te.TypeAlias = t.MutableMapping[str, pd.DataFrame] -_Scenario: te.TypeAlias = t.Union[_SimpleScenario, _ClusterScenario] +_Scenario: te.TypeAlias = _SimpleScenario | _ClusterScenario _ScenarioMapping: te.TypeAlias = t.MutableMapping[str, _Scenario] -SimpleTableForm: te.TypeAlias = t.Dict[str, t.Dict[str, t.Union[int, float, str, None]]] +SimpleTableForm: te.TypeAlias = t.Dict[str, t.Dict[str, int | float | str | None]] ClusterTableForm: te.TypeAlias = t.Dict[str, SimpleTableForm] -TableForm: te.TypeAlias = t.Union[SimpleTableForm, ClusterTableForm] +TableForm: te.TypeAlias = SimpleTableForm | ClusterTableForm _AREA_RELATED_SYMBOLS = "l", "h", "w", "s", "hgp" _BINDING_CONSTRAINTS_RELATED_SYMBOLS = ("bc",) @@ -315,7 +315,7 @@ def to_percent(v: t.Any) -> _Value: raise NotImplementedError(f"Unknown symbol {symbol}") return scenario_rules - def get_table_form(self, scenario_type: str, *, nan_value: t.Union[str, None] = "") -> TableForm: + def get_table_form(self, scenario_type: str, *, nan_value: str | None = "") -> TableForm: """ Get the scenario matrices in table form for the frontend. @@ -366,7 +366,7 @@ def set_table_form( table_form: TableForm, scenario_type: str, *, - nan_value: t.Union[str, None] = "", + nan_value: str | None = "", ) -> None: """ Set the scenario matrix from table form data, for a specific scenario type. 
diff --git a/antarest/study/storage/rawstudy/model/filesystem/config/st_storage.py b/antarest/study/storage/rawstudy/model/filesystem/config/st_storage.py index 0d21455783..a2bb0ee714 100644 --- a/antarest/study/storage/rawstudy/model/filesystem/config/st_storage.py +++ b/antarest/study/storage/rawstudy/model/filesystem/config/st_storage.py @@ -160,7 +160,7 @@ class STStorage880Config(STStorage880Properties, LowerCaseIdentifier): # NOTE: In the following Union, it is important to place the older version first, # because otherwise, creating a short term storage always creates a v8.8 one. -STStorageConfigType = t.Union[STStorageConfig, STStorage880Config] +STStorageConfigType = STStorageConfig | STStorage880Config def get_st_storage_config_cls(study_version: StudyVersion) -> t.Type[STStorageConfigType]: diff --git a/antarest/study/storage/rawstudy/model/filesystem/config/thermal.py b/antarest/study/storage/rawstudy/model/filesystem/config/thermal.py index 87f20514f4..b5c46853fd 100644 --- a/antarest/study/storage/rawstudy/model/filesystem/config/thermal.py +++ b/antarest/study/storage/rawstudy/model/filesystem/config/thermal.py @@ -408,7 +408,7 @@ class Thermal870Config(Thermal870Properties, IgnoreCaseIdentifier): # NOTE: In the following Union, it is important to place the most specific type first, # because the type matching generally occurs sequentially from left to right within the union. -ThermalConfigType = t.Union[Thermal870Config, Thermal860Config, ThermalConfig] +ThermalConfigType = Thermal870Config | Thermal860Config | ThermalConfig def get_thermal_config_cls(study_version: StudyVersion) -> t.Type[ThermalConfigType]: diff --git a/antarest/study/storage/rawstudy/model/filesystem/folder_node.py b/antarest/study/storage/rawstudy/model/filesystem/folder_node.py index cfcebe9019..81409574f4 100644 --- a/antarest/study/storage/rawstudy/model/filesystem/folder_node.py +++ b/antarest/study/storage/rawstudy/model/filesystem/folder_node.py @@ -61,7 +61,7 @@ def _forward_get( depth: int = -1, formatted: bool = True, get_node: bool = False, - ) -> t.Union[JSON, INode[JSON, SUB_JSON, JSON]]: + ) -> JSON | INode[JSON, SUB_JSON, JSON]: children = self.build() names, sub_url = self.extract_child(children, url) @@ -91,7 +91,7 @@ def _forward_get( def _expand_get( self, depth: int = -1, formatted: bool = True, get_node: bool = False - ) -> t.Union[JSON, INode[JSON, SUB_JSON, JSON]]: + ) -> JSON | INode[JSON, SUB_JSON, JSON]: if get_node: return self @@ -110,7 +110,7 @@ def _get( depth: int = -1, formatted: bool = True, get_node: bool = False, - ) -> t.Union[JSON, INode[JSON, SUB_JSON, JSON]]: + ) -> JSON | INode[JSON, SUB_JSON, JSON]: if url and url != [""]: return self._forward_get(url, depth, formatted, get_node) else: diff --git a/antarest/study/storage/rawstudy/model/filesystem/ini_file_node.py b/antarest/study/storage/rawstudy/model/filesystem/ini_file_node.py index 960ccc83e4..099e3376d5 100644 --- a/antarest/study/storage/rawstudy/model/filesystem/ini_file_node.py +++ b/antarest/study/storage/rawstudy/model/filesystem/ini_file_node.py @@ -95,7 +95,7 @@ def _get( depth: int = -1, expanded: bool = False, get_node: bool = False, - ) -> t.Union[SUB_JSON, INode[SUB_JSON, SUB_JSON, JSON]]: + ) -> SUB_JSON | INode[SUB_JSON, SUB_JSON, JSON]: if get_node: return self diff --git a/antarest/study/storage/rawstudy/model/filesystem/json_file_node.py b/antarest/study/storage/rawstudy/model/filesystem/json_file_node.py index 0ff675c650..719b5ec1e8 100644 --- 
a/antarest/study/storage/rawstudy/model/filesystem/json_file_node.py +++ b/antarest/study/storage/rawstudy/model/filesystem/json_file_node.py @@ -32,7 +32,7 @@ class JsonReader(IReader): @override def read(self, path: t.Any, **kwargs: t.Any) -> JSON: - content: t.Union[str, bytes] + content: str | bytes if isinstance(path, (Path, str)): try: diff --git a/antarest/study/storage/rawstudy/model/filesystem/lazy_node.py b/antarest/study/storage/rawstudy/model/filesystem/lazy_node.py index 2a9204d71e..ba3a9c841b 100644 --- a/antarest/study/storage/rawstudy/model/filesystem/lazy_node.py +++ b/antarest/study/storage/rawstudy/model/filesystem/lazy_node.py @@ -76,7 +76,7 @@ def _get( expanded: bool = False, formatted: bool = True, get_node: bool = False, - ) -> t.Union[t.Union[str, G], INode[G, S, V]]: + ) -> str | G | INode[G, S, V]: self._assert_url_end(url) if get_node: @@ -101,7 +101,7 @@ def get( depth: int = -1, expanded: bool = False, formatted: bool = True, - ) -> t.Union[str, G]: + ) -> str | G: output = self._get(url, depth, expanded, formatted, get_node=False) assert not isinstance(output, INode) return output @@ -128,7 +128,7 @@ def get_link_path(self) -> Path: return path @override - def save(self, data: t.Union[str, bytes, S], url: t.Optional[t.List[str]] = None) -> None: + def save(self, data: str | bytes | S, url: t.Optional[t.List[str]] = None) -> None: self._assert_not_in_zipped_file() self._assert_url_end(url) diff --git a/antarest/study/storage/rawstudy/model/filesystem/root/settings/scenariobuilder.py b/antarest/study/storage/rawstudy/model/filesystem/root/settings/scenariobuilder.py index d384433216..69937ea4d2 100644 --- a/antarest/study/storage/rawstudy/model/filesystem/root/settings/scenariobuilder.py +++ b/antarest/study/storage/rawstudy/model/filesystem/root/settings/scenariobuilder.py @@ -29,7 +29,7 @@ _TSNumber: te.TypeAlias = int _HydroLevel: te.TypeAlias = float -_Rules = t.MutableMapping[str, t.Union[t.Type[_TSNumber], t.Type[_HydroLevel]]] +_Rules = t.MutableMapping[str, t.Type[_TSNumber] | t.Type[_HydroLevel]] class ScenarioBuilder(IniFileNode): diff --git a/antarest/study/storage/utils.py b/antarest/study/storage/utils.py index e8336cc1d1..bd08ee76a4 100644 --- a/antarest/study/storage/utils.py +++ b/antarest/study/storage/utils.py @@ -214,7 +214,7 @@ def study_match(study: StudyMetadataDTO) -> bool: def assert_permission_on_studies( user: t.Optional[JWTUser], - studies: t.Sequence[t.Union[Study, StudyMetadataDTO]], + studies: t.Sequence[Study | StudyMetadataDTO], permission_type: StudyPermissionType, *, raising: bool = True, @@ -254,7 +254,7 @@ def assert_permission_on_studies( def assert_permission( user: t.Optional[JWTUser], - study: t.Optional[t.Union[Study, StudyMetadataDTO]], + study: t.Optional[Study | StudyMetadataDTO], permission_type: StudyPermissionType, raising: bool = True, ) -> bool: diff --git a/antarest/study/storage/variantstudy/business/utils.py b/antarest/study/storage/variantstudy/business/utils.py index 84fb848056..eb82ddebb0 100644 --- a/antarest/study/storage/variantstudy/business/utils.py +++ b/antarest/study/storage/variantstudy/business/utils.py @@ -22,7 +22,7 @@ from antarest.study.storage.variantstudy.model.model import CommandDTO -def validate_matrix(matrix: t.Union[t.List[t.List[MatrixData]], str], values: t.Dict[str, t.Any]) -> str: +def validate_matrix(matrix: t.List[t.List[MatrixData]] | str, values: t.Dict[str, t.Any]) -> str: """ Validates the matrix, stores the matrix array in the matrices repository, and returns a reference to 
the stored array. @@ -75,7 +75,7 @@ def remove_none_args(command_dto: CommandDTO) -> CommandDTO: return command_dto -def strip_matrix_protocol(matrix_uri: t.Union[t.List[t.List[float]], str, None]) -> str: +def strip_matrix_protocol(matrix_uri: t.List[t.List[float]] | str | None) -> str: assert isinstance(matrix_uri, str) if matrix_uri.startswith(MATRIX_PROTOCOL_PREFIX): return matrix_uri[len(MATRIX_PROTOCOL_PREFIX) :] diff --git a/antarest/study/storage/variantstudy/business/utils_binding_constraint.py b/antarest/study/storage/variantstudy/business/utils_binding_constraint.py index 399df46636..6c4d8c6eac 100644 --- a/antarest/study/storage/variantstudy/business/utils_binding_constraint.py +++ b/antarest/study/storage/variantstudy/business/utils_binding_constraint.py @@ -22,7 +22,7 @@ def parse_bindings_coeffs_and_save_into_config( bd_id: str, study_data_config: FileStudyTreeConfig, - coeffs: t.Mapping[str, t.Union[t.Literal["hourly", "daily", "weekly"], t.Sequence[float]]], + coeffs: t.Mapping[str, t.Literal["hourly", "daily", "weekly"] | t.Sequence[float]], operator: BindingConstraintOperator, time_step: BindingConstraintFrequency, group: str, diff --git a/antarest/study/storage/variantstudy/model/command/create_binding_constraint.py b/antarest/study/storage/variantstudy/model/command/create_binding_constraint.py index 766934c8ee..02185483d8 100644 --- a/antarest/study/storage/variantstudy/model/command/create_binding_constraint.py +++ b/antarest/study/storage/variantstudy/model/command/create_binding_constraint.py @@ -122,11 +122,9 @@ class BindingConstraintProperties870(BindingConstraintProperties830): group: str = DEFAULT_GROUP -BindingConstraintProperties = t.Union[ - BindingConstraintPropertiesBase, - BindingConstraintProperties830, - BindingConstraintProperties870, -] +BindingConstraintProperties = ( + BindingConstraintPropertiesBase | BindingConstraintProperties830 | BindingConstraintProperties870 +) def get_binding_constraint_config_cls(study_version: StudyVersion) -> t.Type[BindingConstraintProperties]: @@ -173,27 +171,27 @@ class BindingConstraintMatrices(AntaresBaseModel, extra="forbid", populate_by_na Class used to store the matrices of a binding constraint. 
""" - values: t.Optional[t.Union[MatrixType, str]] = Field( + values: t.Optional[MatrixType | str] = Field( default=None, description="2nd member matrix for studies before v8.7", ) - less_term_matrix: t.Optional[t.Union[MatrixType, str]] = Field( + less_term_matrix: t.Optional[MatrixType | str] = Field( default=None, description="less term matrix for v8.7+ studies", ) - greater_term_matrix: t.Optional[t.Union[MatrixType, str]] = Field( + greater_term_matrix: t.Optional[MatrixType | str] = Field( default=None, description="greater term matrix for v8.7+ studies", ) - equal_term_matrix: t.Optional[t.Union[MatrixType, str]] = Field( + equal_term_matrix: t.Optional[MatrixType | str] = Field( default=None, description="equal term matrix for v8.7+ studies", ) @model_validator(mode="before") def check_matrices( - cls, values: t.Dict[str, t.Optional[t.Union[MatrixType, str]]] - ) -> t.Dict[str, t.Optional[t.Union[MatrixType, str]]]: + cls, values: t.Dict[str, t.Optional[MatrixType | str]] + ) -> t.Dict[str, t.Optional[MatrixType | str]]: values_matrix = values.get("values") or None less_term_matrix = values.get("less_term_matrix") or None greater_term_matrix = values.get("greater_term_matrix") or None @@ -263,7 +261,7 @@ def get_inner_matrices(self) -> t.List[str]: def get_corresponding_matrices( self, - v: t.Optional[t.Union[MatrixType, str]], + v: t.Optional[MatrixType | str], time_step: BindingConstraintFrequency, version: StudyVersion, create: bool, diff --git a/antarest/study/storage/variantstudy/model/command/create_cluster.py b/antarest/study/storage/variantstudy/model/command/create_cluster.py index c8f42fb60a..997fd2acf8 100644 --- a/antarest/study/storage/variantstudy/model/command/create_cluster.py +++ b/antarest/study/storage/variantstudy/model/command/create_cluster.py @@ -50,8 +50,8 @@ class CreateCluster(ICommand): area_id: str cluster_name: str parameters: t.Dict[str, t.Any] - prepro: t.Optional[t.Union[t.List[t.List[MatrixData]], str]] = Field(None, validate_default=True) - modulation: t.Optional[t.Union[t.List[t.List[MatrixData]], str]] = Field(None, validate_default=True) + prepro: t.Optional[t.List[t.List[MatrixData]] | str] = Field(None, validate_default=True) + modulation: t.Optional[t.List[t.List[MatrixData]] | str] = Field(None, validate_default=True) @field_validator("cluster_name", mode="before") def validate_cluster_name(cls, val: str) -> str: @@ -63,9 +63,9 @@ def validate_cluster_name(cls, val: str) -> str: @field_validator("prepro", mode="before") def validate_prepro( cls, - v: t.Optional[t.Union[t.List[t.List[MatrixData]], str]], - values: t.Union[t.Dict[str, t.Any], ValidationInfo], - ) -> t.Optional[t.Union[t.List[t.List[MatrixData]], str]]: + v: t.Optional[t.List[t.List[MatrixData]] | str], + values: t.Dict[str, t.Any] | ValidationInfo, + ) -> t.Optional[t.List[t.List[MatrixData]] | str]: new_values = values if isinstance(values, dict) else values.data if v is None: v = new_values["command_context"].generator_matrix_constants.get_thermal_prepro_data() @@ -76,9 +76,9 @@ def validate_prepro( @field_validator("modulation", mode="before") def validate_modulation( cls, - v: t.Optional[t.Union[t.List[t.List[MatrixData]], str]], - values: t.Union[t.Dict[str, t.Any], ValidationInfo], - ) -> t.Optional[t.Union[t.List[t.List[MatrixData]], str]]: + v: t.Optional[t.List[t.List[MatrixData]] | str], + values: t.Dict[str, t.Any] | ValidationInfo, + ) -> t.Optional[t.List[t.List[MatrixData]] | str]: new_values = values if isinstance(values, dict) else values.data if v is None: v = 
new_values["command_context"].generator_matrix_constants.get_thermal_prepro_modulation() diff --git a/antarest/study/storage/variantstudy/model/command/create_st_storage.py b/antarest/study/storage/variantstudy/model/command/create_st_storage.py index a3c16195ad..1ff12ec36c 100644 --- a/antarest/study/storage/variantstudy/model/command/create_st_storage.py +++ b/antarest/study/storage/variantstudy/model/command/create_st_storage.py @@ -61,23 +61,23 @@ class CreateSTStorage(ICommand): area_id: str = Field(description="Area ID", pattern=r"[a-z0-9_(),& -]+") parameters: STStorageConfigType - pmax_injection: t.Optional[t.Union[MatrixType, str]] = Field( + pmax_injection: t.Optional[MatrixType | str] = Field( default=None, description="Charge capacity (modulation)", ) - pmax_withdrawal: t.Optional[t.Union[MatrixType, str]] = Field( + pmax_withdrawal: t.Optional[MatrixType | str] = Field( default=None, description="Discharge capacity (modulation)", ) - lower_rule_curve: t.Optional[t.Union[MatrixType, str]] = Field( + lower_rule_curve: t.Optional[MatrixType | str] = Field( default=None, description="Lower rule curve (coefficient)", ) - upper_rule_curve: t.Optional[t.Union[MatrixType, str]] = Field( + upper_rule_curve: t.Optional[MatrixType | str] = Field( default=None, description="Upper rule curve (coefficient)", ) - inflows: t.Optional[t.Union[MatrixType, str]] = Field( + inflows: t.Optional[MatrixType | str] = Field( default=None, description="Inflows (MW)", ) @@ -94,8 +94,8 @@ def storage_name(self) -> str: @staticmethod def validate_field( - v: t.Optional[t.Union[MatrixType, str]], values: t.Dict[str, t.Any], field: str - ) -> t.Optional[t.Union[MatrixType, str]]: + v: t.Optional[MatrixType | str], values: t.Dict[str, t.Any], field: str + ) -> t.Optional[MatrixType | str]: """ Validates a matrix array or link, and store the matrix array in the matrix repository. 
@@ -155,7 +155,7 @@ def validate_field( raise TypeError(repr(v)) @model_validator(mode="before") - def validate_matrices(cls, values: t.Union[t.Dict[str, t.Any], ValidationInfo]) -> t.Dict[str, t.Any]: + def validate_matrices(cls, values: t.Dict[str, t.Any] | ValidationInfo) -> t.Dict[str, t.Any]: new_values = values if isinstance(values, dict) else values.data for field in _MATRIX_NAMES: new_values[field] = cls.validate_field(new_values.get(field, None), new_values, field) diff --git a/antarest/study/storage/variantstudy/model/command/replace_matrix.py b/antarest/study/storage/variantstudy/model/command/replace_matrix.py index aaceb7c3f7..c51280bf8b 100644 --- a/antarest/study/storage/variantstudy/model/command/replace_matrix.py +++ b/antarest/study/storage/variantstudy/model/command/replace_matrix.py @@ -44,10 +44,10 @@ class ReplaceMatrix(ICommand): # ================== target: str - matrix: t.Union[t.List[t.List[MatrixData]], str] = Field(validate_default=True) + matrix: t.List[t.List[MatrixData]] | str = Field(validate_default=True) @field_validator("matrix", mode="before") - def matrix_validator(cls, matrix: t.Union[t.List[t.List[MatrixData]], str], values: ValidationInfo) -> str: + def matrix_validator(cls, matrix: t.List[t.List[MatrixData]] | str, values: ValidationInfo) -> str: return validate_matrix(matrix, values.data) @override diff --git a/antarest/study/storage/variantstudy/model/command/update_config.py b/antarest/study/storage/variantstudy/model/command/update_config.py index e53ed7edfe..159cd535dd 100644 --- a/antarest/study/storage/variantstudy/model/command/update_config.py +++ b/antarest/study/storage/variantstudy/model/command/update_config.py @@ -26,7 +26,7 @@ _ENR_MODELLING_KEY = "settings/generaldata/other preferences/renewable-generation-modelling" -_Data: te.TypeAlias = t.Union[str, int, float, bool, JSON, None] +_Data: te.TypeAlias = str | int | float | bool | JSON | None def _iter_dict(data: _Data, root_key: str = "") -> t.Generator[t.Tuple[str, t.Any], None, None]: diff --git a/antarest/study/storage/variantstudy/model/command/update_scenario_builder.py b/antarest/study/storage/variantstudy/model/command/update_scenario_builder.py index 6ae373e238..3180e78fce 100644 --- a/antarest/study/storage/variantstudy/model/command/update_scenario_builder.py +++ b/antarest/study/storage/variantstudy/model/command/update_scenario_builder.py @@ -53,7 +53,7 @@ class UpdateScenarioBuilder(ICommand): # Command parameters # ================== - data: t.Union[t.Dict[str, t.Any], t.Mapping[str, t.Any], t.MutableMapping[str, t.Any]] + data: t.Dict[str, t.Any] | t.Mapping[str, t.Any] | t.MutableMapping[str, t.Any] @override def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: diff --git a/antarest/study/storage/variantstudy/model/model.py b/antarest/study/storage/variantstudy/model/model.py index cb1866037f..a9c24f4ff9 100644 --- a/antarest/study/storage/variantstudy/model/model.py +++ b/antarest/study/storage/variantstudy/model/model.py @@ -42,7 +42,7 @@ class NewDetailsDTO(te.TypedDict): msg: str -DetailsDTO = t.Union[LegacyDetailsDTO, NewDetailsDTO] +DetailsDTO = LegacyDetailsDTO | NewDetailsDTO class GenerationResultInfoDTO(AntaresBaseModel): @@ -71,7 +71,7 @@ class CommandDTOAPI(AntaresBaseModel): id: t.Optional[str] = None action: str - args: t.Union[t.MutableSequence[JSON], JSON] + args: t.MutableSequence[JSON] | JSON version: int = 1 user_name: t.Optional[str] = None updated_at: t.Optional[datetime.datetime] = None @@ 
-93,7 +93,7 @@ class CommandDTO(AntaresBaseModel): id: t.Optional[str] = None action: str - args: t.Union[t.MutableSequence[JSON], JSON] + args: t.MutableSequence[JSON] | JSON version: int = 1 study_version: StudyVersionStr user_id: t.Optional[int] = None diff --git a/antarest/study/storage/variantstudy/snapshot_generator.py b/antarest/study/storage/variantstudy/snapshot_generator.py index 58d4d62b7b..901db0f54c 100644 --- a/antarest/study/storage/variantstudy/snapshot_generator.py +++ b/antarest/study/storage/variantstudy/snapshot_generator.py @@ -151,7 +151,7 @@ def _retrieve_descendants(self, variant_study_id: str) -> t.Tuple[RawStudy, t.Se root_study = self.repository.one(descendant_ids[0]) return root_study, descendants - def _export_ref_study(self, snapshot_dir: Path, ref_study: t.Union[RawStudy, VariantStudy]) -> None: + def _export_ref_study(self, snapshot_dir: Path, ref_study: RawStudy | VariantStudy) -> None: if isinstance(ref_study, VariantStudy): snapshot_dir.parent.mkdir(parents=True, exist_ok=True) export_study_flat( @@ -224,13 +224,13 @@ class RefStudySearchResult(t.NamedTuple): Result of the search for the reference study. """ - ref_study: t.Union[RawStudy, VariantStudy] + ref_study: RawStudy | VariantStudy cmd_blocks: t.Sequence[CommandBlock] force_regenerate: bool = False def search_ref_study( - root_study: t.Union[RawStudy, VariantStudy], + root_study: RawStudy | VariantStudy, descendants: t.Sequence[VariantStudy], *, from_scratch: bool = False, @@ -251,7 +251,7 @@ def search_ref_study( return RefStudySearchResult(ref_study=root_study, cmd_blocks=[], force_regenerate=True) # The reference study is the root study or a variant study with a valid snapshot - ref_study: t.Union[RawStudy, VariantStudy] + ref_study: RawStudy | VariantStudy # The commands to apply on the reference study to generate the current variant cmd_blocks: t.List[CommandBlock] diff --git a/antarest/study/storage/variantstudy/variant_study_service.py b/antarest/study/storage/variantstudy/variant_study_service.py index ba865f5d12..a8b1db25fc 100644 --- a/antarest/study/storage/variantstudy/variant_study_service.py +++ b/antarest/study/storage/variantstudy/variant_study_service.py @@ -1206,8 +1206,8 @@ def _clear_all_snapshots(self) -> None: ) ) for variant in variant_list: - if variant.updated_at and variant.updated_at < datetime.now(timezone.utc) - self._retention_time: - if variant.last_access and variant.last_access < datetime.now(timezone.utc) - self._retention_time: + if variant.updated_at and variant.updated_at < datetime.utcnow() - self._retention_time: + if variant.last_access and variant.last_access < datetime.utcnow() - self._retention_time: self._variant_study_service.clear_snapshot(variant) def run_task(self, notifier: ITaskNotifier) -> TaskResult: diff --git a/antarest/study/web/study_data_blueprint.py b/antarest/study/web/study_data_blueprint.py index 95659ecd7f..23a7ded9ab 100644 --- a/antarest/study/web/study_data_blueprint.py +++ b/antarest/study/web/study_data_blueprint.py @@ -91,7 +91,7 @@ class BCKeyValueType(te.TypedDict): """Deprecated type for binding constraint key-value pair (used for update)""" key: str - value: t.Union[str, int, float, bool] + value: str | int | float | bool class ClusterType(enum.StrEnum): @@ -127,14 +127,14 @@ def create_study_data_routes(study_service: StudyService, config: Config) -> API "/studies/{uuid}/areas", tags=[APITag.study_data], summary="Get all areas basic info", - response_model=t.Union[t.List[AreaInfoDTO], t.Dict[str, t.Any]], + 
response_model=t.List[AreaInfoDTO] | t.Dict[str, t.Any], ) def get_areas( uuid: str, type: AreaType = Query(None), ui: bool = False, current_user: JWTUser = Depends(auth.get_current_user), - ) -> t.Union[t.List[AreaInfoDTO], t.Dict[str, t.Any]]: + ) -> t.List[AreaInfoDTO] | t.Dict[str, t.Any]: logger.info( f"Fetching area list (type={type}) for study {uuid}", extra={"user": current_user.id}, @@ -246,7 +246,7 @@ def update_area_ui( def update_area_info( uuid: str, area_id: str, - area_patch_dto: t.Union[PatchArea, t.Dict[str, PatchCluster]], + area_patch_dto: PatchArea | t.Dict[str, PatchCluster], current_user: JWTUser = Depends(auth.get_current_user), ) -> t.Any: logger.info( @@ -2599,7 +2599,7 @@ def duplicate_cluster( source_cluster_id: str, new_cluster_name: str = Query(..., alias="newName", title="New Cluster Name"), current_user: JWTUser = Depends(auth.get_current_user), - ) -> t.Union[STStorageOutput, ThermalClusterOutput, RenewableClusterOutput]: + ) -> STStorageOutput | ThermalClusterOutput | RenewableClusterOutput: logger.info( f"Duplicates {cluster_type.value} {source_cluster_id} of {area_id} for study {uuid}", extra={"user": current_user.id}, @@ -2607,7 +2607,7 @@ def duplicate_cluster( params = RequestParameters(user=current_user) study = study_service.check_study_access(uuid, StudyPermissionType.WRITE, params) - manager: t.Union[STStorageManager, RenewableManager, ThermalManager] + manager: STStorageManager | RenewableManager | ThermalManager if cluster_type == ClusterType.ST_STORAGES: manager = STStorageManager(study_service.storage_service) elif cluster_type == ClusterType.RENEWABLES: diff --git a/antarest/study/web/xpansion_studies_blueprint.py b/antarest/study/web/xpansion_studies_blueprint.py index 0871efd0f0..dcdc88dbf9 100644 --- a/antarest/study/web/xpansion_studies_blueprint.py +++ b/antarest/study/web/xpansion_studies_blueprint.py @@ -271,9 +271,7 @@ def get_resource_content( StudyPermissionType.READ, RequestParameters(user=current_user), ) - output: t.Union[JSON, bytes, str] = study_service.xpansion_manager.get_resource_content( - study, resource_type, filename - ) + output: JSON | bytes | str = study_service.xpansion_manager.get_resource_content(study, resource_type, filename) if isinstance(output, bytes): try: diff --git a/tests/integration/study_data_blueprint/test_link.py b/tests/integration/study_data_blueprint/test_link.py index 7ba267c7cd..9585ad674d 100644 --- a/tests/integration/study_data_blueprint/test_link.py +++ b/tests/integration/study_data_blueprint/test_link.py @@ -9,6 +9,8 @@ # SPDX-License-Identifier: MPL-2.0 # # This file is part of the Antares project. +import re + import pytest from starlette.testclient import TestClient @@ -284,11 +286,14 @@ def test_link_820(self, client: TestClient, user_access_token: str, study_type: ) assert res.status_code == 422, res.json() - expected = { - "description": "Invalid value(s) in filters: centurial. 
Allowed values are: hourly, daily, weekly, monthly, annual.", - "exception": "LinkValidationError", - } - assert expected == res.json() + + res_json = res.json() + assert res_json["exception"] == "LinkValidationError" + match = re.search(r"Allowed values are: (.*)\.", res_json["description"]) + assert match, f"Unexpected error message format: {res_json['description']}" + res_values = sorted(match.group(1).split(", ")) + expected_values = sorted(["daily", "hourly", "monthly", "weekly", "annual"]) + assert res_values == expected_values, f"Returned values: {res_values}, expected: {expected_values}" # Test create link with empty filters
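
Note on the conversion (illustrative, not part of the patch): every hunk above is a mechanical rewrite of `typing.Union` / nested `Optional` spellings into the PEP 604 `X | Y` form. The sketch below shows the runtime equivalence this relies on. It assumes Python 3.10+ and pydantic v2, which the touched modules already use; the `Example` model and the asserted values are hypothetical stand-ins, not code from the repository.

    import typing as t

    from pydantic import BaseModel, TypeAdapter

    # The two spellings compare equal and are interchangeable at runtime.
    assert (int | str) == t.Union[int, str]

    # `X | Y` also works inside typing generics, as in the converted
    # annotations (e.g. MatrixContent.columns or the task_args parameters).
    Columns = t.List[int | str]
    TaskArgs = t.Dict[str, int | float | bool | str]

    # t.cast accepts the new form as well (cf. convert_value in ini_reader.py).
    value = t.cast(str | int | float | bool, 42)

    # pydantic v2 resolves `|` unions the same way as t.Union, so fields such
    # as `xpansion: XpansionParametersDTO | bool | None = None` keep their
    # validation behaviour. `Example` is a hypothetical stand-in model.
    class Example(BaseModel):
        xpansion: dict | bool | None = None

    assert Example(xpansion=True).xpansion is True
    assert Example().xpansion is None

    # Unions built with `|` also work with isinstance() and TypeAdapter.
    assert isinstance("abc", int | str)
    assert TypeAdapter(int | None).validate_python(None) is None

Because the two forms are equivalent at runtime and under pydantic validation, the conversion should not change behaviour; it only drops the `typing.Union` spelling from these annotations.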