Skip to content

Commit

Permalink
feat(watcher): add new endpoint for optimized scanning (#2193)
Browse files Browse the repository at this point in the history
  • Loading branch information
smailio authored Oct 31, 2024
1 parent bd5ddba commit 392fc14
Show file tree
Hide file tree
Showing 14 changed files with 769 additions and 88 deletions.
20 changes: 19 additions & 1 deletion antarest/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.UNPROCESSABLE_ENTITY, message)


class CannotScanInternalWorkspace(HTTPException):
class CannotAccessInternalWorkspace(HTTPException):
def __init__(self) -> None:
super().__init__(
HTTPStatus.BAD_REQUEST,
Expand All @@ -668,10 +668,28 @@ def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.NOT_FOUND, message)


class WorkspaceNotFound(HTTPException):
    """
    Error raised when a workspace that does not exist is requested.

    The exception carries the HTTP 422 (Unprocessable Entity) status code
    together with the message supplied by the caller.
    """

    def __init__(self, message: str) -> None:
        status = HTTPStatus.UNPROCESSABLE_ENTITY
        super().__init__(status, message)


class BadArchiveContent(Exception):
    """
    Error signalling that an archive file is corrupted or of an unknown format.

    Args:
        message: description of the problem; a generic message is used when omitted.
    """

    def __init__(self, message: str = "Unsupported archive format") -> None:
        super().__init__(message)


class FolderNotFoundInWorkspace(HTTPException):
    """
    Error raised when a folder that does not exist is requested.

    The exception carries the HTTP 422 (Unprocessable Entity) status code
    together with the message supplied by the caller.
    """

    def __init__(self, message: str) -> None:
        status = HTTPStatus.UNPROCESSABLE_ENTITY
        super().__init__(status, message)
