Skip to content

Commit

Permalink
feat(retrival): correct the retrival of remote files and improve exce…
Browse files Browse the repository at this point in the history
…ption handling to avoid infinite loops
  • Loading branch information
laurent-laporte-pro committed Sep 22, 2023
1 parent 4ff6abf commit 88efc98
Show file tree
Hide file tree
Showing 15 changed files with 754 additions and 743 deletions.
70 changes: 27 additions & 43 deletions antareslauncher/use_cases/retrieve/clean_remote_server.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import copy
from pathlib import Path

from antareslauncher.display.display_terminal import DisplayTerminal
from antareslauncher.remote_environnement.remote_environment_with_slurm import (
RemoteEnvironmentWithSlurm,
)
from antareslauncher.study_dto import StudyDTO


class RemoteServerNotCleanException(Exception):
pass
LOG_NAME = f"{__name__}.RemoteServerCleaner"


class RemoteServerCleaner:
Expand All @@ -20,42 +15,31 @@ def __init__(
):
self._display = display
self._env = env
self._current_study: StudyDTO = None

def clean(self, study: StudyDTO):
self._current_study = copy.copy(study)
if self._should_clean_remote_server():
self._do_clean_remote_server()
return self._current_study

def _should_clean_remote_server(self):
return (
self._current_study.remote_server_is_clean is False
) and self._final_zip_downloaded()

def _final_zip_downloaded(self) -> bool:
if isinstance(self._current_study.local_final_zipfile_path, str):
return bool(self._current_study.local_final_zipfile_path)
else:
return False

def _do_clean_remote_server(self):
success = self._env.clean_remote_server(copy.copy(self._current_study))
if success is True:
self._current_study.remote_server_is_clean = success
self._display_success_message()
else:
self._display_failure_error()
raise RemoteServerNotCleanException

def _display_failure_error(self):
self._display.show_error(
f'"{Path(self._current_study.path).name}": Clean remote server failed',
__name__ + "." + __class__.__name__,
)

def _display_success_message(self):
self._display.show_message(
f'"{Path(self._current_study.path).name}": Clean remote server finished',
__name__ + "." + __class__.__name__,
)
if not study.remote_server_is_clean and study.local_final_zipfile_path:
# If the cleanup procedure fails to remove remote files or
# delete the final ZIP, there's no need to raise an exception.
# Instead, it's sufficient to issue a warning to alert the user.
try:
removed = self._env.clean_remote_server(study)
except Exception as exc:
self._display.show_error(
f'"{study.name}": Clean remote server raised: {exc}',
LOG_NAME,
)
else:
if removed:
self._display.show_message(
f'"{study.name}": Clean remote server finished',
LOG_NAME,
)
else:
self._display.show_error(
f'"{study.name}": Clean remote server failed',
LOG_NAME,
)

# However, in such cases, it's advisable to indicate that the cleanup
# was successful to prevent an infinite loop.
study.remote_server_is_clean = True
75 changes: 21 additions & 54 deletions antareslauncher/use_cases/retrieve/download_final_zip.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import copy
from pathlib import Path

from antareslauncher.display.display_terminal import DisplayTerminal
from antareslauncher.remote_environnement.remote_environment_with_slurm import (
RemoteEnvironmentWithSlurm,
)
from antareslauncher.study_dto import StudyDTO


class FinalZipNotDownloadedException(Exception):
pass
LOG_NAME = f"{__name__}.FinalZipDownloader"


class FinalZipDownloader(object):
Expand All @@ -19,7 +17,6 @@ def __init__(
):
self._env = env
self._display = display
self._current_study = None

def download(self, study: StudyDTO):
"""
Expand All @@ -32,57 +29,27 @@ def download(self, study: StudyDTO):
Returns:
The updated data transfer object, with its `local_final_zipfile_path`
attribute set if the download was successful.
Raises:
FinalZipNotDownloadedException: If the download fails or no files are found.
"""
self._current_study = copy.copy(study)
if (
self._current_study.finished
and not self._current_study.with_error
and not self._current_study.local_final_zipfile_path
):
self._do_download()
return self._current_study

