diff --git a/.gitignore b/.gitignore
index 33fe12bcb4..ae0f738dfc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -151,8 +151,7 @@ resources/templates/index.html
 resources/commit_id
 
 /examples/studies/
-watcher.lock
 watcher
 
-matrix_constant_init.lock
+*.lock
 **/bucket
\ No newline at end of file
diff --git a/antarest/launcher/adapters/abstractlauncher.py b/antarest/launcher/adapters/abstractlauncher.py
index 31cd3b391c..b02150a621 100644
--- a/antarest/launcher/adapters/abstractlauncher.py
+++ b/antarest/launcher/adapters/abstractlauncher.py
@@ -12,7 +12,7 @@
 )
 from antarest.core.model import JSON
 from antarest.core.requests import RequestParameters
-from antarest.launcher.model import JobStatus, LogType
+from antarest.launcher.model import JobStatus, LogType, JobResult
 from antarest.study.service import StudyService
 
 
@@ -29,6 +29,7 @@ class LauncherCallbacks(NamedTuple):
     after_export_flat: Callable[[str, str, Path, Optional[JSON]], None]
     append_before_log: Callable[[str, str], None]
     append_after_log: Callable[[str, str], None]
+    get_job_result: Callable[[str], Optional[JobResult]]
 
 
 class AbstractLauncher(ABC):
diff --git a/antarest/launcher/adapters/factory_launcher.py b/antarest/launcher/adapters/factory_launcher.py
index f3d539fd60..a7d089b63a 100644
--- a/antarest/launcher/adapters/factory_launcher.py
+++ b/antarest/launcher/adapters/factory_launcher.py
@@ -33,6 +33,10 @@ def build_launcher(
             )
         if config.launcher.slurm is not None:
             dict_launchers["slurm"] = SlurmLauncher(
-                config, storage_service, callbacks, event_bus
+                config,
+                storage_service,
+                callbacks,
+                event_bus,
+                retrieve_existing_jobs=True,
             )
         return dict_launchers
diff --git a/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py b/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
index 2fb0960b2a..52d16f9f20 100644
--- a/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
+++ b/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
@@ -1,6 +1,7 @@
 import argparse
 import logging
 import os
+import re
 import shutil
 import tempfile
 import threading
@@ -10,6 +11,8 @@
 from typing import Callable, Optional, Dict, Awaitable, List
 from uuid import UUID, uuid4
 