13 changes: 13 additions & 0 deletions antarest/service_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
from antarest.study.main import build_study_service
from antarest.study.service import StudyService
from antarest.study.storage.auto_archive_service import AutoArchiveService
from antarest.study.storage.explorer_service import Explorer
from antarest.study.storage.rawstudy.watcher import Watcher
from antarest.study.web.explorer_blueprint import create_explorer_routes
from antarest.study.web.watcher_blueprint import create_watcher_routes
from antarest.worker.archive_worker import ArchiveWorker
from antarest.worker.worker import AbstractWorker
Expand Down Expand Up @@ -187,6 +189,14 @@ def create_watcher(
return watcher


def create_explorer(config: Config, app_ctxt: t.Optional[AppBuildContext]) -> t.Any:
    """
    Build the explorer service and, when possible, expose it through the API.

    Args:
        config: application configuration handed to the explorer and to its routes.
        app_ctxt: application build context; when it is falsy no route is registered.

    Returns:
        The newly created `Explorer` instance.
    """
    explorer_service = Explorer(config=config)
    if not app_ctxt:
        # No application context: the service is built without any route.
        return explorer_service

    explorer_router = create_explorer_routes(config=config, explorer=explorer_service)
    app_ctxt.api_root.include_router(explorer_router)
    return explorer_service


def create_matrix_gc(
config: Config,
app_ctxt: t.Optional[AppBuildContext],
Expand Down Expand Up @@ -249,6 +259,9 @@ def create_services(
watcher = create_watcher(config=config, app_ctxt=app_ctxt, study_service=study_service)
services["watcher"] = watcher

explorer_service = create_explorer(config=config, app_ctxt=app_ctxt)
services["explorer"] = explorer_service

if config.server.services and Module.MATRIX_GC.value in config.server.services or create_all:
matrix_garbage_collector = create_matrix_gc(
config=config,
Expand Down
19 changes: 19 additions & 0 deletions antarest/study/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,25 @@ class StudyFolder:
groups: t.List[Group]


class NonStudyFolder(AntaresBaseModel):
    """
    DTO used by the explorer to describe a directory that is not a study directory.

    It is useful for the front end, so that the user can navigate in the folder hierarchy.
    """

    # location of the directory
    path: Path
    # name of the workspace the directory belongs to
    workspace: str
    # name of the directory
    name: str


class WorkspaceMetadata(AntaresBaseModel):
    """
    DTO used by the explorer when listing all workspaces.
    """

    # name of the workspace
    name: str


class PatchStudy(AntaresBaseModel):
scenario: t.Optional[str] = None
doc: t.Optional[str] = None
Expand Down
14 changes: 11 additions & 3 deletions antarest/study/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,13 +858,17 @@ def remove_duplicates(self) -> None:
if ids: # Check if ids is not empty
self.repository.delete(*ids)

def sync_studies_on_disk(self, folders: t.List[StudyFolder], directory: t.Optional[Path] = None) -> None:
def sync_studies_on_disk(
self, folders: t.List[StudyFolder], directory: t.Optional[Path] = None, recursive: bool = True
) -> None:
"""
Used by watcher to send list of studies present on filesystem.
Args:
folders: list of studies currently present on folder
directory: directory of studies that will be watched
recursive: if False, the delta will apply only to the studies in "directory", otherwise
it will apply to all studies having a path that descends from "directory".
Returns:
Expand All @@ -873,11 +877,15 @@ def sync_studies_on_disk(self, folders: t.List[StudyFolder], directory: t.Option
clean_up_missing_studies_threshold = now - timedelta(days=MAX_MISSING_STUDY_TIMEOUT)
all_studies = self.repository.get_all_raw()
if directory:
all_studies = [raw_study for raw_study in all_studies if directory in Path(raw_study.path).parents]
if recursive:
all_studies = [raw_study for raw_study in all_studies if directory in Path(raw_study.path).parents]
else:
all_studies = [raw_study for raw_study in all_studies if directory == Path(raw_study.path).parent]
studies_by_path = {study.path: study for study in all_studies}

# delete orphan studies on database
paths = [str(f.path) for f in folders]

for study in all_studies:
if (
isinstance(study, RawStudy)
Expand All @@ -900,7 +908,7 @@ def sync_studies_on_disk(self, folders: t.List[StudyFolder], directory: t.Option
permissions=PermissionInfo.from_study(study),
)
)
elif study.missing < clean_up_missing_studies_threshold:
if study.missing < clean_up_missing_studies_threshold:
logger.info(
"Study %s at %s is not present in disk and will be deleted",
study.id,
Expand Down
60 changes: 60 additions & 0 deletions antarest/study/storage/explorer_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright (c) 2024, RTE (https://www.rte-france.com)
#
# See AUTHORS.txt
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# SPDX-License-Identifier: MPL-2.0
#
# This file is part of the Antares project.

import logging
from typing import List

from antarest.core.config import Config
from antarest.study.model import DEFAULT_WORKSPACE_NAME, NonStudyFolder, WorkspaceMetadata
from antarest.study.storage.utils import (
get_folder_from_workspace,
get_workspace_from_config,
is_study_folder,
should_ignore_folder_for_scan,
)

logger = logging.getLogger(__name__)


class Explorer:
    """
    Service used to browse the configured workspaces and the directories they contain.
    """

    def __init__(self, config: Config):
        self.config = config

    def list_dir(self, workspace_name: str, workspace_directory_path: str) -> List[NonStudyFolder]:
        """
        List the directories located directly under a workspace folder that are not studies.

        Args:
            workspace_name: name of the workspace to look into (resolved with `default_allowed=False`).
            workspace_directory_path: folder to list, resolved against the workspace
                by `get_folder_from_workspace`.

        Returns:
            One `NonStudyFolder` per child directory that is neither a study folder nor a
            folder ignored by the scan.
        """
        workspace = get_workspace_from_config(self.config, workspace_name, default_allowed=False)
        parent_dir = get_folder_from_workspace(workspace, workspace_directory_path)
        return [
            NonStudyFolder(
                # the path is made relative to the workspace root so that the
                # absolute path on the server is not exposed
                path=entry.relative_to(workspace.path),
                workspace=workspace_name,
                name=entry.name,
            )
            for entry in parent_dir.iterdir()
            if entry.is_dir() and not is_study_folder(entry) and not should_ignore_folder_for_scan(entry)
        ]

    def list_workspaces(self) -> List[WorkspaceMetadata]:
        """
        Return the metadata of every configured workspace, the default one excluded.
        """
        workspaces_metadata: List[WorkspaceMetadata] = []
        for configured_name in self.config.storage.workspaces.keys():
            if configured_name == DEFAULT_WORKSPACE_NAME:
                continue
            workspaces_metadata.append(WorkspaceMetadata(name=configured_name))
        return workspaces_metadata
75 changes: 30 additions & 45 deletions antarest/study/storage/rawstudy/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,13 @@
import re
import tempfile
from html import escape
from http import HTTPStatus
from http.client import HTTPException
from pathlib import Path
from time import sleep, time
from typing import List, Optional

from antares.study.version.upgrade_app import is_temporary_upgrade_dir
from filelock import FileLock

from antarest.core.config import Config
from antarest.core.exceptions import CannotScanInternalWorkspace
from antarest.core.interfaces.service import IService
from antarest.core.requests import RequestParameters
from antarest.core.tasks.model import TaskResult, TaskType
Expand All @@ -34,7 +30,11 @@
from antarest.login.model import Group
from antarest.study.model import DEFAULT_WORKSPACE_NAME, StudyFolder
from antarest.study.service import StudyService
from antarest.study.storage.variantstudy.model.command.generate_thermal_cluster_timeseries import is_ts_gen_tmp_dir
from antarest.study.storage.utils import (
get_folder_from_workspace,
get_workspace_from_config,
should_ignore_folder_for_scan,
)

logger = logging.getLogger(__name__)

Expand All @@ -49,11 +49,6 @@ def __call__(self, duration: float) -> None:
logger.info(f"Workspace {self.workspace_name} scanned in {duration}s")


class WorkspaceNotFound(HTTPException):
def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.BAD_REQUEST, message)


class Watcher(IService):
"""
Files Watcher to listen raw studies changes and trigger a database update.
Expand Down Expand Up @@ -128,40 +123,34 @@ def _rec_scan(
groups: List[Group],
filter_in: List[str],
filter_out: List[str],
max_depth: Optional[int] = None,
) -> List[StudyFolder]:
try:
if (path / "AW_NO_SCAN").exists():
logger.info(f"No scan directive file found. Will skip further scan of folder {path}")
return []

if is_temporary_upgrade_dir(path):
logger.info(f"Upgrade temporary folder found. Will skip further scan of folder {path}")
return []

if is_ts_gen_tmp_dir(path):
logger.info(f"TS generation temporary folder found. Will skip further scan of folder {path}")
if should_ignore_folder_for_scan(path):
return []

if (path / "study.antares").exists():
logger.debug(f"Study {path.name} found in {workspace}")
return [StudyFolder(path, workspace, groups)]

if max_depth is not None and max_depth <= 0:
logger.info(f"Scan was configured to not go any deeper, max _depth : {max_depth}")
return []

else:
folders: List[StudyFolder] = list()
if path.is_dir():
for child in path.iterdir():
if max_depth is not None:
max_depth = max_depth - 1
try:
if (
(child.is_dir())
and any([re.search(regex, child.name) for regex in filter_in])
and not any([re.search(regex, child.name) for regex in filter_out])
):
folders = folders + self._rec_scan(
child,
workspace,
groups,
filter_in,
filter_out,
child, workspace, groups, filter_in, filter_out, max_depth
)
except Exception as e:
logger.error(f"Failed to scan dir {child}", exc_info=e)
Expand All @@ -173,6 +162,7 @@ def _rec_scan(
def oneshot_scan(
self,
params: RequestParameters,
recursive: bool,
workspace: Optional[str] = None,
path: Optional[str] = None,
) -> str:
Expand All @@ -183,11 +173,12 @@ def oneshot_scan(
params: user parameters
workspace: workspace to scan
path: relative path to folder to scan
recursive: if true, scan recursively all subfolders otherwise only the first level
"""

# noinspection PyUnusedLocal
def scan_task(notifier: ITaskNotifier) -> TaskResult:
self.scan(workspace, path)
self.scan(recursive, workspace, path)
return TaskResult(success=True, message="Scan completed")

return self.task_service.add_task(
Expand All @@ -202,46 +193,40 @@ def scan_task(notifier: ITaskNotifier) -> TaskResult:

def scan(
self,
recursive: bool = True,
workspace_name: Optional[str] = None,
workspace_directory_path: Optional[str] = None,
) -> None:
"""
Scan recursively list of studies present on disk. Send updated list to study service.
Args:
recursive: if true, scan recursively all subfolders otherwise only the first level
Returns:
"""
stopwatch = StopWatch()
studies: List[StudyFolder] = list()
directory_path: Optional[Path] = None

# max depth when we call _rec_scan
max_depth = None if recursive else 1

if workspace_directory_path is not None and workspace_name:
if workspace_name == DEFAULT_WORKSPACE_NAME:
raise CannotScanInternalWorkspace
try:
workspace = self.config.storage.workspaces[workspace_name]
except KeyError:
logger.error(f"Workspace {workspace_name} not found")
raise WorkspaceNotFound(f"Workspace {workspace_name} not found")
workspace = get_workspace_from_config(self.config, workspace_name)
directory_path = get_folder_from_workspace(workspace, workspace_directory_path)

groups = [Group(id=escape(g), name=escape(g)) for g in workspace.groups]
directory_path = workspace.path / workspace_directory_path
studies = self._rec_scan(
directory_path,
workspace_name,
groups,
workspace.filter_in,
workspace.filter_out,
directory_path, workspace_name, groups, workspace.filter_in, workspace.filter_out, max_depth=max_depth
)
elif workspace_directory_path is None and workspace_name is None:
for name, workspace in self.config.storage.workspaces.items():
if name != DEFAULT_WORKSPACE_NAME:
path = Path(workspace.path)
groups = [Group(id=escape(g), name=escape(g)) for g in workspace.groups]
studies = studies + self._rec_scan(
path,
name,
groups,
workspace.filter_in,
workspace.filter_out,
path, name, groups, workspace.filter_in, workspace.filter_out, max_depth=max_depth
)
stopwatch.log_elapsed(_LogScanDuration(name))
else:
Expand All @@ -250,7 +235,7 @@ def scan(
logger.info(f"Waiting for FileLock to synchronize {directory_path or 'all studies'}")
with FileLock(Watcher.SCAN_LOCK):
logger.info(f"FileLock acquired to synchronize for {directory_path or 'all studies'}")
self.study_service.sync_studies_on_disk(studies, directory_path)
self.study_service.sync_studies_on_disk(studies, directory_path, recursive)
stopwatch.log_elapsed(
lambda x: logger.info(f"{directory_path or 'All studies'} synchronized in {x}s"),
since_start=True,
Expand Down
Loading

0 comments on commit 392fc14

Please sign in to comment.