def _do_download(self):
"""
Perform the download of the final ZIP file for the current study,
and update its `local_final_zipfile_path` attribute.
Raises:
FinalZipNotDownloadedException: If the download fails or no files are found.
Note:
This function delegates the download operation to the
`_env.download_final_zip` method, which is assumed to return
the path to the downloaded zip file on the local filesystem
or `None` if the download fails or no files are found.
If the download succeeds, the `local_final_zipfile_path` attribute
of the `_current_study` object is updated with the path to the
downloaded file, and a success message is displayed.
If the download fails, an error message is displayed and a
`FinalZipNotDownloadedException` exception is raised.
"""
self._display.show_message(
f'"{self._current_study.name}": downloading final ZIP...',
f"{__name__}.{__class__.__name__}",
)
if local_final_zipfile_path := self._env.download_final_zip(
copy.copy(self._current_study)
study.finished
and not study.with_error
and not study.local_final_zipfile_path
):
self._current_study.local_final_zipfile_path = str(local_final_zipfile_path)
self._display.show_message(
f'"{self._current_study.name}": Final ZIP downloaded',
f"{__name__}.{__class__.__name__}",
)
else:
self._display.show_error(
f'"{self._current_study.name}": Final ZIP not downloaded',
f"{__name__}.{__class__.__name__}",
)
raise FinalZipNotDownloadedException(
self._current_study.local_final_zipfile_path
f'"{study.name}": downloading final ZIP...',
LOG_NAME,
)
dst_dir = Path(study.output_dir)
dst_dir.mkdir(parents=True, exist_ok=True)
zip_path = self._env.download_final_zip(study)
study.local_final_zipfile_path = str(zip_path) if zip_path else ""
if study.local_final_zipfile_path:
self._display.show_message(
f'"{study.name}": Final ZIP downloaded',
LOG_NAME,
)
else:
self._display.show_error(
f'"{study.name}": Final ZIP NOT downloaded',
LOG_NAME,
)
85 changes: 45 additions & 40 deletions antareslauncher/use_cases/retrieve/final_zip_extractor.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,55 @@
import zipfile
from pathlib import Path

from antareslauncher.display.display_terminal import DisplayTerminal
from antareslauncher.file_manager.file_manager import FileManager
from antareslauncher.study_dto import StudyDTO


class ResultNotExtractedException(Exception):
pass
LOG_NAME = f"{__name__}.FinalZipDownloader"


class FinalZipExtractor:
def __init__(self, file_manager: FileManager, display: DisplayTerminal):
self._file_manager = file_manager
def __init__(self, display: DisplayTerminal):
self._display = display
self._current_study: StudyDTO = None

def extract_final_zip(self, study: StudyDTO) -> StudyDTO:
self._current_study = study
if self._study_final_zip_should_be_extracted():
self._do_extract()
return self._current_study

def _do_extract(self):
zipfile_to_extract = self._current_study.local_final_zipfile_path
success = self._file_manager.unzip(zipfile_to_extract)
if success:
self._current_study.final_zip_extracted = success
self._show_success_message()
else:
self._show_failure_error()
raise ResultNotExtractedException

def _show_failure_error(self):
self._display.show_error(
f'"{Path(self._current_study.path).name}": Final zip not extracted',
__name__ + "." + __class__.__name__,
)

def _show_success_message(self):
self._display.show_message(
f'"{Path(self._current_study.path).name}": Final zip extracted',
__name__ + "." + __class__.__name__,
)

