diff --git a/antarest/core/filesystem_blueprint.py b/antarest/core/filesystem_blueprint.py index d625d19c07..804746993d 100644 --- a/antarest/core/filesystem_blueprint.py +++ b/antarest/core/filesystem_blueprint.py @@ -23,10 +23,11 @@ import typing_extensions as te from fastapi import APIRouter, Depends, HTTPException -from pydantic import BaseModel, Field +from pydantic import Field from starlette.responses import PlainTextResponse, StreamingResponse from antarest.core.config import Config +from antarest.core.serialization import AntaresBaseModel from antarest.core.utils.web import APITag from antarest.login.auth import Auth @@ -35,7 +36,7 @@ class FilesystemDTO( - BaseModel, + AntaresBaseModel, extra="forbid", json_schema_extra={ "example": { @@ -61,7 +62,7 @@ class FilesystemDTO( class MountPointDTO( - BaseModel, + AntaresBaseModel, extra="forbid", json_schema_extra={ "example": { @@ -109,7 +110,7 @@ async def from_path(cls, name: str, path: Path) -> "MountPointDTO": class FileInfoDTO( - BaseModel, + AntaresBaseModel, extra="forbid", json_schema_extra={ "example": { diff --git a/antarest/core/interfaces/eventbus.py b/antarest/core/interfaces/eventbus.py index 30771baff4..c590233f43 100644 --- a/antarest/core/interfaces/eventbus.py +++ b/antarest/core/interfaces/eventbus.py @@ -46,7 +46,7 @@ class EventType(StrEnum): WORKER_TASK_STARTED = "WORKER_TASK_STARTED" WORKER_TASK_ENDED = "WORKER_TASK_ENDED" LAUNCH_PROGRESS = "LAUNCH_PROGRESS" - TS_GENERATION_PROGRESS = "TS_GENERATION_PROGRESS" + TASK_PROGRESS = "TASK_PROGRESS" class EventChannelDirectory: @@ -137,6 +137,9 @@ def start(self, threaded: bool = True) -> None: class DummyEventBusService(IEventBus): + def __init__(self) -> None: + self.events: List[Event] = [] + def queue(self, event: Event, queue: str) -> None: # Noop pass @@ -150,7 +153,7 @@ def remove_queue_consumer(self, listener_id: str) -> None: def push(self, event: Event) -> None: # Noop - pass + self.events.append(event) def add_listener( self, diff --git a/antarest/core/tasks/service.py b/antarest/core/tasks/service.py index 2b8bfb725d..8a0ec83105 100644 --- a/antarest/core/tasks/service.py +++ b/antarest/core/tasks/service.py @@ -42,13 +42,23 @@ logger = logging.getLogger(__name__) -TaskUpdateNotifier = t.Callable[[str], None] -Task = t.Callable[[TaskUpdateNotifier], TaskResult] - DEFAULT_AWAIT_MAX_TIMEOUT = 172800 # 48 hours """Default timeout for `await_task` in seconds.""" +class ITaskNotifier(ABC): + @abstractmethod + def notify_message(self, message: str) -> None: + raise NotImplementedError() + + @abstractmethod + def notify_progress(self, progress: int) -> None: + raise NotImplementedError() + + +Task = t.Callable[[ITaskNotifier], TaskResult] + + class ITaskService(ABC): @abstractmethod def add_worker_task( @@ -94,11 +104,17 @@ def await_task(self, task_id: str, timeout_sec: int = DEFAULT_AWAIT_MAX_TIMEOUT) # noinspection PyUnusedLocal -def noop_notifier(message: str) -> None: - """This function is used in tasks when no notification is required.""" +class NoopNotifier(ITaskNotifier): + """This class is used in tasks when no notification is required.""" + + def notify_message(self, message: str) -> None: + return + def notify_progress(self, progress: int) -> None: + return -class TaskJobLogRecorder: + +class TaskLogAndProgressRecorder(ITaskNotifier): """ Callback used to register log messages in the TaskJob table. @@ -107,15 +123,32 @@ class TaskJobLogRecorder: session: The database session created in the same thread as the task thread. """ - def __init__(self, task_id: str, session: Session): + def __init__(self, task_id: str, session: Session, event_bus: IEventBus) -> None: self.session = session self.task_id = task_id + self.event_bus = event_bus - def __call__(self, message: str) -> None: + def notify_message(self, message: str) -> None: task = self.session.query(TaskJob).get(self.task_id) if task: task.logs.append(TaskJobLog(message=message, task_id=self.task_id)) - db.session.commit() + self.session.commit() + + def notify_progress(self, progress: int) -> None: + self.session.query(TaskJob).filter(TaskJob.id == self.task_id).update({TaskJob.progress: progress}) + self.session.commit() + + self.event_bus.push( + Event( + type=EventType.TASK_PROGRESS, + payload={ + "task_id": self.task_id, + "progress": progress, + }, + permissions=PermissionInfo(public_mode=PublicMode.READ), + channel=EventChannelDirectory.TASK + self.task_id, + ) + ) class TaskJobService(ITaskService): @@ -138,7 +171,7 @@ def _create_worker_task( task_id: str, task_type: str, task_args: t.Dict[str, t.Union[int, float, bool, str]], - ) -> t.Callable[[TaskUpdateNotifier], TaskResult]: + ) -> Task: task_result_wrapper: t.List[TaskResult] = [] def _create_awaiter( @@ -152,7 +185,7 @@ async def _await_task_end(event: Event) -> None: return _await_task_end # noinspection PyUnusedLocal - def _send_worker_task(logger_: TaskUpdateNotifier) -> TaskResult: + def _send_worker_task(logger_: ITaskNotifier) -> TaskResult: listener_id = self.event_bus.add_listener( _create_awaiter(task_result_wrapper), [EventType.WORKER_TASK_ENDED], @@ -380,7 +413,7 @@ def _run_task( try: with db(): # We must use the DB session attached to the current thread - result = callback(TaskJobLogRecorder(task_id, session=db.session)) + result = callback(TaskLogAndProgressRecorder(task_id, db.session, self.event_bus)) status = TaskStatus.COMPLETED if result.success else TaskStatus.FAILED logger.info(f"Task {task_id} ended with status {status}") diff --git a/antarest/launcher/service.py b/antarest/launcher/service.py index e5f78ede3f..7032e388c0 100644 --- a/antarest/launcher/service.py +++ b/antarest/launcher/service.py @@ -33,7 +33,7 @@ from antarest.core.model import PermissionInfo, PublicMode, StudyPermissionType from antarest.core.requests import RequestParameters, UserHasNotPermissionError from antarest.core.tasks.model import TaskResult, TaskType -from antarest.core.tasks.service import ITaskService, TaskUpdateNotifier +from antarest.core.tasks.service import ITaskNotifier, ITaskService from antarest.core.utils.archives import ArchiveFormat, archive_dir, is_zip, read_in_zip from antarest.core.utils.fastapi_sqlalchemy import db from antarest.core.utils.utils import StopWatch, concat_files, concat_files_to_str @@ -598,7 +598,7 @@ def _download_fallback_output(self, job_id: str, params: RequestParameters) -> F export_path = Path(export_file_download.path) export_id = export_file_download.id - def export_task(_: TaskUpdateNotifier) -> TaskResult: + def export_task(_: ITaskNotifier) -> TaskResult: try: # archive_dir(output_path, export_path, archive_format=ArchiveFormat.ZIP) diff --git a/antarest/matrixstore/service.py b/antarest/matrixstore/service.py index 1d7bc27955..c2a8ca5c5d 100644 --- a/antarest/matrixstore/service.py +++ b/antarest/matrixstore/service.py @@ -32,7 +32,7 @@ from antarest.core.requests import RequestParameters, UserHasNotPermissionError from antarest.core.serialization import from_json from antarest.core.tasks.model import TaskResult, TaskType -from antarest.core.tasks.service import ITaskService, TaskUpdateNotifier +from antarest.core.tasks.service import ITaskNotifier, ITaskService from antarest.core.utils.archives import ArchiveFormat, archive_dir from antarest.core.utils.fastapi_sqlalchemy import db from antarest.core.utils.utils import StopWatch @@ -510,7 +510,7 @@ def download_matrix_list( export_path = Path(export_file_download.path) export_id = export_file_download.id - def export_task(notifier: TaskUpdateNotifier) -> TaskResult: + def export_task(notifier: ITaskNotifier) -> TaskResult: try: self.create_matrix_files(matrix_ids=matrix_list, export_path=export_path) self.file_transfer_manager.set_ready(export_id) diff --git a/antarest/study/business/utils.py b/antarest/study/business/utils.py index 58682fd596..76671f2eec 100644 --- a/antarest/study/business/utils.py +++ b/antarest/study/business/utils.py @@ -13,11 +13,11 @@ import typing as t from antares.study.version import StudyVersion -from pydantic import BaseModel from antarest.core.exceptions import CommandApplicationError from antarest.core.jwt import DEFAULT_ADMIN_USER from antarest.core.requests import RequestParameters +from antarest.core.serialization import AntaresBaseModel from antarest.study.business.all_optional_meta import camel_case_model from antarest.study.model import RawStudy, Study from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy @@ -25,6 +25,7 @@ from antarest.study.storage.utils import is_managed from antarest.study.storage.variantstudy.business.utils import transform_command_to_dto from antarest.study.storage.variantstudy.model.command.icommand import ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener # noinspection SpellCheckingInspection GENERAL_DATA_PATH = "settings/generaldata" @@ -35,11 +36,12 @@ def execute_or_add_commands( file_study: FileStudy, commands: t.Sequence[ICommand], storage_service: StudyStorageService, + listener: t.Optional[ICommandListener] = None, ) -> None: if isinstance(study, RawStudy): executed_commands: t.MutableSequence[ICommand] = [] for command in commands: - result = command.apply(file_study) + result = command.apply(file_study, listener) if not result.status: raise CommandApplicationError(result.message) executed_commands.append(command) @@ -72,7 +74,7 @@ def execute_or_add_commands( @camel_case_model class FormFieldsBaseModel( - BaseModel, + AntaresBaseModel, extra="forbid", validate_assignment=True, populate_by_name=True, diff --git a/antarest/study/service.py b/antarest/study/service.py index 166c92f17b..5bd0cd4d88 100644 --- a/antarest/study/service.py +++ b/antarest/study/service.py @@ -57,7 +57,7 @@ from antarest.core.requests import RequestParameters, UserHasNotPermissionError from antarest.core.serialization import to_json from antarest.core.tasks.model import TaskListFilter, TaskResult, TaskStatus, TaskType -from antarest.core.tasks.service import ITaskService, TaskUpdateNotifier, noop_notifier +from antarest.core.tasks.service import ITaskNotifier, ITaskService, NoopNotifier from antarest.core.utils.archives import ArchiveFormat, is_archive_format from antarest.core.utils.fastapi_sqlalchemy import db from antarest.core.utils.utils import StopWatch @@ -156,6 +156,7 @@ from antarest.study.storage.variantstudy.model.command.update_comments import UpdateComments from antarest.study.storage.variantstudy.model.command.update_config import UpdateConfig from antarest.study.storage.variantstudy.model.command.update_raw_file import UpdateRawFile +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.dbmodel import VariantStudy from antarest.study.storage.variantstudy.model.model import CommandDTO from antarest.study.storage.variantstudy.variant_study_service import VariantStudyService @@ -181,6 +182,14 @@ def get_disk_usage(path: t.Union[str, Path]) -> int: return total_size +class TaskProgressRecorder(ICommandListener): + def __init__(self, notifier: ITaskNotifier) -> None: + self.notifier = notifier + + def notify_progress(self, progress: int) -> None: + return self.notifier.notify_progress(progress) + + class ThermalClusterTimeSeriesGeneratorTask: """ Task to generate thermal clusters time series @@ -198,14 +207,29 @@ def __init__( self.storage_service = storage_service self.event_bus = event_bus - def _generate_timeseries(self) -> None: + def _generate_timeseries(self, notifier: ITaskNotifier) -> None: """Run the task (lock the database).""" command_context = self.storage_service.variant_study_service.command_factory.command_context - command = GenerateThermalClusterTimeSeries(command_context=command_context) + command = GenerateThermalClusterTimeSeries.model_construct(command_context=command_context) + listener = TaskProgressRecorder(notifier=notifier) with db(): study = self.repository.one(self._study_id) file_study = self.storage_service.get_storage(study).get_raw(study) - execute_or_add_commands(study, file_study, [command], self.storage_service) + execute_or_add_commands(study, file_study, [command], self.storage_service, listener) + + if isinstance(file_study, VariantStudy): + # In this case we only added the command to the list. + # It means the generation will really be executed in the next snapshot generation. + # We don't want this, we want this task to generate the matrices no matter the study. + # Therefore, we have to launch a variant generation task inside the timeseries generation one. + variant_service = self.storage_service.variant_study_service + task_service = variant_service.task_service + generation_task_id = variant_service.generate_task(study, True, False, listener) + task_service.await_task(generation_task_id) + result = task_service.status_task(generation_task_id, RequestParameters(DEFAULT_ADMIN_USER)) + if not result.result or not result.result.success: + raise ValueError(f"Failed to generate variant study {self._study_id}") + self.event_bus.push( Event( type=EventType.STUDY_EDITED, @@ -214,12 +238,12 @@ def _generate_timeseries(self) -> None: ) ) - def run_task(self, notifier: TaskUpdateNotifier) -> TaskResult: + def run_task(self, notifier: ITaskNotifier) -> TaskResult: msg = f"Generating thermal timeseries for study '{self._study_id}'" - notifier(msg) - self._generate_timeseries() + notifier.notify_message(msg) + self._generate_timeseries(notifier) msg = f"Successfully generated thermal timeseries for study '{self._study_id}'" - notifier(msg) + notifier.notify_message(msg) return TaskResult(success=True, message=msg) # Make `ThermalClusterTimeSeriesGeneratorTask` object callable @@ -285,7 +309,7 @@ def _upgrade_study(self) -> None: file_study = self.storage_service.get_storage(study_to_upgrade).get_raw(study_to_upgrade) file_study.tree.normalize() - def run_task(self, notifier: TaskUpdateNotifier) -> TaskResult: + def run_task(self, notifier: ITaskNotifier) -> TaskResult: """ Run the study upgrade task. @@ -298,10 +322,10 @@ def run_task(self, notifier: TaskUpdateNotifier) -> TaskResult: # The call to `_upgrade_study` may raise an exception, which will be # handled in the task service (see: `TaskJobService._run_task`) msg = f"Upgrade study '{self._study_id}' to version {self._target_version}" - notifier(msg) + notifier.notify_message(msg) self._upgrade_study() msg = f"Successfully upgraded study '{self._study_id}' to version {self._target_version}" - notifier(msg) + notifier.notify_message(msg) return TaskResult(success=True, message=msg) # Make `StudyUpgraderTask` object is callable @@ -966,7 +990,7 @@ def copy_study( assert_permission(params.user, src_study, StudyPermissionType.READ) self._assert_study_unarchived(src_study) - def copy_task(notifier: TaskUpdateNotifier) -> TaskResult: + def copy_task(notifier: ITaskNotifier) -> TaskResult: origin_study = self.get_study(src_uuid) study = self.storage_service.get_storage(origin_study).copy( origin_study, @@ -1006,7 +1030,7 @@ def copy_task(notifier: TaskUpdateNotifier) -> TaskResult: request_params=params, ) else: - res = copy_task(noop_notifier) + res = copy_task(NoopNotifier()) task_or_study_id = res.return_value or "" return task_or_study_id @@ -1052,7 +1076,7 @@ def export_study( export_path = Path(export_file_download.path) export_id = export_file_download.id - def export_task(notifier: TaskUpdateNotifier) -> TaskResult: + def export_task(notifier: ITaskNotifier) -> TaskResult: try: target_study = self.get_study(uuid) self.storage_service.get_storage(target_study).export_study(target_study, export_path, outputs) @@ -1119,7 +1143,7 @@ def export_output( export_path = Path(export_file_download.path) export_id = export_file_download.id - def export_task(notifier: TaskUpdateNotifier) -> TaskResult: + def export_task(notifier: ITaskNotifier) -> TaskResult: try: target_study = self.get_study(study_uuid) self.storage_service.get_storage(target_study).export_output( @@ -1283,7 +1307,7 @@ def download_outputs( export_path = Path(export_file_download.path) export_id = export_file_download.id - def export_task(_notifier: TaskUpdateNotifier) -> TaskResult: + def export_task(_notifier: ITaskNotifier) -> TaskResult: try: _study = self.get_study(study_id) _stopwatch = StopWatch() @@ -2008,7 +2032,7 @@ def archive(self, uuid: str, params: RequestParameters) -> str: ): raise TaskAlreadyRunning() - def archive_task(notifier: TaskUpdateNotifier) -> TaskResult: + def archive_task(notifier: ITaskNotifier) -> TaskResult: study_to_archive = self.get_study(uuid) self.storage_service.raw_study_service.archive(study_to_archive) study_to_archive.archived = True @@ -2052,7 +2076,7 @@ def unarchive(self, uuid: str, params: RequestParameters) -> str: if not isinstance(study, RawStudy): raise StudyTypeUnsupported(study.id, study.type) - def unarchive_task(notifier: TaskUpdateNotifier) -> TaskResult: + def unarchive_task(notifier: ITaskNotifier) -> TaskResult: study_to_archive = self.get_study(uuid) self.storage_service.raw_study_service.unarchive(study_to_archive) study_to_archive.archived = False @@ -2360,9 +2384,7 @@ def archive_output( if len(list(filter(lambda t: t.name in archive_task_names, study_tasks))): raise TaskAlreadyRunning() - def archive_output_task( - notifier: TaskUpdateNotifier, - ) -> TaskResult: + def archive_output_task(notifier: ITaskNotifier) -> TaskResult: try: study = self.get_study(study_id) stopwatch = StopWatch() @@ -2422,9 +2444,7 @@ def unarchive_output( if len(list(filter(lambda t: t.name in archive_task_names, study_tasks))): raise TaskAlreadyRunning() - def unarchive_output_task( - notifier: TaskUpdateNotifier, - ) -> TaskResult: + def unarchive_output_task(notifier: ITaskNotifier) -> TaskResult: try: study = self.get_study(study_id) stopwatch = StopWatch() diff --git a/antarest/study/storage/rawstudy/model/filesystem/config/ini_properties.py b/antarest/study/storage/rawstudy/model/filesystem/config/ini_properties.py index e731ea203b..c5c3c83950 100644 --- a/antarest/study/storage/rawstudy/model/filesystem/config/ini_properties.py +++ b/antarest/study/storage/rawstudy/model/filesystem/config/ini_properties.py @@ -12,13 +12,11 @@ import typing as t -from pydantic import BaseModel - -from antarest.core.serialization import from_json, to_json +from antarest.core.serialization import AntaresBaseModel, from_json, to_json class IniProperties( - BaseModel, + AntaresBaseModel, # On reading, if the configuration contains an extra field, it is better # to forbid it, because it allows errors to be detected early. # Ignoring extra attributes can hide errors. diff --git a/antarest/study/storage/rawstudy/watcher.py b/antarest/study/storage/rawstudy/watcher.py index 5648629090..d00a7f7204 100644 --- a/antarest/study/storage/rawstudy/watcher.py +++ b/antarest/study/storage/rawstudy/watcher.py @@ -28,7 +28,7 @@ from antarest.core.interfaces.service import IService from antarest.core.requests import RequestParameters from antarest.core.tasks.model import TaskResult, TaskType -from antarest.core.tasks.service import ITaskService, TaskUpdateNotifier +from antarest.core.tasks.service import ITaskNotifier, ITaskService from antarest.core.utils.fastapi_sqlalchemy import db from antarest.core.utils.utils import StopWatch from antarest.login.model import Group @@ -186,7 +186,7 @@ def oneshot_scan( """ # noinspection PyUnusedLocal - def scan_task(notifier: TaskUpdateNotifier) -> TaskResult: + def scan_task(notifier: ITaskNotifier) -> TaskResult: self.scan(workspace, path) return TaskResult(success=True, message="Scan completed") diff --git a/antarest/study/storage/variantstudy/model/command/create_area.py b/antarest/study/storage/variantstudy/model/command/create_area.py index b3f1a30e7f..5c7f76a8d1 100644 --- a/antarest/study/storage/variantstudy/model/command/create_area.py +++ b/antarest/study/storage/variantstudy/model/command/create_area.py @@ -25,6 +25,7 @@ from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput, FilteringOptions from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -105,7 +106,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> t.Tuple[CommandOutpu {"area_id": area_id}, ) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: config = study_data.config output, data = self._apply_config(config) diff --git a/antarest/study/storage/variantstudy/model/command/create_binding_constraint.py b/antarest/study/storage/variantstudy/model/command/create_binding_constraint.py index e84564de57..84a7c4ecee 100644 --- a/antarest/study/storage/variantstudy/model/command/create_binding_constraint.py +++ b/antarest/study/storage/variantstudy/model/command/create_binding_constraint.py @@ -39,6 +39,7 @@ ) from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO MatrixType = t.List[t.List[MatrixData]] @@ -424,7 +425,7 @@ def _apply_config(self, study_data_config: FileStudyTreeConfig) -> t.Tuple[Comma ) return CommandOutput(status=True), {} - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: binding_constraints = study_data.tree.get(["input", "bindingconstraints", "bindingconstraints"]) new_key = str(len(binding_constraints)) bd_id = transform_name_to_id(self.name) diff --git a/antarest/study/storage/variantstudy/model/command/create_cluster.py b/antarest/study/storage/variantstudy/model/command/create_cluster.py index ace36bce74..d45a223b39 100644 --- a/antarest/study/storage/variantstudy/model/command/create_cluster.py +++ b/antarest/study/storage/variantstudy/model/command/create_cluster.py @@ -28,6 +28,7 @@ from antarest.study.storage.variantstudy.business.utils import strip_matrix_protocol, validate_matrix from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -119,7 +120,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> t.Tuple[CommandOutpu {"cluster_id": cluster.id}, ) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: output, data = self._apply_config(study_data.config) if not output.status: return output diff --git a/antarest/study/storage/variantstudy/model/command/create_district.py b/antarest/study/storage/variantstudy/model/command/create_district.py index afb9736806..9c5aff55f6 100644 --- a/antarest/study/storage/variantstudy/model/command/create_district.py +++ b/antarest/study/storage/variantstudy/model/command/create_district.py @@ -23,6 +23,7 @@ from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -83,7 +84,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> Tuple[CommandOutput, "item_key": item_key, } - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: Optional[ICommandListener] = None) -> CommandOutput: output, data = self._apply_config(study_data.config) if not output.status: return output diff --git a/antarest/study/storage/variantstudy/model/command/create_link.py b/antarest/study/storage/variantstudy/model/command/create_link.py index eec1c4f7ea..7954981d99 100644 --- a/antarest/study/storage/variantstudy/model/command/create_link.py +++ b/antarest/study/storage/variantstudy/model/command/create_link.py @@ -23,6 +23,7 @@ from antarest.study.storage.variantstudy.business.utils import strip_matrix_protocol, validate_matrix from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput, FilteringOptions from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -205,7 +206,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> Tuple[CommandOutput, {"area_from": area_from, "area_to": area_to}, ) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: Optional[ICommandListener] = None) -> CommandOutput: version = study_data.config.version output, data = self._apply_config(study_data.config) if not output.status: diff --git a/antarest/study/storage/variantstudy/model/command/create_renewables_cluster.py b/antarest/study/storage/variantstudy/model/command/create_renewables_cluster.py index 1a932dd30d..5c0120f7a9 100644 --- a/antarest/study/storage/variantstudy/model/command/create_renewables_cluster.py +++ b/antarest/study/storage/variantstudy/model/command/create_renewables_cluster.py @@ -25,6 +25,7 @@ from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -100,7 +101,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> t.Tuple[CommandOutpu {"cluster_id": cluster.id}, ) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: output, data = self._apply_config(study_data.config) if not output.status: return output diff --git a/antarest/study/storage/variantstudy/model/command/create_st_storage.py b/antarest/study/storage/variantstudy/model/command/create_st_storage.py index 4bebf009c3..ed68124bbb 100644 --- a/antarest/study/storage/variantstudy/model/command/create_st_storage.py +++ b/antarest/study/storage/variantstudy/model/command/create_st_storage.py @@ -25,6 +25,7 @@ from antarest.study.storage.variantstudy.business.utils import strip_matrix_protocol, validate_matrix from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO # noinspection SpellCheckingInspection @@ -214,7 +215,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> t.Tuple[CommandOutpu {"storage_id": self.storage_id}, ) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: """ Applies the study data to update storage configurations and saves the changes. diff --git a/antarest/study/storage/variantstudy/model/command/generate_thermal_cluster_timeseries.py b/antarest/study/storage/variantstudy/model/command/generate_thermal_cluster_timeseries.py index c2709dfc86..1ff3dffc05 100644 --- a/antarest/study/storage/variantstudy/model/command/generate_thermal_cluster_timeseries.py +++ b/antarest/study/storage/variantstudy/model/command/generate_thermal_cluster_timeseries.py @@ -28,6 +28,7 @@ from antarest.study.storage.rawstudy.model.filesystem.matrix.matrix import dump_dataframe from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import ICommand, OutputTuple +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO logger = logging.getLogger(__name__) @@ -63,13 +64,13 @@ class GenerateThermalClusterTimeSeries(ICommand): def _apply_config(self, study_data: FileStudyTreeConfig) -> OutputTuple: return CommandOutput(status=True, message="Nothing to do"), {} - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: study_path = study_data.config.study_path with tempfile.TemporaryDirectory(suffix=TS_GEN_SUFFIX, prefix=TS_GEN_PREFIX, dir=study_path.parent) as path: tmp_dir = Path(path) try: shutil.copytree(study_path / "input" / "thermal" / "series", tmp_dir, dirs_exist_ok=True) - self._build_timeseries(study_data, tmp_dir) + self._build_timeseries(study_data, tmp_dir, listener) except Exception as e: logger.error(f"Unhandled exception when trying to generate thermal timeseries: {e}", exc_info=True) raise @@ -77,7 +78,9 @@ def _apply(self, study_data: FileStudy) -> CommandOutput: self._replace_safely_original_files(study_path, tmp_dir) return CommandOutput(status=True, message="All time series were generated successfully") - def _build_timeseries(self, study_data: FileStudy, tmp_path: Path) -> None: + def _build_timeseries( + self, study_data: FileStudy, tmp_path: Path, listener: t.Optional[ICommandListener] = None + ) -> None: # 1- Get the seed and nb_years to generate # NB: Default seed in IHM Legacy: 5489, default seed in web: 3005489. general_data = study_data.tree.get(["settings", "generaldata"], depth=3) @@ -86,17 +89,21 @@ def _build_timeseries(self, study_data: FileStudy, tmp_path: Path) -> None: # 2 - Build the generator rng = MersenneTwisterRNG(seed=thermal_seed) generator = ThermalDataGenerator(rng=rng, days=365) - # 3- Loop through areas in alphabetical order + # 3- Do a first loop to know how many operations will be performed + total_generations = sum(len(area.thermals) for area in study_data.config.areas.values()) + # 4- Loop through areas in alphabetical order areas: t.Dict[str, Area] = study_data.config.areas sorted_areas = {k: areas[k] for k in sorted(areas)} + generation_performed = 0 for area_id, area in sorted_areas.items(): - # 4- Loop through thermal clusters in alphabetical order + # 5- Loop through thermal clusters in alphabetical order sorted_thermals = sorted(area.thermals, key=lambda x: x.id) for thermal in sorted_thermals: - # 5 - Filters out clusters with no generation + # 6 - Filters out clusters with no generation if thermal.gen_ts == LocalTSGenerationBehavior.FORCE_NO_GENERATION: + generation_performed += 1 continue - # 6- Build the cluster + # 7- Build the cluster url = ["input", "thermal", "prepro", area_id, thermal.id.lower(), "modulation"] matrix = study_data.tree.get_node(url) matrix_df = matrix.parse(return_dataframe=True) # type: ignore @@ -123,13 +130,18 @@ def _build_timeseries(self, study_data: FileStudy, tmp_path: Path) -> None: npo_min=npo_min, npo_max=npo_max, ) - # 7- Generate the time-series + # 8- Generate the time-series results = generator.generate_time_series(cluster, nb_years) generated_matrix = results.available_power - # 8- Write the matrix inside the input folder. + # 9- Write the matrix inside the input folder. df = pd.DataFrame(data=generated_matrix, dtype=int) target_path = self._build_matrix_path(tmp_path / area_id / thermal.id.lower()) dump_dataframe(df, target_path, None) + # 10- Notify the progress to the notifier + generation_performed += 1 + if listener: + progress = int(100 * generation_performed / total_generations) + listener.notify_progress(progress) def to_dto(self) -> CommandDTO: return CommandDTO(action=self.command_name.value, args={}) diff --git a/antarest/study/storage/variantstudy/model/command/icommand.py b/antarest/study/storage/variantstudy/model/command/icommand.py index 537b5ddb5b..7ea0bd64ba 100644 --- a/antarest/study/storage/variantstudy/model/command/icommand.py +++ b/antarest/study/storage/variantstudy/model/command/icommand.py @@ -16,13 +16,14 @@ from abc import ABC, abstractmethod import typing_extensions as te -from pydantic import BaseModel +from antarest.core.serialization import AntaresBaseModel from antarest.core.utils.utils import assert_this from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfig from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command_context import CommandContext +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO if t.TYPE_CHECKING: # False at runtime, for mypy @@ -35,7 +36,7 @@ OutputTuple: te.TypeAlias = t.Tuple[CommandOutput, t.Dict[str, t.Any]] -class ICommand(ABC, BaseModel, extra="forbid", arbitrary_types_allowed=True): +class ICommand(ABC, AntaresBaseModel, extra="forbid", arbitrary_types_allowed=True): """ Interface for all commands that can be applied to a study. @@ -78,7 +79,7 @@ def apply_config(self, study_data: FileStudyTreeConfig) -> CommandOutput: return output @abstractmethod - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: """ Applies the study data to update storage configurations and saves the changes. @@ -90,18 +91,19 @@ def _apply(self, study_data: FileStudy) -> CommandOutput: """ raise NotImplementedError() - def apply(self, study_data: FileStudy) -> CommandOutput: + def apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: """ Applies the study data to update storage configurations and saves the changes. Args: study_data: The study data to be applied. + listener: Can be used by the command to notify anyone giving one. Returns: The output of the command execution. """ try: - return self._apply(study_data) + return self._apply(study_data, listener) except Exception as e: logger.warning( f"Failed to execute variant command {self.command_name}", diff --git a/antarest/study/storage/variantstudy/model/command/remove_area.py b/antarest/study/storage/variantstudy/model/command/remove_area.py index a61290bab1..39162252e8 100644 --- a/antarest/study/storage/variantstudy/model/command/remove_area.py +++ b/antarest/study/storage/variantstudy/model/command/remove_area.py @@ -30,6 +30,7 @@ ) from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO logger = logging.getLogger(__name__) @@ -222,7 +223,7 @@ def _remove_area_from_scenario_builder(self, study_data: FileStudy) -> None: study_data.tree.save(rulesets, ["settings", "scenariobuilder"]) # noinspection SpellCheckingInspection - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: study_data.tree.delete(["input", "areas", self.id]) study_data.tree.delete(["input", "hydro", "common", "capacity", f"maxpower_{self.id}"]) study_data.tree.delete(["input", "hydro", "common", "capacity", f"reservoir_{self.id}"]) diff --git a/antarest/study/storage/variantstudy/model/command/remove_binding_constraint.py b/antarest/study/storage/variantstudy/model/command/remove_binding_constraint.py index d497576b22..ca77cad860 100644 --- a/antarest/study/storage/variantstudy/model/command/remove_binding_constraint.py +++ b/antarest/study/storage/variantstudy/model/command/remove_binding_constraint.py @@ -10,7 +10,7 @@ # # This file is part of the Antares project. -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple from antarest.core.model import JSON from antarest.study.model import STUDY_VERSION_8_7 @@ -20,6 +20,7 @@ from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.create_binding_constraint import remove_bc_from_scenario_builder from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -43,7 +44,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> Tuple[CommandOutput, study_data.bindings.remove(next(iter([bind for bind in study_data.bindings if bind.id == self.id]))) return CommandOutput(status=True), {} - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: Optional[ICommandListener] = None) -> CommandOutput: if self.id not in [bind.id for bind in study_data.config.bindings]: return CommandOutput(status=False, message=f"Binding constraint not found: '{self.id}'") binding_constraints = study_data.tree.get(["input", "bindingconstraints", "bindingconstraints"]) diff --git a/antarest/study/storage/variantstudy/model/command/remove_cluster.py b/antarest/study/storage/variantstudy/model/command/remove_cluster.py index 3895b423e9..76aafc36c1 100644 --- a/antarest/study/storage/variantstudy/model/command/remove_cluster.py +++ b/antarest/study/storage/variantstudy/model/command/remove_cluster.py @@ -19,6 +19,7 @@ ) from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -97,7 +98,7 @@ def _remove_cluster_from_scenario_builder(self, study_data: FileStudy) -> None: study_data.tree.save(rulesets, ["settings", "scenariobuilder"]) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: """ Applies the study data to update thermal cluster configurations and saves the changes: remove corresponding the configuration and remove the attached time series. diff --git a/antarest/study/storage/variantstudy/model/command/remove_district.py b/antarest/study/storage/variantstudy/model/command/remove_district.py index 586a827943..9176aa9ac4 100644 --- a/antarest/study/storage/variantstudy/model/command/remove_district.py +++ b/antarest/study/storage/variantstudy/model/command/remove_district.py @@ -10,12 +10,13 @@ # # This file is part of the Antares project. -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfig from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -39,7 +40,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> Tuple[CommandOutput, del study_data.sets[self.id] return CommandOutput(status=True, message=self.id), dict() - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: Optional[ICommandListener] = None) -> CommandOutput: output, _ = self._apply_config(study_data.config) study_data.tree.delete(["input", "areas", "sets", self.id]) return output diff --git a/antarest/study/storage/variantstudy/model/command/remove_link.py b/antarest/study/storage/variantstudy/model/command/remove_link.py index eeb7ad81f7..154fd10ed1 100644 --- a/antarest/study/storage/variantstudy/model/command/remove_link.py +++ b/antarest/study/storage/variantstudy/model/command/remove_link.py @@ -19,6 +19,7 @@ from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand, OutputTuple +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -124,7 +125,7 @@ def _remove_link_from_scenario_builder(self, study_data: FileStudy) -> None: study_data.tree.save(rulesets, ["settings", "scenariobuilder"]) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: """ Update the configuration and the study data by removing the link between the source and target areas. diff --git a/antarest/study/storage/variantstudy/model/command/remove_renewables_cluster.py b/antarest/study/storage/variantstudy/model/command/remove_renewables_cluster.py index 834dc1043b..b80eb83219 100644 --- a/antarest/study/storage/variantstudy/model/command/remove_renewables_cluster.py +++ b/antarest/study/storage/variantstudy/model/command/remove_renewables_cluster.py @@ -16,6 +16,7 @@ from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -92,7 +93,7 @@ def _remove_cluster_from_scenario_builder(self, study_data: FileStudy) -> None: study_data.tree.save(rulesets, ["settings", "scenariobuilder"]) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: """ Applies the study data to update renewable cluster configurations and saves the changes: remove corresponding the configuration and remove the attached time series. diff --git a/antarest/study/storage/variantstudy/model/command/remove_st_storage.py b/antarest/study/storage/variantstudy/model/command/remove_st_storage.py index 9f60befabe..15e6e1c1ae 100644 --- a/antarest/study/storage/variantstudy/model/command/remove_st_storage.py +++ b/antarest/study/storage/variantstudy/model/command/remove_st_storage.py @@ -19,6 +19,7 @@ from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO # minimum required version. @@ -99,7 +100,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> t.Tuple[CommandOutpu {}, ) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: """ Applies the study data to update storage configurations and saves the changes: remove the storage from the configuration and remove the attached time series. diff --git a/antarest/study/storage/variantstudy/model/command/replace_matrix.py b/antarest/study/storage/variantstudy/model/command/replace_matrix.py index 8a26f12f28..d74333cb2e 100644 --- a/antarest/study/storage/variantstudy/model/command/replace_matrix.py +++ b/antarest/study/storage/variantstudy/model/command/replace_matrix.py @@ -24,6 +24,7 @@ from antarest.study.storage.variantstudy.business.utils import AliasDecoder, strip_matrix_protocol, validate_matrix from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -57,7 +58,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> t.Tuple[CommandOutpu {}, ) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: if self.target[0] == "@": self.target = AliasDecoder.decode(self.target, study_data) diff --git a/antarest/study/storage/variantstudy/model/command/update_binding_constraint.py b/antarest/study/storage/variantstudy/model/command/update_binding_constraint.py index 423c431d38..d9b1620c2a 100644 --- a/antarest/study/storage/variantstudy/model/command/update_binding_constraint.py +++ b/antarest/study/storage/variantstudy/model/command/update_binding_constraint.py @@ -30,6 +30,7 @@ create_binding_constraint_config, ) from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -150,7 +151,7 @@ def _find_binding_config(self, binding_constraints: t.Mapping[str, JSON]) -> t.O return str(index), binding_config return None - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: binding_constraints = study_data.tree.get(["input", "bindingconstraints", "bindingconstraints"]) # When all BC of a given group are removed, the group should be removed from the scenario builder diff --git a/antarest/study/storage/variantstudy/model/command/update_comments.py b/antarest/study/storage/variantstudy/model/command/update_comments.py index 5a3d57a670..7c77c920e8 100644 --- a/antarest/study/storage/variantstudy/model/command/update_comments.py +++ b/antarest/study/storage/variantstudy/model/command/update_comments.py @@ -10,13 +10,14 @@ # # This file is part of the Antares project. -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple from antarest.core.model import JSON from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfig from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -45,7 +46,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> Tuple[CommandOutput, dict(), ) - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: Optional[ICommandListener] = None) -> CommandOutput: replace_comment_data: JSON = {"settings": {"comments": self.comments.encode("utf-8")}} study_data.tree.save(replace_comment_data) diff --git a/antarest/study/storage/variantstudy/model/command/update_config.py b/antarest/study/storage/variantstudy/model/command/update_config.py index 067b0ecba1..538d5c5923 100644 --- a/antarest/study/storage/variantstudy/model/command/update_config.py +++ b/antarest/study/storage/variantstudy/model/command/update_config.py @@ -20,6 +20,7 @@ from antarest.study.storage.rawstudy.model.filesystem.ini_file_node import IniFileNode from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO _ENR_MODELLING_KEY = "settings/generaldata/other preferences/renewable-generation-modelling" @@ -63,7 +64,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> t.Tuple[CommandOutpu return CommandOutput(status=True, message="ok"), {} - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: url = self.target.split("/") tree_node = study_data.tree.get_node(url) if not isinstance(tree_node, IniFileNode): diff --git a/antarest/study/storage/variantstudy/model/command/update_district.py b/antarest/study/storage/variantstudy/model/command/update_district.py index e0d63dfafd..e5ceffb2d2 100644 --- a/antarest/study/storage/variantstudy/model/command/update_district.py +++ b/antarest/study/storage/variantstudy/model/command/update_district.py @@ -17,6 +17,7 @@ from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.create_district import DistrictBaseFilter from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -66,7 +67,7 @@ def _apply_config(self, study_data: FileStudyTreeConfig) -> Tuple[CommandOutput, "item_key": item_key, } - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: Optional[ICommandListener] = None) -> CommandOutput: output, data = self._apply_config(study_data.config) if not output.status: return output diff --git a/antarest/study/storage/variantstudy/model/command/update_playlist.py b/antarest/study/storage/variantstudy/model/command/update_playlist.py index 52f6f70c5c..5dbb63a2a6 100644 --- a/antarest/study/storage/variantstudy/model/command/update_playlist.py +++ b/antarest/study/storage/variantstudy/model/command/update_playlist.py @@ -17,6 +17,7 @@ from antarest.study.storage.rawstudy.model.helpers import FileStudyHelpers from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -39,7 +40,7 @@ class UpdatePlaylist(ICommand): weights: Optional[Dict[int, float]] = None reverse: bool = False - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: Optional[ICommandListener] = None) -> CommandOutput: FileStudyHelpers.set_playlist( study_data, self.items or [], diff --git a/antarest/study/storage/variantstudy/model/command/update_raw_file.py b/antarest/study/storage/variantstudy/model/command/update_raw_file.py index 1a3414f90b..5b6ed99296 100644 --- a/antarest/study/storage/variantstudy/model/command/update_raw_file.py +++ b/antarest/study/storage/variantstudy/model/command/update_raw_file.py @@ -11,13 +11,14 @@ # This file is part of the Antares project. import base64 -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfig from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.rawstudy.model.filesystem.raw_file_node import RawFileNode from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -50,7 +51,7 @@ def __repr__(self) -> str: def _apply_config(self, study_data: FileStudyTreeConfig) -> Tuple[CommandOutput, Dict[str, Any]]: return CommandOutput(status=True, message="ok"), {} - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: Optional[ICommandListener] = None) -> CommandOutput: url = self.target.split("/") tree_node = study_data.tree.get_node(url) if not isinstance(tree_node, RawFileNode): diff --git a/antarest/study/storage/variantstudy/model/command/update_scenario_builder.py b/antarest/study/storage/variantstudy/model/command/update_scenario_builder.py index 86a127f776..c660dc226b 100644 --- a/antarest/study/storage/variantstudy/model/command/update_scenario_builder.py +++ b/antarest/study/storage/variantstudy/model/command/update_scenario_builder.py @@ -19,6 +19,7 @@ from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.model import CommandDTO @@ -53,7 +54,7 @@ class UpdateScenarioBuilder(ICommand): data: t.Union[t.Dict[str, t.Any], t.Mapping[str, t.Any], t.MutableMapping[str, t.Any]] - def _apply(self, study_data: FileStudy) -> CommandOutput: + def _apply(self, study_data: FileStudy, listener: t.Optional[ICommandListener] = None) -> CommandOutput: """ Apply the command to the study data. diff --git a/antarest/study/storage/variantstudy/model/command_listener/__init__.py b/antarest/study/storage/variantstudy/model/command_listener/__init__.py new file mode 100644 index 0000000000..058c6b221a --- /dev/null +++ b/antarest/study/storage/variantstudy/model/command_listener/__init__.py @@ -0,0 +1,11 @@ +# Copyright (c) 2024, RTE (https://www.rte-france.com) +# +# See AUTHORS.txt +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# SPDX-License-Identifier: MPL-2.0 +# +# This file is part of the Antares project. diff --git a/antarest/study/storage/variantstudy/model/command_listener/command_listener.py b/antarest/study/storage/variantstudy/model/command_listener/command_listener.py new file mode 100644 index 0000000000..c2c4da8f4e --- /dev/null +++ b/antarest/study/storage/variantstudy/model/command_listener/command_listener.py @@ -0,0 +1,26 @@ +# Copyright (c) 2024, RTE (https://www.rte-france.com) +# +# See AUTHORS.txt +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# SPDX-License-Identifier: MPL-2.0 +# +# This file is part of the Antares project. + +from abc import ABC, abstractmethod + + +class ICommandListener(ABC): + """ + Interface for all listeners that can be given inside the `apply` method of a command. + """ + + @abstractmethod + def notify_progress(self, progress: int) -> None: + """ + Given a command progression, notifies the information. + """ + raise NotImplementedError() diff --git a/antarest/study/storage/variantstudy/snapshot_generator.py b/antarest/study/storage/variantstudy/snapshot_generator.py index 086c6d3952..afde1004eb 100644 --- a/antarest/study/storage/variantstudy/snapshot_generator.py +++ b/antarest/study/storage/variantstudy/snapshot_generator.py @@ -23,7 +23,7 @@ from antarest.core.interfaces.cache import CacheConstants, ICache from antarest.core.jwt import JWTUser from antarest.core.model import StudyPermissionType -from antarest.core.tasks.service import TaskUpdateNotifier, noop_notifier +from antarest.core.tasks.service import ITaskNotifier, NoopNotifier from antarest.study.model import RawStudy, StudyAdditionalData from antarest.study.storage.patch_service import PatchService from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfigDTO @@ -31,6 +31,7 @@ from antarest.study.storage.rawstudy.raw_study_service import RawStudyService from antarest.study.storage.utils import assert_permission_on_studies, export_study_flat from antarest.study.storage.variantstudy.command_factory import CommandFactory +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.dbmodel import CommandBlock, VariantStudy, VariantStudySnapshot from antarest.study.storage.variantstudy.model.model import GenerationResultInfoDTO from antarest.study.storage.variantstudy.repository import VariantStudyRepository @@ -70,7 +71,8 @@ def generate_snapshot( *, denormalize: bool = True, from_scratch: bool = False, - notifier: TaskUpdateNotifier = noop_notifier, + notifier: ITaskNotifier = NoopNotifier(), + listener: t.Optional[ICommandListener] = None, ) -> GenerationResultInfoDTO: # ATTENTION: since we are making changes to disk, a file lock is needed. # The locking is currently done in the `VariantStudyService.generate_task` function @@ -100,7 +102,7 @@ def generate_snapshot( self._export_ref_study(snapshot_dir, ref_study) logger.info(f"Applying commands to the reference study '{ref_study.id}'...") - results = self._apply_commands(snapshot_dir, variant_study, cmd_blocks) + results = self._apply_commands(snapshot_dir, variant_study, cmd_blocks, listener) # The snapshot is generated, we also need to de-normalize the matrices. file_study = self.study_factory.create_from_fs( @@ -133,7 +135,7 @@ def generate_snapshot( else: try: - notifier(results.model_dump_json()) + notifier.notify_message(results.model_dump_json()) except Exception as exc: # This exception is ignored, because it is not critical. logger.warning(f"Error while sending notification: {exc}", exc_info=True) @@ -174,6 +176,7 @@ def _apply_commands( snapshot_dir: Path, variant_study: VariantStudy, cmd_blocks: t.Sequence[CommandBlock], + listener: t.Optional[ICommandListener] = None, ) -> GenerationResultInfoDTO: commands = [self.command_factory.to_command(cb.to_dto()) for cb in cmd_blocks] generator = VariantCommandGenerator(self.study_factory) @@ -183,6 +186,7 @@ def _apply_commands( variant_study, delete_on_failure=False, # Not needed, because we are using a temporary directory notifier=None, + listener=listener, ) if not results.success: message = f"Failed to generate variant study {variant_study.id}" diff --git a/antarest/study/storage/variantstudy/variant_command_generator.py b/antarest/study/storage/variantstudy/variant_command_generator.py index de4bd60c0f..13e93a561f 100644 --- a/antarest/study/storage/variantstudy/variant_command_generator.py +++ b/antarest/study/storage/variantstudy/variant_command_generator.py @@ -14,7 +14,7 @@ import shutil import uuid from pathlib import Path -from typing import Callable, List, Optional, Tuple, Union, cast +from typing import Any, Callable, List, Optional, Tuple, Union, cast from antarest.core.utils.utils import StopWatch from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfig @@ -22,12 +22,13 @@ from antarest.study.storage.utils import update_antares_info from antarest.study.storage.variantstudy.model.command.common import CommandOutput from antarest.study.storage.variantstudy.model.command.icommand import ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.dbmodel import VariantStudy from antarest.study.storage.variantstudy.model.model import GenerationResultInfoDTO, NewDetailsDTO logger = logging.getLogger(__name__) -APPLY_CALLBACK = Callable[[ICommand, Union[FileStudyTreeConfig, FileStudy]], CommandOutput] +APPLY_CALLBACK = Callable[[ICommand, Union[FileStudyTreeConfig, FileStudy], Optional[ICommandListener]], CommandOutput] class CmdNotifier: @@ -51,6 +52,7 @@ def _generate( applier: APPLY_CALLBACK, metadata: Optional[VariantStudy] = None, notifier: Optional[Callable[[int, bool, str], None]] = None, + listener: Optional[ICommandListener] = None, ) -> GenerationResultInfoDTO: stopwatch = StopWatch() # Apply commands @@ -69,7 +71,7 @@ def _generate( # Store all the outputs for index, cmd in enumerate(all_commands, 1): try: - output = applier(cmd, data) + output = applier(cmd, data, listener) except Exception as e: # Unhandled exception output = CommandOutput( @@ -116,6 +118,7 @@ def generate( metadata: Optional[VariantStudy] = None, delete_on_failure: bool = True, notifier: Optional[Callable[[int, bool, str], None]] = None, + listener: Optional[ICommandListener] = None, ) -> GenerationResultInfoDTO: # Build file study logger.info("Building study tree") @@ -126,7 +129,7 @@ def generate( results = VariantCommandGenerator._generate( commands, study, - lambda command, data: command.apply(cast(FileStudy, data)), + lambda command, data, listener: command.apply(cast(FileStudy, data), listener), metadata, notifier, ) @@ -146,7 +149,7 @@ def generate_config( results = VariantCommandGenerator._generate( commands, config, - lambda command, data: command.apply_config(cast(FileStudyTreeConfig, data)), + lambda command, data, listener: command.apply_config(cast(FileStudyTreeConfig, data)), metadata, notifier, ) diff --git a/antarest/study/storage/variantstudy/variant_study_service.py b/antarest/study/storage/variantstudy/variant_study_service.py index 72bd1804c7..df5fde4456 100644 --- a/antarest/study/storage/variantstudy/variant_study_service.py +++ b/antarest/study/storage/variantstudy/variant_study_service.py @@ -45,7 +45,7 @@ from antarest.core.requests import RequestParameters, UserHasNotPermissionError from antarest.core.serialization import to_json_string from antarest.core.tasks.model import CustomTaskEventMessages, TaskDTO, TaskResult, TaskType -from antarest.core.tasks.service import DEFAULT_AWAIT_MAX_TIMEOUT, ITaskService, TaskUpdateNotifier, noop_notifier +from antarest.core.tasks.service import DEFAULT_AWAIT_MAX_TIMEOUT, ITaskNotifier, ITaskService, NoopNotifier from antarest.core.utils.fastapi_sqlalchemy import db from antarest.core.utils.utils import assert_this, suppress_exception from antarest.matrixstore.service import MatrixService @@ -60,6 +60,7 @@ from antarest.study.storage.variantstudy.business.utils import transform_command_to_dto from antarest.study.storage.variantstudy.command_factory import CommandFactory from antarest.study.storage.variantstudy.model.command.icommand import ICommand +from antarest.study.storage.variantstudy.model.command_listener.command_listener import ICommandListener from antarest.study.storage.variantstudy.model.dbmodel import CommandBlock, VariantStudy from antarest.study.storage.variantstudy.model.model import ( CommandDTO, @@ -587,6 +588,7 @@ def generate_task( metadata: VariantStudy, denormalize: bool = False, from_scratch: bool = False, + listener: t.Optional[ICommandListener] = None, ) -> str: study_id = metadata.id with FileLock(str(self.config.storage.tmp_dir / f"study-generation-{study_id}.lock")): @@ -611,7 +613,7 @@ def generate_task( # db context, so we need to fetch the id attribute before study_id = metadata.id - def callback(notifier: TaskUpdateNotifier) -> TaskResult: + def callback(notifier: ITaskNotifier) -> TaskResult: generator = SnapshotGenerator( cache=self.cache, raw_study_service=self.raw_study_service, @@ -626,6 +628,7 @@ def callback(notifier: TaskUpdateNotifier) -> TaskResult: denormalize=denormalize, from_scratch=from_scratch, notifier=notifier, + listener=listener, ) return TaskResult( success=generate_result.success, @@ -710,7 +713,7 @@ def _generate_study_config( def _get_commands_and_notifier( self, variant_study: VariantStudy, - notifier: TaskUpdateNotifier, + notifier: ITaskNotifier, from_index: int = 0, ) -> t.Tuple[t.List[t.List[ICommand]], t.Callable[[int, bool, str], None]]: # Generate @@ -724,7 +727,7 @@ def notify(command_index: int, command_result: bool, command_message: str) -> No success=command_result, message=command_message, ) - notifier(command_result_obj.model_dump_json()) + notifier.notify_message(command_result_obj.model_dump_json()) self.event_bus.push( Event( type=EventType.STUDY_VARIANT_GENERATION_COMMAND_RESULT, @@ -753,7 +756,7 @@ def _generate_config( self, variant_study: VariantStudy, config: FileStudyTreeConfig, - notifier: TaskUpdateNotifier = noop_notifier, + notifier: ITaskNotifier = NoopNotifier(), ) -> t.Tuple[GenerationResultInfoDTO, FileStudyTreeConfig]: commands, notify = self._get_commands_and_notifier(variant_study=variant_study, notifier=notifier) return self.generator.generate_config(commands, config, variant_study, notifier=notify) @@ -762,7 +765,7 @@ def _generate_snapshot( self, variant_study: VariantStudy, dst_path: Path, - notifier: TaskUpdateNotifier = noop_notifier, + notifier: ITaskNotifier = NoopNotifier(), from_command_index: int = 0, ) -> GenerationResultInfoDTO: commands, notify = self._get_commands_and_notifier( @@ -1112,12 +1115,12 @@ def _clear_all_snapshots(self) -> None: if variant.last_access and variant.last_access < datetime.utcnow() - self._retention_time: self._variant_study_service.clear_snapshot(variant) - def run_task(self, notifier: TaskUpdateNotifier) -> TaskResult: + def run_task(self, notifier: ITaskNotifier) -> TaskResult: msg = f"Start cleaning all snapshots updated or accessed {humanize.precisedelta(self._retention_time)} ago." - notifier(msg) + notifier.notify_message(msg) self._clear_all_snapshots() msg = "All selected snapshots were successfully cleared." - notifier(msg) + notifier.notify_message(msg) return TaskResult(success=True, message=msg) __call__ = run_task diff --git a/tests/conftest_services.py b/tests/conftest_services.py index 70abcd6007..aef3457e3b 100644 --- a/tests/conftest_services.py +++ b/tests/conftest_services.py @@ -29,7 +29,7 @@ from antarest.core.interfaces.eventbus import IEventBus from antarest.core.requests import RequestParameters from antarest.core.tasks.model import CustomTaskEventMessages, TaskDTO, TaskListFilter, TaskResult, TaskStatus, TaskType -from antarest.core.tasks.service import ITaskService, Task +from antarest.core.tasks.service import ITaskService, NoopNotifier, Task from antarest.core.utils.fastapi_sqlalchemy import DBSessionMiddleware from antarest.eventbus.business.local_eventbus import LocalEventBus from antarest.eventbus.service import EventBusService @@ -89,7 +89,7 @@ def add_task( custom_event_messages: t.Optional[CustomTaskEventMessages], request_params: RequestParameters, ) -> str: - self._task_result = action(lambda message: None) + self._task_result = action(NoopNotifier()) return str(uuid.uuid4()) def status_task( diff --git a/tests/core/test_tasks.py b/tests/core/test_tasks.py index 3ae45525c5..ba3f6de813 100644 --- a/tests/core/test_tasks.py +++ b/tests/core/test_tasks.py @@ -16,6 +16,8 @@ from pathlib import Path from unittest.mock import ANY, Mock +import numpy as np +import pandas as pd import pytest from fastapi import HTTPException from sqlalchemy import create_engine # type: ignore @@ -23,8 +25,8 @@ from sqlalchemy.orm import Session, sessionmaker # type: ignore from antarest.core.config import Config -from antarest.core.interfaces.eventbus import EventType, IEventBus -from antarest.core.jwt import JWTUser +from antarest.core.interfaces.eventbus import DummyEventBusService, EventType, IEventBus +from antarest.core.jwt import DEFAULT_ADMIN_USER, JWTUser from antarest.core.model import PermissionInfo, PublicMode from antarest.core.persistence import Base from antarest.core.requests import RequestParameters, UserHasNotPermissionError @@ -38,15 +40,22 @@ cancel_orphan_tasks, ) from antarest.core.tasks.repository import TaskJobRepository -from antarest.core.tasks.service import TaskJobService +from antarest.core.tasks.service import ITaskNotifier, TaskJobService from antarest.core.utils.fastapi_sqlalchemy import db from antarest.eventbus.business.local_eventbus import LocalEventBus from antarest.eventbus.service import EventBusService from antarest.login.model import User from antarest.service_creator import SESSION_ARGS from antarest.study.model import RawStudy +from antarest.study.repository import StudyMetadataRepository +from antarest.study.service import ThermalClusterTimeSeriesGeneratorTask +from antarest.study.storage.rawstudy.raw_study_service import RawStudyService +from antarest.study.storage.variantstudy.command_factory import CommandFactory +from antarest.study.storage.variantstudy.model.command_context import CommandContext +from antarest.study.storage.variantstudy.variant_study_service import VariantStudyService from antarest.worker.worker import AbstractWorker, WorkerTaskCommand from tests.helpers import with_db_context +from tests.storage.test_service import build_study_service @pytest.fixture(name="db_engine", autouse=True) @@ -128,7 +137,7 @@ def test_service(core_config: Config, event_bus: IEventBus, admin_user: JWTUser) # ================================================ # noinspection PyUnusedLocal - def action_fail(update_msg: t.Callable[[str], None]) -> TaskResult: + def action_fail(notifier: ITaskNotifier) -> TaskResult: raise Exception("this action failed") failed_id = service.add_task( @@ -155,9 +164,9 @@ def action_fail(update_msg: t.Callable[[str], None]) -> TaskResult: # Test Case: add a task that succeeds and wait for it # =================================================== - def action_ok(update_msg: t.Callable[[str], None]) -> TaskResult: - update_msg("start") - update_msg("end") + def action_ok(notifier: ITaskNotifier) -> TaskResult: + notifier.notify_message("start") + notifier.notify_message("end") return TaskResult(success=True, message="OK") ok_id = service.add_task( @@ -440,3 +449,151 @@ def test_get_progress(db_session: Session, admin_user: JWTUser, core_config: Con wrong_id = "foo_bar" with pytest.raises(HTTPException, match=f"Task {wrong_id} not found"): service.get_task_progress(wrong_id, RequestParameters(user)) + + +def test_ts_generation_task( + tmp_path: Path, + core_config: Config, + admin_user: JWTUser, + raw_study_service: RawStudyService, + db_session: Session, +) -> None: + # ======================= + # SET UP + # ======================= + + event_bus = DummyEventBusService() + + # Create a TaskJobService and add tasks + task_job_repo = TaskJobRepository(db_session) + + # Create a TaskJobService + task_job_service = TaskJobService(config=core_config, repository=task_job_repo, event_bus=event_bus) + + # Create a raw study + raw_study_path = tmp_path / "study" + + regular_user = User(id=99, name="regular") + db_session.add(regular_user) + db_session.commit() + + raw_study = RawStudy( + id="my_raw_study", + name="my_raw_study", + version="860", + author="John Smith", + created_at=datetime.datetime(2023, 7, 15, 16, 45), + updated_at=datetime.datetime(2023, 7, 19, 8, 15), + last_access=datetime.datetime.utcnow(), + public_mode=PublicMode.FULL, + owner=regular_user, + path=str(raw_study_path), + ) + study_metadata_repository = StudyMetadataRepository(Mock(), None) + db_session.add(raw_study) + db_session.commit() + + # Set up the Raw Study + raw_study_service.create(raw_study) + # Create an area + areas_path = raw_study_path / "input" / "areas" + areas_path.mkdir(parents=True, exist_ok=True) + (areas_path / "fr").mkdir(parents=True, exist_ok=True) + (areas_path / "list.txt").touch(exist_ok=True) + with open(areas_path / "list.txt", mode="w") as f: + f.writelines(["fr"]) + # Create 2 thermal clusters + thermal_path = raw_study_path / "input" / "thermal" + thermal_path.mkdir(parents=True, exist_ok=True) + fr_path = thermal_path / "clusters" / "fr" + fr_path.mkdir(parents=True, exist_ok=True) + (fr_path / "list.ini").touch(exist_ok=True) + content = """ + [th_1] +name = th_1 +nominalcapacity = 14.0 + +[th_2] +name = th_2 +nominalcapacity = 14.0 +""" + (fr_path / "list.ini").write_text(content) + # Create matrix files + for th_name in ["th_1", "th_2"]: + prepro_folder = thermal_path / "prepro" / "fr" / th_name + prepro_folder.mkdir(parents=True, exist_ok=True) + # Modulation + modulation_df = pd.DataFrame(data=np.ones((8760, 3))) + modulation_df.to_csv(prepro_folder / "modulation.txt", sep="\t", header=False, index=False) + (prepro_folder / "data.txt").touch() + # Data + data_df = pd.DataFrame(data=np.zeros((365, 6))) + data_df[0] = [1] * 365 + data_df[1] = [1] * 365 + data_df.to_csv(prepro_folder / "data.txt", sep="\t", header=False, index=False) + # Series + series_path = thermal_path / "series" / "fr" / th_name + series_path.mkdir(parents=True, exist_ok=True) + (series_path / "series.txt").touch() + + # Set up the mocks + variant_study_service = Mock(spec=VariantStudyService) + command_factory = Mock(spec=CommandFactory) + variant_study_service.command_factory = command_factory + command_factory.command_context = Mock(spec=CommandContext) + config = Mock(spec=Config) + + study_service = build_study_service( + raw_study_service, + study_metadata_repository, + config, + task_service=task_job_service, + event_bus=event_bus, + variant_study_service=variant_study_service, + ) + + # ======================= + # TEST CASE + # ======================= + + task = ThermalClusterTimeSeriesGeneratorTask( + raw_study.id, + repository=study_service.repository, + storage_service=study_service.storage_service, + event_bus=study_service.event_bus, + ) + + task_id = study_service.task_service.add_task( + task, + "test_generation", + task_type=TaskType.THERMAL_CLUSTER_SERIES_GENERATION, + ref_id=raw_study.id, + progress=0, + custom_event_messages=None, + request_params=RequestParameters(DEFAULT_ADMIN_USER), + ) + + # Await task + study_service.task_service.await_task(task_id, 2) + tasks = study_service.task_service.list_tasks(TaskListFilter(), RequestParameters(DEFAULT_ADMIN_USER)) + assert len(tasks) == 1 + task = tasks[0] + assert task.ref_id == raw_study.id + assert task.id == task_id + assert task.name == "test_generation" + assert task.status == TaskStatus.COMPLETED + assert task.progress == 100 + + # Check eventbus + events = event_bus.events + assert len(events) == 6 + assert events[0].type == EventType.TASK_ADDED + assert events[1].type == EventType.TASK_RUNNING + + assert events[2].type == EventType.TASK_PROGRESS + assert events[2].payload == {"task_id": task_id, "progress": 50} + assert events[3].type == EventType.TASK_PROGRESS + assert events[3].payload == {"task_id": task_id, "progress": 100} + + assert events[4].type == EventType.STUDY_EDITED + assert events[5].type == EventType.TASK_COMPLETED diff --git a/tests/storage/conftest.py b/tests/storage/conftest.py index bad7def6a6..589d023919 100644 --- a/tests/storage/conftest.py +++ b/tests/storage/conftest.py @@ -25,7 +25,7 @@ from antarest.core.model import JSON from antarest.core.requests import RequestParameters from antarest.core.tasks.model import CustomTaskEventMessages, TaskDTO, TaskListFilter, TaskStatus, TaskType -from antarest.core.tasks.service import ITaskService, Task +from antarest.core.tasks.service import ITaskService, NoopNotifier, Task @pytest.fixture @@ -296,7 +296,7 @@ def add_task( custom_event_messages: Optional[CustomTaskEventMessages], request_params: RequestParameters, ) -> str: - action(lambda message: None) + action(NoopNotifier()) return str(uuid.uuid4()) def status_task( diff --git a/tests/storage/test_service.py b/tests/storage/test_service.py index 6490a46096..d15e8f6d7a 100644 --- a/tests/storage/test_service.py +++ b/tests/storage/test_service.py @@ -28,7 +28,7 @@ from antarest.core.exceptions import StudyVariantUpgradeError, TaskAlreadyRunning from antarest.core.filetransfer.model import FileDownload, FileDownloadTaskDTO from antarest.core.interfaces.cache import ICache -from antarest.core.interfaces.eventbus import Event, EventType +from antarest.core.interfaces.eventbus import Event, EventType, IEventBus from antarest.core.jwt import DEFAULT_ADMIN_USER, JWTGroup, JWTUser from antarest.core.model import JSON, SUB_JSON, PermissionInfo, PublicMode, StudyPermissionType from antarest.core.requests import RequestParameters, UserHasNotPermissionError @@ -92,13 +92,14 @@ def build_study_service( cache_service: ICache = Mock(spec=ICache), variant_study_service: VariantStudyService = Mock(spec=VariantStudyService), task_service: ITaskService = Mock(spec=ITaskService), + event_bus: IEventBus = Mock(spec=IEventBus), ) -> StudyService: return StudyService( raw_study_service=raw_study_service, variant_study_service=variant_study_service, user_service=user_service, repository=repository, - event_bus=Mock(), + event_bus=event_bus, task_service=task_service, file_transfer_manager=Mock(), cache_service=cache_service, diff --git a/tests/study/storage/variantstudy/test_snapshot_generator.py b/tests/study/storage/variantstudy/test_snapshot_generator.py index 0f2d4775ef..5fcd7c5dbd 100644 --- a/tests/study/storage/variantstudy/test_snapshot_generator.py +++ b/tests/study/storage/variantstudy/test_snapshot_generator.py @@ -28,6 +28,7 @@ from antarest.core.jwt import JWTGroup, JWTUser from antarest.core.requests import RequestParameters from antarest.core.roles import RoleType +from antarest.core.tasks.service import ITaskNotifier from antarest.core.utils.fastapi_sqlalchemy import db from antarest.login.model import Group, Role, User from antarest.study.model import RawStudy, Study, StudyAdditionalData @@ -39,6 +40,8 @@ from tests.db_statement_recorder import DBStatementRecorder from tests.helpers import AnyUUID, with_db_context +logger = logging.getLogger(__name__) + def _create_variant( tmp_path: Path, @@ -682,7 +685,7 @@ def test_search_ref_study__deleted_last_command(self, tmp_path: Path) -> None: assert search_result.force_regenerate is True -class RegisterNotification: +class RegisterNotification(ITaskNotifier): """ Callable used to register notifications. """ @@ -690,9 +693,23 @@ class RegisterNotification: def __init__(self) -> None: self.notifications: t.MutableSequence[str] = [] - def __call__(self, notification: str) -> None: + def notify_message(self, notification: str) -> None: self.notifications.append(json.loads(notification)) + def notify_progress(self, progress: int) -> None: + return + + +class FailingNotifier(ITaskNotifier): + def __init__(self) -> None: + pass + + def notify_message(self, notification: str) -> None: + logger.warning("Something went wrong") + + def notify_progress(self, progress: int) -> None: + return + class TestSnapshotGenerator: """ @@ -1159,7 +1176,7 @@ def test_generate__notification_failure( repository=variant_study_service.repository, ) - notifier = Mock(side_effect=Exception("Something went wrong")) + notifier = FailingNotifier() with caplog.at_level(logging.WARNING): results = generator.generate_snapshot(