From 69010dcb791381849c6c1a3ccd629989fc8fce63 Mon Sep 17 00:00:00 2001 From: Paul Bui-Quang Date: Mon, 29 Aug 2022 16:09:02 +0200 Subject: [PATCH] Add automatic archival service (#1045) --- ...e65e0c04606b_add_last_access_study_prop.py | 34 ++++++++++ antarest/core/config.py | 4 ++ antarest/main.py | 8 +++ antarest/singleton_services.py | 5 ++ antarest/study/model.py | 3 +- antarest/study/service.py | 17 +++++ .../study/storage/auto_archive_service.py | 57 ++++++++++++++++ .../variantstudy/variant_study_service.py | 5 ++ antarest/utils.py | 17 +++-- scripts/rollback.sh | 2 +- .../business/test_autoarchive_service.py | 67 +++++++++++++++++++ tests/storage/test_service.py | 9 ++- 12 files changed, 220 insertions(+), 8 deletions(-) create mode 100644 alembic/versions/e65e0c04606b_add_last_access_study_prop.py create mode 100644 antarest/study/storage/auto_archive_service.py create mode 100644 tests/storage/business/test_autoarchive_service.py diff --git a/alembic/versions/e65e0c04606b_add_last_access_study_prop.py b/alembic/versions/e65e0c04606b_add_last_access_study_prop.py new file mode 100644 index 0000000000..31bced91d7 --- /dev/null +++ b/alembic/versions/e65e0c04606b_add_last_access_study_prop.py @@ -0,0 +1,34 @@ +"""add_last_access_study_prop + +Revision ID: e65e0c04606b +Revises: 26c50ef2a0e1 +Create Date: 2022-08-09 12:48:55.860099 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'e65e0c04606b' +down_revision = '26c50ef2a0e1' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('study', schema=None) as batch_op: + batch_op.add_column(sa.Column('last_access', sa.DateTime(), nullable=True)) + + + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('study', schema=None) as batch_op: + batch_op.drop_column('last_access') + + # ### end Alembic commands ### diff --git a/antarest/core/config.py b/antarest/core/config.py index 6ede37e3d5..6ffaf32a08 100644 --- a/antarest/core/config.py +++ b/antarest/core/config.py @@ -126,6 +126,7 @@ class StorageConfig: download_default_expiration_timeout_minutes: int = 1440 matrix_gc_sleeping_time: int = 3600 matrix_gc_dry_run: bool = False + auto_archive_threshold_days: int = 60 @staticmethod def from_dict(data: JSON) -> "StorageConfig": @@ -145,6 +146,9 @@ def from_dict(data: JSON) -> "StorageConfig": ), matrix_gc_sleeping_time=data.get("matrix_gc_sleeping_time", 3600), matrix_gc_dry_run=data.get("matrix_gc_dry_run", False), + auto_archive_threshold_days=data.get( + "auto_archive_threshold_days", 60 + ), ) diff --git a/antarest/main.py b/antarest/main.py index 9d3b8d2089..217022f162 100644 --- a/antarest/main.py +++ b/antarest/main.py @@ -30,6 +30,7 @@ MatrixGarbageCollector, ) from antarest.singleton_services import SingletonServices +from antarest.study.storage.auto_archive_service import AutoArchiveService from antarest.study.storage.rawstudy.watcher import Watcher from antarest.tools.admin_lib import clean_locks from antarest.utils import ( @@ -261,6 +262,13 @@ def handle_all_exception(request: Request, exc: Exception) -> Any: matrix_gc = cast(MatrixGarbageCollector, services["matrix_gc"]) matrix_gc.start() + if ( + config.server.services + and Module.AUTO_ARCHIVER.value in config.server.services + ): + auto_archiver = cast(AutoArchiveService, services["auto_archiver"]) + auto_archiver.start() + customize_openapi(application) return application, services diff --git a/antarest/singleton_services.py b/antarest/singleton_services.py index 54b86278e0..9ebbf4f25b 100644 --- a/antarest/singleton_services.py +++ b/antarest/singleton_services.py @@ -7,6 +7,7 @@ from antarest.core.interfaces.service import IService from antarest.core.logging.utils import configure_logger from antarest.core.utils.utils import get_local_path +from antarest.study.storage.auto_archive_service import AutoArchiveService from antarest.utils import ( Module, init_db, @@ -63,6 +64,10 @@ def _init( worker = create_archive_worker(config, "test", event_bus=event_bus) services[Module.ARCHIVE_WORKER] = worker + if Module.AUTO_ARCHIVER in services_list: + auto_archive_service = AutoArchiveService(study_service, config) + services[Module.AUTO_ARCHIVER] = auto_archive_service + return services def start(self) -> None: diff --git a/antarest/study/model.py b/antarest/study/model.py index 297c8ca943..a2b67ad456 100644 --- a/antarest/study/model.py +++ b/antarest/study/model.py @@ -98,6 +98,7 @@ class Study(Base): # type: ignore author = Column(String(255)) created_at = Column(DateTime) updated_at = Column(DateTime) + last_access = Column(DateTime) path = Column(String()) folder = Column(String, nullable=True) parent_id = Column( @@ -116,7 +117,7 @@ class Study(Base): # type: ignore __mapper_args__ = {"polymorphic_identity": "study", "polymorphic_on": type} def __str__(self) -> str: - return f"[Study] id={self.id}, type={self.type}, name={self.name}, version={self.version}, updated_at={self.updated_at}, owner={self.owner}, groups={[str(u) + ',' for u in self.groups]}" + return f"[Study] id={self.id}, type={self.type}, name={self.name}, version={self.version}, updated_at={self.updated_at}, last_access={self.last_access}, owner={self.owner}, groups={[str(u) + ',' for u in self.groups]}" def __eq__(self, other: Any) -> bool: if not isinstance(other, Study): diff --git a/antarest/study/service.py b/antarest/study/service.py index ec90480407..c847b702bb 100644 --- a/antarest/study/service.py +++ b/antarest/study/service.py @@ -1819,6 +1819,7 @@ def delete_link( ) def archive(self, uuid: str, params: RequestParameters) -> str: + logger.info(f"Archiving study {uuid}") study = self.get_study(uuid) assert_permission(params.user, study, StudyPermissionType.DELETE) @@ -1981,6 +1982,9 @@ def get_study(self, uuid: str) -> Study: sanitized, ) raise StudyNotFoundError(uuid) + # todo debounce this with a "update_study_last_access" method updating only every some seconds + study.last_access = datetime.utcnow() + self.repository.save(study) return study def _assert_study_unarchived( @@ -2210,6 +2214,19 @@ def check_and_update_all_study_versions_in_database( storage = self.storage_service.raw_study_service storage.check_and_update_study_version_in_database(study) + def archive_outputs( + self, study_id: str, params: RequestParameters + ) -> None: + logger.info(f"Archiving all outputs for study {study_id}") + study = self.get_study(study_id) + assert_permission(params.user, study, StudyPermissionType.WRITE) + self._assert_study_unarchived(study) + study = self.get_study(study_id) + file_study = self.storage_service.get_storage(study).get_raw(study) + for output in file_study.config.outputs: + if not file_study.config.outputs[output].archived: + self.archive_output(study_id, output, True, params) + def archive_output( self, study_id: str, diff --git a/antarest/study/storage/auto_archive_service.py b/antarest/study/storage/auto_archive_service.py new file mode 100644 index 0000000000..ce77fe35d4 --- /dev/null +++ b/antarest/study/storage/auto_archive_service.py @@ -0,0 +1,57 @@ +import datetime +import logging +import time +from typing import List + +from antarest.core.config import Config +from antarest.core.interfaces.service import IService +from antarest.core.jwt import DEFAULT_ADMIN_USER +from antarest.core.requests import RequestParameters +from antarest.study.model import Study, RawStudy +from antarest.study.service import StudyService +from antarest.study.storage.utils import is_managed +from antarest.study.storage.variantstudy.model.dbmodel import VariantStudy + +logger = logging.getLogger(__name__) + + +class AutoArchiveService(IService): + def __init__(self, study_service: StudyService, config: Config): + super(AutoArchiveService, self).__init__() + self.study_service = study_service + self.config = config + + def _try_archive_studies(self) -> None: + now = datetime.datetime.utcnow() + studies: List[Study] = self.study_service.repository.get_all() + for study in studies: + if is_managed( + study + ) and study.updated_at < now - datetime.timedelta( + days=self.config.storage.auto_archive_threshold_days + ): + if isinstance(study, RawStudy) and not study.archived: + self.study_service.archive( + study.id, + params=RequestParameters(DEFAULT_ADMIN_USER), + ) + elif isinstance(study, VariantStudy): + self.study_service.storage_service.variant_study_service.clear_snapshot( + study + ) + self.study_service.archive_outputs( + study.id, + params=RequestParameters(DEFAULT_ADMIN_USER), + ) + + def _loop(self) -> None: + while True: + try: + self._try_archive_studies() + except Exception as e: + logger.error( + "Unexpected error happened when processing auto archive service loop", + exc_info=e, + ) + finally: + time.sleep(2) diff --git a/antarest/study/storage/variantstudy/variant_study_service.py b/antarest/study/storage/variantstudy/variant_study_service.py index ae7d740f67..0bcf3d2e0f 100644 --- a/antarest/study/storage/variantstudy/variant_study_service.py +++ b/antarest/study/storage/variantstudy/variant_study_service.py @@ -417,6 +417,11 @@ def invalidate_cache( for child in self.repository.get_children(parent_id=variant_study.id): self.invalidate_cache(child, invalidate_self_snapshot=True) + def clear_snapshot(self, variant_study: Study) -> None: + logger.info(f"Clearing snapshot for study {variant_study.id}") + self.invalidate_cache(variant_study, invalidate_self_snapshot=True) + shutil.rmtree(self.get_study_path(variant_study), ignore_errors=True) + def has_children(self, study: VariantStudy) -> bool: return len(self.repository.get_children(parent_id=study.id)) > 0 diff --git a/antarest/utils.py b/antarest/utils.py index 2070e77ec1..fc33c6f6ec 100644 --- a/antarest/utils.py +++ b/antarest/utils.py @@ -42,6 +42,7 @@ from antarest.matrixstore.service import MatrixService from antarest.study.main import build_study_service from antarest.study.service import StudyService +from antarest.study.storage.auto_archive_service import AutoArchiveService from antarest.study.storage.rawstudy.watcher import Watcher from antarest.study.web.watcher_blueprint import create_watcher_routes from antarest.worker.archive_worker import ArchiveWorker @@ -55,6 +56,7 @@ class Module(str, Enum): WATCHER = "watcher" MATRIX_GC = "matrix_gc" ARCHIVE_WORKER = "archive_worker" + AUTO_ARCHIVER = "auto_archiver" def get_default_config_path_or_raise() -> Path: @@ -268,15 +270,14 @@ def create_services( cache=cache, ) - watcher = create_watcher( - config=config, application=application, study_service=study_service - ) - if ( config.server.services and Module.WATCHER.value in config.server.services or create_all ): + watcher = create_watcher( + config=config, application=application, study_service=study_service + ) services["watcher"] = watcher if ( @@ -292,6 +293,14 @@ def create_services( ) services["matrix_gc"] = matrix_garbage_collector + if ( + config.server.services + and Module.AUTO_ARCHIVER.value in config.server.services + or create_all + ): + auto_archiver = AutoArchiveService(study_service, config) + services["auto_archiver"] = auto_archiver + services["event_bus"] = event_bus services["study"] = study_service services["launcher"] = launcher diff --git a/scripts/rollback.sh b/scripts/rollback.sh index dbddf396e6..54b7bb9844 100755 --- a/scripts/rollback.sh +++ b/scripts/rollback.sh @@ -4,5 +4,5 @@ CURDIR=$(cd `dirname $0` && pwd) BASEDIR=`dirname $CURDIR` cd $BASEDIR -alembic downgrade c9dcca887456 +alembic downgrade 26c50ef2a0e1 cd - diff --git a/tests/storage/business/test_autoarchive_service.py b/tests/storage/business/test_autoarchive_service.py new file mode 100644 index 0000000000..a5d572b1e6 --- /dev/null +++ b/tests/storage/business/test_autoarchive_service.py @@ -0,0 +1,67 @@ +import datetime +from pathlib import Path +from unittest.mock import Mock, call + +from antarest.core.config import Config, StorageConfig, WorkspaceConfig +from antarest.core.jwt import DEFAULT_ADMIN_USER +from antarest.core.requests import RequestParameters +from antarest.study.model import RawStudy, DEFAULT_WORKSPACE_NAME +from antarest.study.storage.auto_archive_service import AutoArchiveService +from antarest.study.storage.variantstudy.model.dbmodel import VariantStudy + + +def test_auto_archival(tmp_path: Path): + workspace_path = tmp_path / "workspace_test" + auto_archive_service = AutoArchiveService( + Mock(), + Config( + storage=StorageConfig( + workspaces={"test": WorkspaceConfig(path=workspace_path)} + ) + ), + ) + + now = datetime.datetime.now() + + auto_archive_service.study_service.repository = Mock() + auto_archive_service.study_service.repository.get_all.return_value = [ + RawStudy( + id="a", + workspace="not default", + updated_at=now - datetime.timedelta(days=61), + ), + RawStudy( + id="b", + workspace=DEFAULT_WORKSPACE_NAME, + updated_at=now - datetime.timedelta(days=59), + ), + RawStudy( + id="c", + workspace=DEFAULT_WORKSPACE_NAME, + updated_at=now - datetime.timedelta(days=61), + archived=True, + ), + RawStudy( + id="d", + workspace=DEFAULT_WORKSPACE_NAME, + updated_at=now - datetime.timedelta(days=61), + archived=False, + ), + VariantStudy(id="e", updated_at=now - datetime.timedelta(days=61)), + ] + auto_archive_service.study_service.storage_service = Mock() + auto_archive_service.study_service.storage_service.variant_study_service = ( + Mock() + ) + + auto_archive_service._try_archive_studies() + + auto_archive_service.study_service.archive.assert_called_once_with( + "d", params=RequestParameters(DEFAULT_ADMIN_USER) + ) + auto_archive_service.study_service.storage_service.variant_study_service.clear_snapshot.assert_called_once_with( + VariantStudy(id="e", updated_at=now - datetime.timedelta(days=61)) + ) + auto_archive_service.study_service.archive_outputs.assert_called_once_with( + "e", params=RequestParameters(DEFAULT_ADMIN_USER) + ) diff --git a/tests/storage/test_service.py b/tests/storage/test_service.py index 5e47a5c95b..e32250a288 100644 --- a/tests/storage/test_service.py +++ b/tests/storage/test_service.py @@ -793,7 +793,10 @@ def test_change_owner() -> None: user_service.get_user.assert_called_once_with( 2, RequestParameters(JWTUser(id=2, impersonator=2, type="users")) ) - repository.save.assert_called_once_with(RawStudy(id=uuid, owner=bob)) + repository.save.assert_called_with( + RawStudy(id=uuid, owner=bob, last_access=ANY) + ) + repository.save.assert_called_with(RawStudy(id=uuid, owner=bob)) service._edit_study_using_command.assert_called_once_with( study=study, url="study/antares/author", data="Bob" @@ -921,7 +924,7 @@ def test_set_public_mode() -> None: JWTUser(id=2, impersonator=2, type="users", groups=[group_admin]) ), ) - repository.save.assert_called_once_with( + repository.save.assert_called_with( Study(id=uuid, public_mode=PublicMode.FULL) ) @@ -1113,6 +1116,7 @@ def test_delete_with_prefetch(tmp_path: Path): groups=[], public_mode=PublicMode.NONE, workspace=DEFAULT_WORKSPACE_NAME, + last_access=datetime.utcnow(), ) study_mock.to_json_summary.return_value = {"id": "my_study", "name": "foo"} @@ -1138,6 +1142,7 @@ def test_delete_with_prefetch(tmp_path: Path): owner=None, groups=[], public_mode=PublicMode.NONE, + last_access=datetime.utcnow(), ) study_mock.to_json_summary.return_value = {"id": "my_study", "name": "foo"}