Allow slurm launcher to retrieve results after server restart (#754)
pl-buiquang authored Mar 1, 2022
1 parent 3eb2ff8 · commit 300e3b2
Showing 11 changed files with 255 additions and 29 deletions.
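In short: each SLURM workspace directory is now marked with a ".lock" file while a server process is using it, so a restarted server can reclaim an unlocked workspace instead of always creating a fresh temporary one; jobs recorded in the antares-launcher TinyDB repository are re-read at startup and mapped back to their studies through a new get_job_result callback; and a new admin.py CLI, invoked from scripts/pre-start.sh, clears stale lock files before the app starts.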
3 changes: 1 addition & 2 deletions .gitignore
@@ -151,8 +151,7 @@ resources/templates/index.html
 resources/commit_id
 /examples/studies/
 
-watcher.lock
 watcher
-matrix_constant_init.lock
+*.lock
 
 **/bucket
3 changes: 2 additions & 1 deletion antarest/launcher/adapters/abstractlauncher.py
@@ -12,7 +12,7 @@
 )
 from antarest.core.model import JSON
 from antarest.core.requests import RequestParameters
-from antarest.launcher.model import JobStatus, LogType
+from antarest.launcher.model import JobStatus, LogType, JobResult
 from antarest.study.service import StudyService
 
 
@@ -29,6 +29,7 @@ class LauncherCallbacks(NamedTuple):
     after_export_flat: Callable[[str, str, Path, Optional[JSON]], None]
     append_before_log: Callable[[str, str], None]
     append_after_log: Callable[[str, str], None]
+    get_job_result: Callable[[str], Optional[JobResult]]
 
 
 class AbstractLauncher(ABC):
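The new get_job_result callback is the hook that lets a launcher adapter translate a persisted launch id back into its JobResult (and hence its study id) after a restart; antarest/launcher/service.py below binds it to the job result repository.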
6 changes: 5 additions & 1 deletion antarest/launcher/adapters/factory_launcher.py
@@ -33,6 +33,10 @@ def build_launcher(
             )
         if config.launcher.slurm is not None:
             dict_launchers["slurm"] = SlurmLauncher(
-                config, storage_service, callbacks, event_bus
+                config,
+                storage_service,
+                callbacks,
+                event_bus,
+                retrieve_existing_jobs=True,
             )
         return dict_launchers
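Only the SLURM launcher is constructed with retrieve_existing_jobs=True; the constructor below defaults it to False, so other call sites keep the previous behaviour.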
110 changes: 93 additions & 17 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
@@ -1,6 +1,7 @@
 import argparse
 import logging
 import os
+import re
 import shutil
 import tempfile
 import threading
@@ -10,6 +11,8 @@
 from typing import Callable, Optional, Dict, Awaitable, List
 from uuid import UUID, uuid4
 
+from filelock import FileLock
+
 from antareslauncher.data_repo.data_repo_tinydb import DataRepoTinydb
 from antareslauncher.main import MainParameters, run_with
 from antareslauncher.main_option_parser import (
@@ -43,6 +46,11 @@
 MAX_NB_CPU = 24
 MAX_TIME_LIMIT = 604800
 MIN_TIME_LIMIT = 3600
+WORKSPACE_LOCK_FILE_NAME = ".lock"
+LOCK_FILE_NAME = "slurm_launcher_init.lock"
+LOG_DIR_NAME = "LOGS"
+STUDIES_INPUT_DIR_NAME = "STUDIES_IN"
+STUDIES_OUTPUT_DIR_NAME = "OUTPUT"
 
 
 class VersionNotSupportedError(Exception):
@@ -61,6 +69,7 @@ def __init__(
         callbacks: LauncherCallbacks,
         event_bus: IEventBus,
         use_private_workspace: bool = True,
+        retrieve_existing_jobs: bool = False,
     ) -> None:
         super().__init__(config, study_service, callbacks, event_bus)
         if config.launcher.slurm is None:
@@ -76,11 +85,8 @@ def __init__(
         self.job_id_to_study_id: Dict[str, str] = {}
         self._check_config()
         self.antares_launcher_lock = threading.Lock()
-        self.local_workspace = (
-            Path(tempfile.mkdtemp(dir=str(self.slurm_config.local_workspace)))
-            if use_private_workspace
-            else Path(self.slurm_config.local_workspace)
-        )
+        with FileLock(LOCK_FILE_NAME):
+            self.local_workspace = self._init_workspace(use_private_workspace)
 
         self.log_tail_manager = LogTailManager(
             Path(self.slurm_config.local_workspace)
@@ -98,13 +104,60 @@ def __init__(
             ),
             db_primary_key=self.launcher_params.db_primary_key,
         )
+        if retrieve_existing_jobs:
+            self._retrieve_running_jobs()
 
     def _check_config(self) -> None:
         assert (
             self.slurm_config.local_workspace.exists()
             and self.slurm_config.local_workspace.is_dir()
         )  # and check write permission
 
+    def _init_workspace(self, use_private_workspace: bool) -> Path:
+        if use_private_workspace:
+            for (
+                existing_workspace
+            ) in self.slurm_config.local_workspace.iterdir():
+                lock_file = existing_workspace / WORKSPACE_LOCK_FILE_NAME
+                if (
+                    existing_workspace.is_dir()
+                    and existing_workspace
+                    != self.slurm_config.local_workspace / LOG_DIR_NAME
+                    and not lock_file.exists()
+                ):
+                    logger.info(
+                        f"Initiating slurm workspace into existing directory {existing_workspace}"
+                    )
+                    lock_file.touch()
+                    return existing_workspace
+            new_workspace = Path(
+                tempfile.mkdtemp(dir=str(self.slurm_config.local_workspace))
+            )
+            lock_file = new_workspace / WORKSPACE_LOCK_FILE_NAME
+            lock_file.touch()
+            logger.info(
+                f"Initiating slurm workspace in new directory {new_workspace}"
+            )
+            return new_workspace
+        else:
+            return Path(self.slurm_config.local_workspace)
+
+    def _retrieve_running_jobs(self) -> None:
+        with db():
+            for study in self.data_repo_tinydb.get_list_of_studies():
+                job_result = self.callbacks.get_job_result(study.name)
+                if job_result:
+                    logger.info(
+                        f"Adding job/study mapping for launch id {study.name}"
+                    )
+                    self.job_id_to_study_id[study.name] = job_result.study_id
+                else:
+                    logger.warning(
+                        f"Failed to retrieve job result for job launch {study.name}"
+                    )
+        if len(self.job_id_to_study_id) > 0:
+            self.start()
+
     def _loop(self) -> None:
         while self.check_state:
             self._check_studies_state()
@@ -129,14 +182,16 @@ def _init_launcher_arguments(
             studies_in_dir=str(
                 (
                     Path(local_workspace or self.slurm_config.local_workspace)
-                    / "STUDIES_IN"
+                    / STUDIES_INPUT_DIR_NAME
                 )
             ),
-            log_dir=str((Path(self.slurm_config.local_workspace) / "LOGS")),
+            log_dir=str(
+                (Path(self.slurm_config.local_workspace) / LOG_DIR_NAME)
+            ),
             finished_dir=str(
                 (
                     Path(local_workspace or self.slurm_config.local_workspace)
-                    / "OUTPUT"
+                    / STUDIES_OUTPUT_DIR_NAME
                 )
             ),
             ssh_config_file_is_required=False,
@@ -178,8 +233,8 @@ def _init_launcher_parameters(
         )
         return main_parameters
 
-    def _delete_study(self, study_path: Path) -> None:
-        logger.info(f"Deleting study export at {study_path}")
+    def _delete_workspace_file(self, study_path: Path) -> None:
+        logger.info(f"Deleting workspace file at {study_path}")
         if self.local_workspace.absolute() in study_path.absolute().parents:
             if study_path.exists():
                 shutil.rmtree(study_path)
@@ -210,23 +265,29 @@ def _import_study_output(
         ]
         return self.storage_service.import_output(
             study_id,
-            self.local_workspace / "OUTPUT" / job_id / "output",
+            self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output",
             params=RequestParameters(DEFAULT_ADMIN_USER),
             additional_logs=launcher_logs,
         )
 
     def _import_xpansion_result(self, job_id: str, xpansion_mode: str) -> None:
-        output_path = self.local_workspace / "OUTPUT" / job_id / "output"
+        output_path = (
+            self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output"
+        )
         if output_path.exists() and len(os.listdir(output_path)) == 1:
             output_path = output_path / os.listdir(output_path)[0]
             shutil.copytree(
-                self.local_workspace / "OUTPUT" / job_id / "input" / "links",
+                self.local_workspace
+                / STUDIES_OUTPUT_DIR_NAME
+                / job_id
+                / "input"
+                / "links",
                 output_path / "updated_links",
             )
             if xpansion_mode == "r":
                 shutil.copytree(
                     self.local_workspace
-                    / "OUTPUT"
+                    / STUDIES_OUTPUT_DIR_NAME
                     / job_id
                     / "user"
                     / "expansion",
@@ -252,7 +313,9 @@ def _check_studies_state(self) -> None:
 
         for study in study_list:
             if study.name not in self.job_id_to_study_id:
-                # this job is handled by another worker process
+                logger.warning(
+                    f"Antares launcher job {study.name} not found in local job list !"
+                )
                 continue
 
             all_done = all_done and (study.finished or study.with_error)
@@ -353,7 +416,20 @@ def _assert_study_version_is_supported(
     def _clean_up_study(self, launch_id: str) -> None:
         logger.info(f"Cleaning up study with launch_id {launch_id}")
         self.data_repo_tinydb.remove_study(launch_id)
-        self._delete_study(self.local_workspace / "OUTPUT" / launch_id)
+        self._delete_workspace_file(
+            self.local_workspace / STUDIES_OUTPUT_DIR_NAME / launch_id
+        )
+        self._delete_workspace_file(
+            self.local_workspace / STUDIES_INPUT_DIR_NAME / launch_id
+        )
+        if (self.local_workspace / STUDIES_OUTPUT_DIR_NAME).exists():
+            for finished_zip in (
+                self.local_workspace / STUDIES_OUTPUT_DIR_NAME
+            ).iterdir():
+                if finished_zip.is_file() and re.match(
+                    f"finished_{launch_id}_\\d+", finished_zip.name
+                ):
+                    self._delete_workspace_file(finished_zip)
         del self.job_id_to_study_id[launch_id]
 
     def _run_study(
@@ -419,7 +495,7 @@ def _run_study(
         if not self.thread:
             self.start()
 
-        self._delete_study(study_path)
+        self._delete_workspace_file(study_path)
 
     def _check_and_apply_launcher_params(
         self, launcher_params: Optional[JSON]
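The workspace claim in _init_workspace above carries most of the restart logic. Below is a condensed, standalone sketch of the pattern under simplifying assumptions: the claim_workspace helper and its names are illustrative (the real code also skips the LOGS directory), while filelock.FileLock is the same dependency the launcher uses.

from pathlib import Path
from tempfile import mkdtemp

from filelock import FileLock  # inter-process lock, as in the launcher

LOCK_MARKER = ".lock"  # illustrative stand-in for WORKSPACE_LOCK_FILE_NAME


def claim_workspace(root: Path) -> Path:
    """Reuse the first unlocked sub-directory of root, else create one."""
    # The file lock serializes the scan-and-claim step so that two server
    # processes starting at once cannot grab the same workspace.
    with FileLock("slurm_launcher_init.lock"):
        for candidate in root.iterdir():
            marker = candidate / LOCK_MARKER
            if candidate.is_dir() and not marker.exists():
                marker.touch()  # claim it; cleared later by clean-locks
                return candidate
        workspace = Path(mkdtemp(dir=str(root)))
        (workspace / LOCK_MARKER).touch()
        return workspace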
3 changes: 3 additions & 0 deletions antarest/launcher/service.py
@@ -90,6 +90,9 @@ def __init__(
                 append_after_log=lambda jobid, message: self.append_log(
                     jobid, message, JobLogType.AFTER
                 ),
+                get_job_result=lambda jobid: self.job_result_repository.get(
+                    jobid
+                ),
             ),
             event_bus,
         )
2 changes: 2 additions & 0 deletions antarest/main.py
@@ -54,6 +54,7 @@
 from antarest.study.main import build_study_service
 from antarest.study.service import StudyService
 from antarest.study.storage.rawstudy.watcher import Watcher
+from antarest.tools.admin_lib import clean_locks
 
 logger = logging.getLogger(__name__)
 
@@ -471,6 +472,7 @@ def handle_all_exception(request: Request, exc: Exception) -> Any:
         sys.exit()
     else:
         if module == Module.APP:
+            clean_locks(config_file)
             app, _ = fastapi_app(
                 config_file,
                 mount_front=not no_front,
32 changes: 32 additions & 0 deletions antarest/tools/admin.py
@@ -0,0 +1,32 @@
+import logging
+from pathlib import Path
+
+import click
+
+from antarest.tools.admin_lib import clean_locks as do_clean_locks
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+@click.group()
+def commands() -> None:
+    pass
+
+
+@commands.command()
+@click.option(
+    "--config",
+    "-c",
+    nargs=1,
+    required=True,
+    type=click.Path(exists=True),
+    help="Application config",
+)
+def clean_locks(config: str) -> None:
+    """Clean app locks"""
+    do_clean_locks(Path(config))
+
+
+if __name__ == "__main__":
+    commands()
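As wired into scripts/pre-start.sh below, this is invoked as python3 antarest/tools/admin.py clean-locks -c <config>; click derives the clean-locks command name from the clean_locks function name.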
30 changes: 30 additions & 0 deletions antarest/tools/admin_lib.py
@@ -0,0 +1,30 @@
+import logging
+from pathlib import Path
+
+from antarest.core.config import Config
+from antarest.core.utils.utils import get_local_path
+from antarest.launcher.adapters.slurm_launcher.slurm_launcher import (
+    WORKSPACE_LOCK_FILE_NAME,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def clean_locks_from_config(config: Config) -> None:
+    if config.launcher.slurm:
+        slurm_workspace = config.launcher.slurm.local_workspace
+        if slurm_workspace.exists() and slurm_workspace.is_dir():
+            for workspace in slurm_workspace.iterdir():
+                lock_file = workspace / WORKSPACE_LOCK_FILE_NAME
+                if lock_file.exists():
+                    logger.info(
+                        f"Removing slurm workspace lock file {lock_file}"
+                    )
+                    lock_file.unlink()
+
+
+def clean_locks(config: Path) -> None:
+    """Clean app locks"""
+    res = get_local_path() / "resources"
+    config_obj = Config.from_yaml_file(res=res, file=config)
+    clean_locks_from_config(config_obj)
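clean_locks also runs at application startup (see the antarest/main.py hunk above), so lock files left behind by a crashed or killed server do not permanently orphan their workspaces.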
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,4 +4,4 @@ line-length = 79
 exclude = "(antares-?launcher/*|alembic/*)"
 
 [tool.coverage.run]
-omit = ["antarest/tools/cli.py"]
+omit = ["antarest/tools/cli.py", "antarest/tools/admin.py"]
5 changes: 4 additions & 1 deletion scripts/pre-start.sh
@@ -7,4 +7,7 @@ BASEDIR=`dirname $CURDIR`
 
 cd $BASEDIR
 alembic upgrade head
-cd -
+cd -
+
+export PYTHONPATH=$BASEDIR
+python3 $BASEDIR/antarest/tools/admin.py clean-locks -c $ANTAREST_CONF
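The PYTHONPATH export matters here: admin.py is run as a plain script but imports antarest.tools.admin_lib, so the repository root must be on the import path.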