def _study_final_zip_should_be_extracted(self):
return (
self._current_study.local_final_zipfile_path
and not self._current_study.final_zip_extracted
)
def extract_final_zip(self, study: StudyDTO) -> None:
"""
Extracts the simulation results, which are in the form of a ZIP file,
after it has been downloaded from Antares.
Args:
study: The current study
"""
if (
study.finished
and not study.with_error
and study.local_final_zipfile_path
and not study.final_zip_extracted
):
zip_path = Path(study.local_final_zipfile_path)
target_dir = zip_path.with_suffix("")
try:
with zipfile.ZipFile(zip_path) as zf:
names = zf.namelist()
progress_bar = self._display.generate_progress_bar(
names, desc="Extracting archive:", total=len(names)
)
for file in progress_bar:
zf.extract(member=file, path=target_dir)
except (OSError, zipfile.BadZipFile) as exc:
# If we cannot extract the final ZIP file, either because the file
# doesn't exist or the ZIP file is corrupted, we find ourselves
# in a situation where the results are unusable.
# In such cases, it's best to consider the simulation as failed,
# enabling the user to restart its simulation.
study.final_zip_extracted = False
study.with_error = True
self._display.show_error(
f'"{study.name}": Final zip not extracted: {exc}',
LOG_NAME,
)
else:
study.final_zip_extracted = True
self._display.show_message(
f'"{study.name}": Final zip extracted',
LOG_NAME,
)
71 changes: 30 additions & 41 deletions antareslauncher/use_cases/retrieve/log_downloader.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,52 @@
import copy
from pathlib import Path

from antareslauncher.display.display_terminal import DisplayTerminal
from antareslauncher.file_manager.file_manager import FileManager
from antareslauncher.remote_environnement.remote_environment_with_slurm import (
RemoteEnvironmentWithSlurm,
)
from antareslauncher.study_dto import StudyDTO

LOG_NAME = f"{__name__}.LogDownloader"


class LogDownloader:
def __init__(
self,
env: RemoteEnvironmentWithSlurm,
file_manager: FileManager,
display: DisplayTerminal,
):
self.env = env
self.display = display
self.file_manager = file_manager
self._current_study = None

def _create_logs_subdirectory(self):
self._set_current_study_log_dir_path()
self.file_manager.make_dir(self._current_study.job_log_dir)

def _set_current_study_log_dir_path(self):
directory_name = self._get_log_dir_name()
if Path(self._current_study.job_log_dir).name != directory_name:
self._current_study.job_log_dir = str(
Path(self._current_study.job_log_dir) / directory_name
)

def _get_log_dir_name(self):
return (
Path(self._current_study.path).name + "_" + str(self._current_study.job_id)
)

def run(self, study: StudyDTO):
"""Downloads slurm logs from the server then save study if the study is running
def run(self, study: StudyDTO) -> None:
"""
Downloads slurm logs from the server then save study if the study is running
Args:
study: The study data transfer object
"""
self._current_study = copy.copy(study)
if self._current_study.started:
self._create_logs_subdirectory()
self._do_download_logs()
return self._current_study

def _do_download_logs(self):
if self.env.download_logs(copy.copy(self._current_study)):
self._current_study.logs_downloaded = True
self.display.show_message(
f'"{Path(self._current_study.path).name}": Logs downloaded',
f"{__name__}.{__class__.__name__}",
)
else:
self.display.show_error(
f'"{Path(self._current_study.path).name}": Logs not downloaded',
f"{__name__}.{__class__.__name__}",
)
if study.started:
# set_current_study_log_dir_path
directory_name = f"{study.name}_{study.job_id}"
job_log_dir = Path(study.job_log_dir)
if job_log_dir.name != directory_name:
job_log_dir = job_log_dir / directory_name
study.job_log_dir = str(job_log_dir)

# create logs subdirectory
job_log_dir.mkdir(parents=True, exist_ok=True)

# make an attempt to download logs
downloaded_logs = self.env.download_logs(study)
if downloaded_logs:
study.logs_downloaded = True
self.display.show_message(
f'"{study.name}": Logs downloaded',
LOG_NAME,
)
else:
# No file to download
self.display.show_error(
f'"{study.name}": Logs NOT downloaded',
LOG_NAME,
)
Loading

0 comments on commit 88efc98

Please sign in to comment.