+from filelock import FileLock
+
 from antareslauncher.data_repo.data_repo_tinydb import DataRepoTinydb
 from antareslauncher.main import MainParameters, run_with
 from antareslauncher.main_option_parser import (
@@ -43,6 +46,11 @@
 MAX_NB_CPU = 24
 MAX_TIME_LIMIT = 604800
 MIN_TIME_LIMIT = 3600
+WORKSPACE_LOCK_FILE_NAME = ".lock"
+LOCK_FILE_NAME = "slurm_launcher_init.lock"
+LOG_DIR_NAME = "LOGS"
+STUDIES_INPUT_DIR_NAME = "STUDIES_IN"
+STUDIES_OUTPUT_DIR_NAME = "OUTPUT"
 
 
 class VersionNotSupportedError(Exception):
@@ -61,6 +69,7 @@ def __init__(
         callbacks: LauncherCallbacks,
         event_bus: IEventBus,
         use_private_workspace: bool = True,
+        retrieve_existing_jobs: bool = False,
     ) -> None:
         super().__init__(config, study_service, callbacks, event_bus)
         if config.launcher.slurm is None:
@@ -76,11 +85,8 @@ def __init__(
         self.job_id_to_study_id: Dict[str, str] = {}
         self._check_config()
         self.antares_launcher_lock = threading.Lock()
-        self.local_workspace = (
-            Path(tempfile.mkdtemp(dir=str(self.slurm_config.local_workspace)))
-            if use_private_workspace
-            else Path(self.slurm_config.local_workspace)
-        )
+        with FileLock(LOCK_FILE_NAME):
+            self.local_workspace = self._init_workspace(use_private_workspace)
 
         self.log_tail_manager = LogTailManager(
             Path(self.slurm_config.local_workspace)
@@ -98,6 +104,8 @@ def __init__(
             ),
             db_primary_key=self.launcher_params.db_primary_key,
         )
+        if retrieve_existing_jobs:
+            self._retrieve_running_jobs()
 
     def _check_config(self) -> None:
         assert (
@@ -105,6 +113,51 @@ def _check_config(self) -> None:
             and self.slurm_config.local_workspace.is_dir()
         )  # and check write permission
 
+    def _init_workspace(self, use_private_workspace: bool) -> Path:
+        if use_private_workspace:
+            for (
+                existing_workspace
+            ) in self.slurm_config.local_workspace.iterdir():
+                lock_file = existing_workspace / WORKSPACE_LOCK_FILE_NAME
+                if (
+                    existing_workspace.is_dir()
+                    and existing_workspace
+                    != self.slurm_config.local_workspace / LOG_DIR_NAME
+                    and not lock_file.exists()
+                ):
+                    logger.info(
+                        f"Initiating slurm workspace into existing directory {existing_workspace}"
+                    )
+                    lock_file.touch()
+                    return existing_workspace
+            new_workspace = Path(
+                tempfile.mkdtemp(dir=str(self.slurm_config.local_workspace))
+            )
+            lock_file = new_workspace / WORKSPACE_LOCK_FILE_NAME
+            lock_file.touch()
+            logger.info(
+                f"Initiating slurm workspace in new directory {new_workspace}"
+            )
+            return new_workspace
+        else:
+            return Path(self.slurm_config.local_workspace)
+
+    def _retrieve_running_jobs(self) -> None:
+        with db():
+            for study in self.data_repo_tinydb.get_list_of_studies():
+                job_result = self.callbacks.get_job_result(study.name)
+                if job_result:
+                    logger.info(
+                        f"Adding job/study mapping for launch id {study.name}"
+                    )
+                    self.job_id_to_study_id[study.name] = job_result.study_id
+                else:
+                    logger.warning(
+                        f"Failed to retrieve job result for job launch {study.name}"
+                    )
+            if len(self.job_id_to_study_id) > 0:
+                self.start()
+
     def _loop(self) -> None:
         while self.check_state:
             self._check_studies_state()
@@ -129,14 +182,16 @@ def _init_launcher_arguments(
             studies_in_dir=str(
                 (
                     Path(local_workspace or self.slurm_config.local_workspace)
-                    / "STUDIES_IN"
+                    / STUDIES_INPUT_DIR_NAME
                 )
             ),
-            log_dir=str((Path(self.slurm_config.local_workspace) / "LOGS")),
+            log_dir=str(
+                (Path(self.slurm_config.local_workspace) / LOG_DIR_NAME)
+            ),
             finished_dir=str(
                 (
                     Path(local_workspace or self.slurm_config.local_workspace)
-                    / "OUTPUT"
+                    / STUDIES_OUTPUT_DIR_NAME
                 )
             ),
             ssh_config_file_is_required=False,
@@ -178,8 +233,8 @@ def _init_launcher_parameters(
         )
         return main_parameters
 
-    def _delete_study(self, study_path: Path) -> None:
-        logger.info(f"Deleting study export at {study_path}")
+    def _delete_workspace_file(self, study_path: Path) -> None:
+        logger.info(f"Deleting workspace file at {study_path}")
         if self.local_workspace.absolute() in study_path.absolute().parents:
             if study_path.exists():
                 shutil.rmtree(study_path)
@@ -210,23 +265,29 @@ def _import_study_output(
             ]
         return self.storage_service.import_output(
             study_id,
-            self.local_workspace / "OUTPUT" / job_id / "output",
+            self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output",
             params=RequestParameters(DEFAULT_ADMIN_USER),
             additional_logs=launcher_logs,
         )
 
     def _import_xpansion_result(self, job_id: str, xpansion_mode: str) -> None:
-        output_path = self.local_workspace / "OUTPUT" / job_id / "output"
+        output_path = (
+            self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output"
+        )
         if output_path.exists() and len(os.listdir(output_path)) == 1:
             output_path = output_path / os.listdir(output_path)[0]
             shutil.copytree(
-                self.local_workspace / "OUTPUT" / job_id / "input" / "links",
+                self.local_workspace
+                / STUDIES_OUTPUT_DIR_NAME
+                / job_id
+                / "input"
+                / "links",
                 output_path / "updated_links",
             )
             if xpansion_mode == "r":
                 shutil.copytree(
                     self.local_workspace
-                    / "OUTPUT"
+                    / STUDIES_OUTPUT_DIR_NAME
                     / job_id
                     / "user"
                     / "expansion",
@@ -252,7 +313,9 @@ def _check_studies_state(self) -> None:
 
             for study in study_list:
                 if study.name not in self.job_id_to_study_id:
-                    # this job is handled by another worker process
+                    logger.warning(
+                        f"Antares launcher job {study.name} not found in local job list !"
+                    )
                     continue
 
                 all_done = all_done and (study.finished or study.with_error)
@@ -353,7 +416,20 @@ def _assert_study_version_is_supported(
     def _clean_up_study(self, launch_id: str) -> None:
        logger.info(f"Cleaning up study with launch_id {launch_id}")
        self.data_repo_tinydb.remove_study(launch_id)
-        self._delete_study(self.local_workspace / "OUTPUT" / launch_id)
+        self._delete_workspace_file(
+            self.local_workspace / STUDIES_OUTPUT_DIR_NAME / launch_id
+        )
+        self._delete_workspace_file(
+            self.local_workspace / STUDIES_INPUT_DIR_NAME / launch_id
+        )
+        if (self.local_workspace / STUDIES_OUTPUT_DIR_NAME).exists():
+            for finished_zip in (
+                self.local_workspace / STUDIES_OUTPUT_DIR_NAME
+            ).iterdir():
+                if finished_zip.is_file() and re.match(
+                    f"finished_{launch_id}_\\d+", finished_zip.name
+                ):
+                    self._delete_workspace_file(finished_zip)
         del self.job_id_to_study_id[launch_id]
 
     def _run_study(
@@ -419,7 +495,7 @@ def _run_study(
 
         if not self.thread:
             self.start()
-        self._delete_study(study_path)
+        self._delete_workspace_file(study_path)
 
     def _check_and_apply_launcher_params(
         self, launcher_params: Optional[JSON]
diff --git a/antarest/launcher/service.py b/antarest/launcher/service.py
index f024a9607d..4ead0dfdc1 100644
--- a/antarest/launcher/service.py
+++ b/antarest/launcher/service.py
@@ -90,6 +90,9 @@ def __init__(
                 append_after_log=lambda jobid, message: self.append_log(
                     jobid, message, JobLogType.AFTER
                 ),
+                get_job_result=lambda jobid: self.job_result_repository.get(
+                    jobid
+                ),
             ),
             event_bus,
         )
diff --git a/antarest/main.py b/antarest/main.py
index 736cbba0d0..6a09cb611d 100644
--- a/antarest/main.py
+++ b/antarest/main.py
@@ -54,6 +54,7 @@
 from antarest.study.main import build_study_service
 from antarest.study.service import StudyService
 from antarest.study.storage.rawstudy.watcher import Watcher
+from antarest.tools.admin_lib import clean_locks
 
 logger = logging.getLogger(__name__)
 
@@ -471,6 +472,7 @@ def handle_all_exception(request: Request, exc: Exception) -> Any:
         sys.exit()
     else:
         if module == Module.APP:
+            clean_locks(config_file)
            app, _ = fastapi_app(
                config_file,
                mount_front=not no_front,
diff --git a/antarest/tools/admin.py b/antarest/tools/admin.py
new file mode 100644
index 0000000000..679ee078d7
--- /dev/null
+++ b/antarest/tools/admin.py
@@ -0,0 +1,32 @@
+import logging
+from pathlib import Path
+
+import click
+
+from antarest.tools.admin_lib import clean_locks as do_clean_locks
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+@click.group()
+def commands() -> None:
+    pass
+
+
+@commands.command()
+@click.option(
+    "--config",
+    "-c",
+    nargs=1,
+    required=True,
+    type=click.Path(exists=True),
+    help="Application config",
+)
+def clean_locks(config: str) -> None:
+    """Clean app locks"""
+    do_clean_locks(Path(config))
+
+
+if __name__ == "__main__":
+    commands()
diff --git a/antarest/tools/admin_lib.py b/antarest/tools/admin_lib.py
new file mode 100644
index 0000000000..d410d15b5e
--- /dev/null
+++ b/antarest/tools/admin_lib.py
@@ -0,0 +1,30 @@
+import logging
+from pathlib import Path
+
+from antarest.core.config import Config
+from antarest.core.utils.utils import get_local_path
+from antarest.launcher.adapters.slurm_launcher.slurm_launcher import (
+    WORKSPACE_LOCK_FILE_NAME,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def clean_locks_from_config(config: Config) -> None:
+    if config.launcher.slurm:
+        slurm_workspace = config.launcher.slurm.local_workspace
+        if slurm_workspace.exists() and slurm_workspace.is_dir():
+            for workspace in slurm_workspace.iterdir():
+                lock_file = workspace / WORKSPACE_LOCK_FILE_NAME
+                if lock_file.exists():
+                    logger.info(
+                        f"Removing slurm workspace lock file {lock_file}"
+                    )
+                    lock_file.unlink()
+
+
+def clean_locks(config: Path) -> None:
+    """Clean app locks"""
+    res = get_local_path() / "resources"
+    config_obj = Config.from_yaml_file(res=res, file=config)
+    clean_locks_from_config(config_obj)
diff --git a/pyproject.toml b/pyproject.toml
index 4b938bb952..1026826fce 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,4 +4,4 @@ line-length = 79
 exclude = "(antares-?launcher/*|alembic/*)"
 
 [tool.coverage.run]
-omit = ["antarest/tools/cli.py"]
\ No newline at end of file
+omit = ["antarest/tools/cli.py", "antarest/tools/admin.py"]
\ No newline at end of file
diff --git a/scripts/pre-start.sh b/scripts/pre-start.sh
index eb894630a5..e2ff85b128 100755
--- a/scripts/pre-start.sh
+++ b/scripts/pre-start.sh
@@ -7,4 +7,7 @@ BASEDIR=`dirname $CURDIR`
 cd $BASEDIR
 
 alembic upgrade head
-cd -
\ No newline at end of file
+cd -
+
+export PYTHONPATH=$BASEDIR
+python3 $BASEDIR/antarest/tools/admin.py clean-locks -c $ANTAREST_CONF
\ No newline at end of file
diff --git a/tests/launcher/test_slurm_launcher.py b/tests/launcher/test_slurm_launcher.py
index 9230097244..bdec8050ae 100644
--- a/tests/launcher/test_slurm_launcher.py
+++ b/tests/launcher/test_slurm_launcher.py
@@ -10,14 +10,19 @@
 
 from antareslauncher.data_repo.data_repo_tinydb import DataRepoTinydb
 from antareslauncher.main import MainParameters
+from antareslauncher.study_dto import StudyDTO
 from antarest.core.config import Config, LauncherConfig, SlurmConfig
 from antarest.core.persistence import Base
 from antarest.core.utils.fastapi_sqlalchemy import DBSessionMiddleware
+from antarest.launcher.adapters.abstractlauncher import LauncherCallbacks
 from antarest.launcher.adapters.slurm_launcher.slurm_launcher import (
     SlurmLauncher,
+    WORKSPACE_LOCK_FILE_NAME,
+    LOG_DIR_NAME,
 )
-from antarest.launcher.model import JobStatus
+from antarest.launcher.model import JobStatus, JobResult
 from antarest.study.model import StudyMetadataDTO, RawStudy
+from antarest.tools.admin_lib import clean_locks_from_config
 
 
 @pytest.fixture
@@ -155,7 +160,7 @@ def test_slurm_launcher_delete_function(tmp_path: str):
     directory_path.mkdir()
     (directory_path / "file.txt").touch()
 
-    slurm_launcher._delete_study(directory_path)
+    slurm_launcher._delete_workspace_file(directory_path)
 
     assert not directory_path.exists()
 
@@ -247,7 +252,7 @@ def test_run_study(
     slurm_launcher.launcher_args = argument
     slurm_launcher._clean_local_workspace = Mock()
     slurm_launcher.start = Mock()
-    slurm_launcher._delete_study = Mock()
+    slurm_launcher._delete_workspace_file = Mock()
 
     slurm_launcher._run_study(
         study_uuid, str(uuid.uuid4()), None, params=params
@@ -260,7 +265,7 @@ def test_run_study(
     )
     slurm_launcher.start.assert_called_once()
     if job_status == JobStatus.RUNNING:
-        slurm_launcher._delete_study.assert_called_once()
+        slurm_launcher._delete_workspace_file.assert_called_once()
 
 
 @pytest.mark.unit_test
@@ -281,7 +286,7 @@ def test_check_state(tmp_path: Path, launcher_config: Config):
         event_bus=Mock(),
     )
     slurm_launcher._import_study_output = Mock()
-    slurm_launcher._delete_study = Mock()
+    slurm_launcher._delete_workspace_file = Mock()
     slurm_launcher.stop = Mock()
 
     study1 = Mock()
@@ -310,7 +315,7 @@ def test_check_state(tmp_path: Path, launcher_config: Config):
 
     assert slurm_launcher.callbacks.update_status.call_count == 2
     assert slurm_launcher._import_study_output.call_count == 1
-    assert slurm_launcher._delete_study.call_count == 2
+    assert slurm_launcher._delete_workspace_file.call_count == 4
     assert data_repo_tinydb.remove_study.call_count == 2
     slurm_launcher.stop.assert_called_once()
 
@@ -470,3 +475,74 @@ def test_kill_job(
     run_with_mock.assert_called_with(
         launcher_arguments, launcher_parameters, show_banner=False
     )
+
+
+@patch("antarest.launcher.adapters.slurm_launcher.slurm_launcher.run_with")
+def test_launcher_workspace_init(
+    run_with_mock, tmp_path: Path, launcher_config: Config
+):
+    engine = create_engine("sqlite:///:memory:", echo=True)
+    Base.metadata.create_all(engine)
+    DBSessionMiddleware(
+        Mock(),
+        custom_engine=engine,
+        session_args={"autocommit": False, "autoflush": False},
+    )
+
+    callbacks = Mock(get_job_result=lambda x: JobResult(study_id="study_id"))
+    (tmp_path / LOG_DIR_NAME).mkdir()
+
+    slurm_launcher = SlurmLauncher(
+        config=launcher_config,
+        study_service=Mock(),
+        callbacks=callbacks,
+        event_bus=Mock(),
+        retrieve_existing_jobs=True,
+    )
+    workspaces = list(
+        filter(lambda x: x.name != LOG_DIR_NAME, tmp_path.iterdir())
+    )
+    assert len(workspaces) == 1
+    assert (workspaces[0] / WORKSPACE_LOCK_FILE_NAME).exists()
+
+    clean_locks_from_config(launcher_config)
+    assert not (workspaces[0] / WORKSPACE_LOCK_FILE_NAME).exists()
+
+    slurm_launcher.data_repo_tinydb.save_study(
+        StudyDTO(
+            path="somepath",
+        )
+    )
+    run_with_mock.assert_not_called()
+
+    # will use existing private workspace
+    slurm_launcher = SlurmLauncher(
+        config=launcher_config,
+        study_service=Mock(),
+        callbacks=callbacks,
+        event_bus=Mock(),
+        retrieve_existing_jobs=True,
+    )
+    assert (
+        len(list(filter(lambda x: x.name != LOG_DIR_NAME, tmp_path.iterdir())))
+        == 1
+    )
+    assert len(slurm_launcher.job_id_to_study_id) == 1
+    assert slurm_launcher.job_id_to_study_id["somepath"] == "study_id"
+    run_with_mock.assert_called()
+
+    run_with_mock.reset_mock()
+    # will create a new one since there is a lock on previous one
+    slurm_launcher = SlurmLauncher(
+        config=launcher_config,
+        study_service=Mock(),
+        callbacks=callbacks,
+        event_bus=Mock(),
+        retrieve_existing_jobs=True,
+    )
+    assert (
+        len(list(filter(lambda x: x.name != LOG_DIR_NAME, tmp_path.iterdir())))
+        == 2
+    )
+    assert len(slurm_launcher.job_id_to_study_id) == 0
+    run_with_mock.assert_not_called()