Add automatic archival service (#1045)
pl-buiquang authored Aug 29, 2022
1 parent 05d0f5b commit 69010dc
Showing 12 changed files with 220 additions and 8 deletions.
34 changes: 34 additions & 0 deletions alembic/versions/e65e0c04606b_add_last_access_study_prop.py
@@ -0,0 +1,34 @@
"""add_last_access_study_prop
Revision ID: e65e0c04606b
Revises: 26c50ef2a0e1
Create Date: 2022-08-09 12:48:55.860099
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'e65e0c04606b'
down_revision = '26c50ef2a0e1'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('study', schema=None) as batch_op:
batch_op.add_column(sa.Column('last_access', sa.DateTime(), nullable=True))



# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('study', schema=None) as batch_op:
batch_op.drop_column('last_access')

# ### end Alembic commands ###
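
The new last_access column ships with this migration. Besides the alembic CLI, it can be applied or rolled back through Alembic's command API; a minimal sketch, assuming the project's alembic.ini sits at the repository root and points at the application database:

from alembic import command
from alembic.config import Config

# Assumed location of the project's Alembic configuration file.
alembic_cfg = Config("alembic.ini")

# Apply every pending revision, including e65e0c04606b above.
command.upgrade(alembic_cfg, "head")

# To undo only this revision (what scripts/rollback.sh now does via the CLI):
# command.downgrade(alembic_cfg, "26c50ef2a0e1")
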
4 changes: 4 additions & 0 deletions antarest/core/config.py
@@ -126,6 +126,7 @@ class StorageConfig:
download_default_expiration_timeout_minutes: int = 1440
matrix_gc_sleeping_time: int = 3600
matrix_gc_dry_run: bool = False
auto_archive_threshold_days: int = 60

@staticmethod
def from_dict(data: JSON) -> "StorageConfig":
@@ -145,6 +146,9 @@ def from_dict(data: JSON) -> "StorageConfig":
),
matrix_gc_sleeping_time=data.get("matrix_gc_sleeping_time", 3600),
matrix_gc_dry_run=data.get("matrix_gc_dry_run", False),
auto_archive_threshold_days=data.get(
"auto_archive_threshold_days", 60
),
)


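The new auto_archive_threshold_days option (default 60) sets how many days a managed study may sit without an update before the auto-archiver picks it up. A minimal sketch of setting it when building the configuration in code, the same way the tests below do; the workspace path is a placeholder, and exposing the key under the storage section of the application's YAML file is an assumption based on from_dict above:

from pathlib import Path

from antarest.core.config import Config, StorageConfig, WorkspaceConfig

# Placeholder workspace; auto_archive_threshold_days is the option of interest.
config = Config(
    storage=StorageConfig(
        workspaces={"default": WorkspaceConfig(path=Path("/data/studies"))},
        auto_archive_threshold_days=30,  # archive managed studies idle for 30+ days
    )
)
assert config.storage.auto_archive_threshold_days == 30
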
8 changes: 8 additions & 0 deletions antarest/main.py
@@ -30,6 +30,7 @@
MatrixGarbageCollector,
)
from antarest.singleton_services import SingletonServices
from antarest.study.storage.auto_archive_service import AutoArchiveService
from antarest.study.storage.rawstudy.watcher import Watcher
from antarest.tools.admin_lib import clean_locks
from antarest.utils import (
@@ -261,6 +262,13 @@ def handle_all_exception(request: Request, exc: Exception) -> Any:
matrix_gc = cast(MatrixGarbageCollector, services["matrix_gc"])
matrix_gc.start()

if (
config.server.services
and Module.AUTO_ARCHIVER.value in config.server.services
):
auto_archiver = cast(AutoArchiveService, services["auto_archiver"])
auto_archiver.start()

customize_openapi(application)
return application, services

5 changes: 5 additions & 0 deletions antarest/singleton_services.py
@@ -7,6 +7,7 @@
from antarest.core.interfaces.service import IService
from antarest.core.logging.utils import configure_logger
from antarest.core.utils.utils import get_local_path
from antarest.study.storage.auto_archive_service import AutoArchiveService
from antarest.utils import (
Module,
init_db,
@@ -63,6 +64,10 @@ def _init(
worker = create_archive_worker(config, "test", event_bus=event_bus)
services[Module.ARCHIVE_WORKER] = worker

if Module.AUTO_ARCHIVER in services_list:
auto_archive_service = AutoArchiveService(study_service, config)
services[Module.AUTO_ARCHIVER] = auto_archive_service

return services

def start(self) -> None:
3 changes: 2 additions & 1 deletion antarest/study/model.py
@@ -98,6 +98,7 @@ class Study(Base): # type: ignore
author = Column(String(255))
created_at = Column(DateTime)
updated_at = Column(DateTime)
last_access = Column(DateTime)
path = Column(String())
folder = Column(String, nullable=True)
parent_id = Column(
@@ -116,7 +117,7 @@ class Study(Base): # type: ignore
__mapper_args__ = {"polymorphic_identity": "study", "polymorphic_on": type}

def __str__(self) -> str:
return f"[Study] id={self.id}, type={self.type}, name={self.name}, version={self.version}, updated_at={self.updated_at}, owner={self.owner}, groups={[str(u) + ',' for u in self.groups]}"
return f"[Study] id={self.id}, type={self.type}, name={self.name}, version={self.version}, updated_at={self.updated_at}, last_access={self.last_access}, owner={self.owner}, groups={[str(u) + ',' for u in self.groups]}"

def __eq__(self, other: Any) -> bool:
if not isinstance(other, Study):
17 changes: 17 additions & 0 deletions antarest/study/service.py
@@ -1819,6 +1819,7 @@ def delete_link(
)

def archive(self, uuid: str, params: RequestParameters) -> str:
logger.info(f"Archiving study {uuid}")
study = self.get_study(uuid)
assert_permission(params.user, study, StudyPermissionType.DELETE)

@@ -1981,6 +1982,9 @@ def get_study(self, uuid: str) -> Study:
sanitized,
)
raise StudyNotFoundError(uuid)
# TODO: debounce this with an "update_study_last_access" method that updates only every few seconds
study.last_access = datetime.utcnow()
self.repository.save(study)
return study

def _assert_study_unarchived(
@@ -2210,6 +2214,19 @@ def check_and_update_all_study_versions_in_database(
storage = self.storage_service.raw_study_service
storage.check_and_update_study_version_in_database(study)

def archive_outputs(
self, study_id: str, params: RequestParameters
) -> None:
logger.info(f"Archiving all outputs for study {study_id}")
study = self.get_study(study_id)
assert_permission(params.user, study, StudyPermissionType.WRITE)
self._assert_study_unarchived(study)
study = self.get_study(study_id)
file_study = self.storage_service.get_storage(study).get_raw(study)
for output in file_study.config.outputs:
if not file_study.config.outputs[output].archived:
self.archive_output(study_id, output, True, params)

def archive_output(
self,
study_id: str,
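
get_study now stamps last_access on every call and saves the study, which the inline TODO above flags as something to debounce. A sketch of the helper that TODO hints at; the function name comes from the TODO itself, while the 60-second window, the free-function form, and the untyped repository argument (standing for the study repository StudyService already holds) are assumptions:

from datetime import datetime, timedelta

from antarest.study.model import Study

# Illustrative debounce window; any "every few seconds" value would do.
LAST_ACCESS_DEBOUNCE = timedelta(seconds=60)


def update_study_last_access(repository, study: Study) -> None:
    """Persist last_access only when the stored value is older than the window."""
    now = datetime.utcnow()
    if study.last_access is None or now - study.last_access > LAST_ACCESS_DEBOUNCE:
        study.last_access = now
        repository.save(study)
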
57 changes: 57 additions & 0 deletions antarest/study/storage/auto_archive_service.py
@@ -0,0 +1,57 @@
import datetime
import logging
import time
from typing import List

from antarest.core.config import Config
from antarest.core.interfaces.service import IService
from antarest.core.jwt import DEFAULT_ADMIN_USER
from antarest.core.requests import RequestParameters
from antarest.study.model import Study, RawStudy
from antarest.study.service import StudyService
from antarest.study.storage.utils import is_managed
from antarest.study.storage.variantstudy.model.dbmodel import VariantStudy

logger = logging.getLogger(__name__)


class AutoArchiveService(IService):
def __init__(self, study_service: StudyService, config: Config):
super(AutoArchiveService, self).__init__()
self.study_service = study_service
self.config = config

def _try_archive_studies(self) -> None:
now = datetime.datetime.utcnow()
studies: List[Study] = self.study_service.repository.get_all()
for study in studies:
if is_managed(
study
) and study.updated_at < now - datetime.timedelta(
days=self.config.storage.auto_archive_threshold_days
):
if isinstance(study, RawStudy) and not study.archived:
self.study_service.archive(
study.id,
params=RequestParameters(DEFAULT_ADMIN_USER),
)
elif isinstance(study, VariantStudy):
self.study_service.storage_service.variant_study_service.clear_snapshot(
study
)
self.study_service.archive_outputs(
study.id,
params=RequestParameters(DEFAULT_ADMIN_USER),
)

def _loop(self) -> None:
while True:
try:
self._try_archive_studies()
except Exception as e:
logger.error(
"Unexpected error happened when processing auto archive service loop",
exc_info=e,
)
finally:
time.sleep(2)
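
A minimal usage sketch, mirroring how create_services wires the service further down; study_service stands for an already-built StudyService, and IService.start() is assumed to run _loop on a background thread, as it does for the other singleton services started in main.py:

from antarest.core.config import Config
from antarest.study.service import StudyService
from antarest.study.storage.auto_archive_service import AutoArchiveService


def start_auto_archiver(
    study_service: StudyService, config: Config
) -> AutoArchiveService:
    # The archival threshold comes from config.storage.auto_archive_threshold_days.
    service = AutoArchiveService(study_service, config)
    service.start()  # assumed to schedule _try_archive_studies via _loop in the background
    return service
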
5 changes: 5 additions & 0 deletions antarest/study/storage/variantstudy/variant_study_service.py
@@ -417,6 +417,11 @@ def invalidate_cache(
for child in self.repository.get_children(parent_id=variant_study.id):
self.invalidate_cache(child, invalidate_self_snapshot=True)

def clear_snapshot(self, variant_study: Study) -> None:
logger.info(f"Clearing snapshot for study {variant_study.id}")
self.invalidate_cache(variant_study, invalidate_self_snapshot=True)
shutil.rmtree(self.get_study_path(variant_study), ignore_errors=True)

def has_children(self, study: VariantStudy) -> bool:
return len(self.repository.get_children(parent_id=study.id)) > 0

17 changes: 13 additions & 4 deletions antarest/utils.py
@@ -42,6 +42,7 @@
from antarest.matrixstore.service import MatrixService
from antarest.study.main import build_study_service
from antarest.study.service import StudyService
from antarest.study.storage.auto_archive_service import AutoArchiveService
from antarest.study.storage.rawstudy.watcher import Watcher
from antarest.study.web.watcher_blueprint import create_watcher_routes
from antarest.worker.archive_worker import ArchiveWorker
@@ -55,6 +56,7 @@ class Module(str, Enum):
WATCHER = "watcher"
MATRIX_GC = "matrix_gc"
ARCHIVE_WORKER = "archive_worker"
AUTO_ARCHIVER = "auto_archiver"


def get_default_config_path_or_raise() -> Path:
@@ -268,15 +270,14 @@ def create_services(
cache=cache,
)

- watcher = create_watcher(
- config=config, application=application, study_service=study_service
- )

if (
config.server.services
and Module.WATCHER.value in config.server.services
or create_all
):
+ watcher = create_watcher(
+ config=config, application=application, study_service=study_service
+ )
services["watcher"] = watcher

if (
@@ -292,6 +293,14 @@
)
services["matrix_gc"] = matrix_garbage_collector

if (
config.server.services
and Module.AUTO_ARCHIVER.value in config.server.services
or create_all
):
auto_archiver = AutoArchiveService(study_service, config)
services["auto_archiver"] = auto_archiver

services["event_bus"] = event_bus
services["study"] = study_service
services["launcher"] = launcher
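Like the watcher and the matrix garbage collector, the auto-archiver is opt-in: create_services only builds it when auto_archiver is listed in server.services (or when create_all is set). A small sketch of that gate, with the enabled-services list standing in for whatever config.server.services actually contains:

from antarest.utils import Module

# Stand-in for config.server.services as loaded from the application configuration.
enabled_services = ["watcher", "matrix_gc", "auto_archiver"]

if Module.AUTO_ARCHIVER.value in enabled_services:
    print("auto_archiver service will be created and started")
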
2 changes: 1 addition & 1 deletion scripts/rollback.sh
@@ -4,5 +4,5 @@ CURDIR=$(cd `dirname $0` && pwd)
BASEDIR=`dirname $CURDIR`

cd $BASEDIR
- alembic downgrade c9dcca887456
+ alembic downgrade 26c50ef2a0e1
cd -
67 changes: 67 additions & 0 deletions tests/storage/business/test_autoarchive_service.py
@@ -0,0 +1,67 @@
import datetime
from pathlib import Path
from unittest.mock import Mock, call

from antarest.core.config import Config, StorageConfig, WorkspaceConfig
from antarest.core.jwt import DEFAULT_ADMIN_USER
from antarest.core.requests import RequestParameters
from antarest.study.model import RawStudy, DEFAULT_WORKSPACE_NAME
from antarest.study.storage.auto_archive_service import AutoArchiveService
from antarest.study.storage.variantstudy.model.dbmodel import VariantStudy


def test_auto_archival(tmp_path: Path):
workspace_path = tmp_path / "workspace_test"
auto_archive_service = AutoArchiveService(
Mock(),
Config(
storage=StorageConfig(
workspaces={"test": WorkspaceConfig(path=workspace_path)}
)
),
)

now = datetime.datetime.now()

auto_archive_service.study_service.repository = Mock()
auto_archive_service.study_service.repository.get_all.return_value = [
RawStudy(
id="a",
workspace="not default",
updated_at=now - datetime.timedelta(days=61),
),
RawStudy(
id="b",
workspace=DEFAULT_WORKSPACE_NAME,
updated_at=now - datetime.timedelta(days=59),
),
RawStudy(
id="c",
workspace=DEFAULT_WORKSPACE_NAME,
updated_at=now - datetime.timedelta(days=61),
archived=True,
),
RawStudy(
id="d",
workspace=DEFAULT_WORKSPACE_NAME,
updated_at=now - datetime.timedelta(days=61),
archived=False,
),
VariantStudy(id="e", updated_at=now - datetime.timedelta(days=61)),
]
auto_archive_service.study_service.storage_service = Mock()
auto_archive_service.study_service.storage_service.variant_study_service = (
Mock()
)

auto_archive_service._try_archive_studies()

auto_archive_service.study_service.archive.assert_called_once_with(
"d", params=RequestParameters(DEFAULT_ADMIN_USER)
)
auto_archive_service.study_service.storage_service.variant_study_service.clear_snapshot.assert_called_once_with(
VariantStudy(id="e", updated_at=now - datetime.timedelta(days=61))
)
auto_archive_service.study_service.archive_outputs.assert_called_once_with(
"e", params=RequestParameters(DEFAULT_ADMIN_USER)
)
9 changes: 7 additions & 2 deletions tests/storage/test_service.py
@@ -793,7 +793,10 @@ def test_change_owner() -> None:
user_service.get_user.assert_called_once_with(
2, RequestParameters(JWTUser(id=2, impersonator=2, type="users"))
)
- repository.save.assert_called_once_with(RawStudy(id=uuid, owner=bob))
+ repository.save.assert_called_with(
+ RawStudy(id=uuid, owner=bob, last_access=ANY)
+ )
+ repository.save.assert_called_with(RawStudy(id=uuid, owner=bob))

service._edit_study_using_command.assert_called_once_with(
study=study, url="study/antares/author", data="Bob"
@@ -921,7 +924,7 @@ def test_set_public_mode() -> None:
JWTUser(id=2, impersonator=2, type="users", groups=[group_admin])
),
)
- repository.save.assert_called_once_with(
+ repository.save.assert_called_with(
Study(id=uuid, public_mode=PublicMode.FULL)
)

@@ -1113,6 +1116,7 @@ def test_delete_with_prefetch(tmp_path: Path):
groups=[],
public_mode=PublicMode.NONE,
workspace=DEFAULT_WORKSPACE_NAME,
last_access=datetime.utcnow(),
)
study_mock.to_json_summary.return_value = {"id": "my_study", "name": "foo"}

@@ -1138,6 +1142,7 @@ def test_delete_with_prefetch(tmp_path: Path):
owner=None,
groups=[],
public_mode=PublicMode.NONE,
last_access=datetime.utcnow(),
)
study_mock.to_json_summary.return_value = {"id": "my_study", "name": "foo"}

