feat(launcher): allow local launcher to work with xpress (#2251)
- Allow users to launch a simulation with Xpress and presolve via the local launcher (a brief usage sketch follows the change summary below)
- Use simulator 8.8.11 instead of 8.8.5
- Don't import failed outputs inside the study
- Separate stdout and stderr logs, as was already done with the SLURM launcher
- Persist the logs if the simulation fails (by introducing a local_workspace)
MartinBelthle authored Jan 16, 2025
1 parent e70e523 commit 36ebe7b
Showing 15 changed files with 177 additions and 155 deletions.
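
As a rough usage sketch of the new option handling (not code from this commit): the local launcher now reads the free-text `other_options` launcher parameter and turns it into Antares solver CLI flags, mirroring the `_parse_launcher_options` method added below. The option string, CPU count, and Xpress directory in this sketch are illustrative placeholders.

```python
import os

# Hypothetical launch parameters; in Antares Web these come from
# LauncherParametersDTO (nb_cpu, other_options) and launcher.local.xpress_dir.
other_options = "xpress presolve"
nb_cpu = 4
xpress_dir = "/opt/xpressmp"  # placeholder path

simulator_args = [f"--force-parallel={nb_cpu}"]
env = os.environ.copy()
if "xpress" in other_options:
    simulator_args += ["--use-ortools", "--ortools-solver=xpress"]
    # These variables are only set when launcher.local.xpress_dir is configured.
    env["XPRESSDIR"] = xpress_dir
    env["XPRESS"] = env["XPRESSDIR"] + os.sep + "bin"
elif "coin" in other_options:
    simulator_args += ["--use-ortools", "--ortools-solver=coin"]
if "presolve" in other_options:
    simulator_args += ["--solver-parameters", "PRESOLVE 1"]

# Expected flags passed to the solver binary:
# ['--force-parallel=4', '--use-ortools', '--ortools-solver=xpress',
#  '--solver-parameters', 'PRESOLVE 1']
print(simulator_args)
```
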
6 changes: 6 additions & 0 deletions antarest/core/config.py
@@ -263,6 +263,8 @@ class LocalConfig:
enable_nb_cores_detection: bool = True
nb_cores: NbCoresConfig = NbCoresConfig()
time_limit: TimeLimitConfig = TimeLimitConfig()
xpress_dir: Optional[str] = None
local_workspace: Path = Path("./local_workspace")

@classmethod
def from_dict(cls, data: JSON) -> "LocalConfig":
@@ -278,10 +280,14 @@ def from_dict(cls, data: JSON) -> "LocalConfig":
nb_cores = data.get("nb_cores", asdict(defaults.nb_cores))
if enable_nb_cores_detection:
nb_cores.update(cls._autodetect_nb_cores())
xpress_dir = data.get("xpress_dir", defaults.xpress_dir)
local_workspace = Path(data["local_workspace"]) if "local_workspace" in data else defaults.local_workspace
return cls(
binaries={str(v): Path(p) for v, p in binaries.items()},
enable_nb_cores_detection=enable_nb_cores_detection,
nb_cores=NbCoresConfig(**nb_cores),
xpress_dir=xpress_dir,
local_workspace=local_workspace,
)

@classmethod
8 changes: 1 addition & 7 deletions antarest/launcher/adapters/abstractlauncher.py
@@ -21,7 +21,6 @@
from antarest.core.interfaces.cache import ICache
from antarest.core.interfaces.eventbus import Event, EventChannelDirectory, EventType, IEventBus
from antarest.core.model import PermissionInfo, PublicMode
from antarest.core.requests import RequestParameters
from antarest.launcher.adapters.log_parser import LaunchProgressDTO
from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType

@@ -70,12 +69,7 @@ def __init__(

@abstractmethod
def run_study(
self,
study_uuid: str,
job_id: str,
version: SolverVersion,
launcher_parameters: LauncherParametersDTO,
params: RequestParameters,
self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO
) -> None:
raise NotImplementedError()

161 changes: 77 additions & 84 deletions antarest/launcher/adapters/local_launcher/local_launcher.py
@@ -10,27 +10,24 @@
#
# This file is part of the Antares project.

import io
import logging
import os
import shutil
import signal
import subprocess
import tempfile
import threading
import time
from pathlib import Path
from typing import Callable, Dict, Optional, Tuple, cast
from uuid import UUID
from typing import Any, Callable, Dict, List, Optional, Tuple

from antares.study.version import SolverVersion
from typing_extensions import override

from antarest.core.config import Config
from antarest.core.interfaces.cache import ICache
from antarest.core.interfaces.eventbus import IEventBus
from antarest.core.requests import RequestParameters
from antarest.launcher.adapters.abstractlauncher import AbstractLauncher, LauncherCallbacks, LauncherInitException
from antarest.launcher.adapters.log_manager import follow
from antarest.launcher.adapters.log_manager import LogTailManager
from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType

logger = logging.getLogger(__name__)
@@ -51,7 +48,11 @@ def __init__(
super().__init__(config, callbacks, event_bus, cache)
if self.config.launcher.local is None:
raise LauncherInitException("Missing parameter 'launcher.local'")
self.tmpdir = config.storage.tmp_dir
self.local_workspace = self.config.launcher.local.local_workspace
logs_path = self.local_workspace / "LOGS"
logs_path.mkdir(parents=True, exist_ok=True)
self.log_directory = logs_path
self.log_tail_manager = LogTailManager(self.local_workspace)
self.job_id_to_study_id: Dict[str, Tuple[str, Path, subprocess.Popen]] = {} # type: ignore
self.logs: Dict[str, str] = {}

@@ -76,12 +77,7 @@ def _select_best_binary(self, version: str) -> Path:

@override
def run_study(
self,
study_uuid: str,
job_id: str,
version: SolverVersion,
launcher_parameters: LauncherParametersDTO,
params: RequestParameters,
self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO
) -> None:
antares_solver_path = self._select_best_binary(f"{version:ddd}")

@@ -98,68 +94,37 @@ def run_study(
)
job.start()

def _get_job_final_output_path(self, job_id: str) -> Path:
return self.config.storage.tmp_dir / f"antares_solver-{job_id}.log"

def _compute(
self,
antares_solver_path: Path,
study_uuid: str,
uuid: UUID,
job_id: str,
launcher_parameters: LauncherParametersDTO,
) -> None:
end = False

def stop_reading_output() -> bool:
if end and str(uuid) in self.logs:
with open(
self._get_job_final_output_path(str(uuid)),
"w",
) as log_file:
log_file.write(self.logs[str(uuid)])
del self.logs[str(uuid)]
return end

tmp_path = tempfile.mkdtemp(prefix="local_launch_", dir=str(self.tmpdir))
export_path = Path(tmp_path) / "export"
export_path = self.local_workspace / job_id
logs_path = self.log_directory / job_id
logs_path.mkdir()
try:
self.callbacks.export_study(str(uuid), study_uuid, export_path, launcher_parameters)

args = [
str(antares_solver_path),
f"--force-parallel={launcher_parameters.nb_cpu}",
str(export_path),
]
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
encoding="utf-8",
)
self.job_id_to_study_id[str(uuid)] = (
study_uuid,
export_path,
process,
)
self.callbacks.update_status(
str(uuid),
JobStatus.RUNNING,
None,
None,
)
self.callbacks.export_study(job_id, study_uuid, export_path, launcher_parameters)

simulator_args, environment_variables = self._parse_launcher_options(launcher_parameters)
new_args = [str(antares_solver_path)] + simulator_args + [str(export_path)]

std_err_file = logs_path / f"{job_id}-err.log"
std_out_file = logs_path / f"{job_id}-out.log"
with open(std_err_file, "w") as err_file, open(std_out_file, "w") as out_file:
process = subprocess.Popen(
new_args,
env=environment_variables,
stdout=out_file,
stderr=err_file,
universal_newlines=True,
encoding="utf-8",
)
self.job_id_to_study_id[job_id] = (study_uuid, export_path, process)
self.callbacks.update_status(job_id, JobStatus.RUNNING, None, None)

thread = threading.Thread(
target=lambda: follow(
cast(io.StringIO, process.stdout),
self.create_update_log(str(uuid)),
stop_reading_output,
None,
),
name=f"{self.__class__.__name__}-LogsWatcher",
daemon=True,
)
thread.start()
self.log_tail_manager.track(std_out_file, self.create_update_log(job_id))

while process.poll() is None:
time.sleep(1)
@@ -170,32 +135,59 @@ def stop_reading_output() -> bool:
subprocess.run(["Rscript", "post-processing.R"], cwd=export_path)

output_id: Optional[str] = None
try:
output_id = self.callbacks.import_output(str(uuid), export_path / "output", {})
except Exception as e:
logger.error(
f"Failed to import output for study {study_uuid} located at {export_path}",
exc_info=e,
)
del self.job_id_to_study_id[str(uuid)]
if process.returncode == 0:
# The job succeeded, so we need to import the output
try:
launcher_logs = self._import_launcher_logs(job_id)
output_id = self.callbacks.import_output(job_id, export_path / "output", launcher_logs)
except Exception as e:
logger.error(
f"Failed to import output for study {study_uuid} located at {export_path}",
exc_info=e,
)
del self.job_id_to_study_id[job_id]
self.callbacks.update_status(
str(uuid),
job_id,
JobStatus.FAILED if process.returncode != 0 or not output_id else JobStatus.SUCCESS,
None,
output_id,
)
except Exception as e:
logger.error(f"Unexpected error happened during launch {uuid}", exc_info=e)
logger.error(f"Unexpected error happened during launch {job_id}", exc_info=e)
self.callbacks.update_status(
str(uuid),
job_id,
JobStatus.FAILED,
str(e),
None,
)
finally:
logger.info(f"Removing launch {uuid} export path at {tmp_path}")
end = True
shutil.rmtree(tmp_path)
logger.info(f"Removing launch {job_id} export path at {export_path}")
shutil.rmtree(export_path, ignore_errors=True)

def _import_launcher_logs(self, job_id: str) -> Dict[str, List[Path]]:
logs_path = self.log_directory / job_id
return {
"antares-out.log": [logs_path / f"{job_id}-out.log"],
"antares-err.log": [logs_path / f"{job_id}-err.log"],
}
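# Illustrative note (not part of this diff): for a job <job_id>, the launcher
# writes solver logs under the local workspace as
#   <local_workspace>/LOGS/<job_id>/<job_id>-out.log
#   <local_workspace>/LOGS/<job_id>/<job_id>-err.log
# and, when the run succeeds, passes them to import_output under the keys
# "antares-out.log" and "antares-err.log" as shown above.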

def _parse_launcher_options(self, launcher_parameters: LauncherParametersDTO) -> Tuple[List[str], Dict[str, Any]]:
simulator_args = [f"--force-parallel={launcher_parameters.nb_cpu}"]
environment_variables = os.environ.copy()
if launcher_parameters.other_options:
solver = []
if "xpress" in launcher_parameters.other_options:
solver = ["--use-ortools", "--ortools-solver=xpress"]
if xpress_dir_path := self.config.launcher.local.xpress_dir: # type: ignore
environment_variables["XPRESSDIR"] = xpress_dir_path
environment_variables["XPRESS"] = environment_variables["XPRESSDIR"] + os.sep + "bin"
elif "coin" in launcher_parameters.other_options:
solver = ["--use-ortools", "--ortools-solver=coin"]
if solver:
simulator_args += solver
if "presolve" in launcher_parameters.other_options:
simulator_args += ["--solver-parameters", "PRESOLVE 1"]
return simulator_args, environment_variables

@override
def create_update_log(self, job_id: str) -> Callable[[str], None]:
@@ -210,10 +202,11 @@ def append_to_log(log_line: str) -> None:

@override
def get_log(self, job_id: str, log_type: LogType) -> Optional[str]:
if job_id in self.job_id_to_study_id and job_id in self.logs:
if job_id in self.job_id_to_study_id and job_id in self.logs and log_type == LogType.STDOUT:
return self.logs[job_id]
elif self._get_job_final_output_path(job_id).exists():
return self._get_job_final_output_path(job_id).read_text()
job_path = self.log_directory / job_id / f"{job_id}-{log_type.to_suffix()}"
if job_path.exists():
return job_path.read_text()
return None

@override
8 changes: 1 addition & 7 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
@@ -34,7 +34,6 @@
from antarest.core.interfaces.cache import ICache
from antarest.core.interfaces.eventbus import Event, EventType, IEventBus
from antarest.core.model import PermissionInfo, PublicMode
from antarest.core.requests import RequestParameters
from antarest.core.utils.archives import unzip
from antarest.core.utils.utils import assert_this
from antarest.launcher.adapters.abstractlauncher import AbstractLauncher, LauncherCallbacks, LauncherInitException
@@ -592,12 +591,7 @@ def _apply_params(self, launcher_params: LauncherParametersDTO) -> argparse.Name

@override
def run_study(
self,
study_uuid: str,
job_id: str,
version: SolverVersion,
launcher_parameters: LauncherParametersDTO,
params: RequestParameters,
self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO
) -> None:
thread = threading.Thread(
target=self._run_study,
8 changes: 1 addition & 7 deletions antarest/launcher/service.py
@@ -251,13 +251,7 @@ def run_study(
)
self.job_result_repository.save(job_status)

self.launchers[launcher].run_study(
study_uuid,
job_uuid,
solver_version,
launcher_parameters,
params,
)
self.launchers[launcher].run_study(study_uuid, job_uuid, solver_version, launcher_parameters)

self.event_bus.push(
Event(
@@ -14,12 +14,12 @@

from typing_extensions import override

from antarest.study.storage.rawstudy.model.filesystem.bucket_node import BucketNode
from antarest.study.storage.rawstudy.model.filesystem.folder_node import FolderNode
from antarest.study.storage.rawstudy.model.filesystem.inode import TREE
from antarest.study.storage.rawstudy.model.filesystem.root.desktop import Desktop
from antarest.study.storage.rawstudy.model.filesystem.root.input.input import Input
from antarest.study.storage.rawstudy.model.filesystem.root.layers.layers import Layers
from antarest.study.storage.rawstudy.model.filesystem.root.logs import Logs
from antarest.study.storage.rawstudy.model.filesystem.root.output.output import Output
from antarest.study.storage.rawstudy.model.filesystem.root.settings.settings import Settings
from antarest.study.storage.rawstudy.model.filesystem.root.study_antares import StudyAntares
@@ -40,11 +40,13 @@ def build(self) -> TREE:
"study": StudyAntares(self.context, self.config.next_file("study.antares")),
"settings": Settings(self.context, self.config.next_file("settings")),
"layers": Layers(self.context, self.config.next_file("layers")),
"logs": Logs(self.context, self.config.next_file("logs")),
"input": Input(self.context, self.config.next_file("input")),
"user": User(self.context, self.config.next_file("user")),
}

if (self.config.path / "logs").exists():
children["logs"] = BucketNode(self.context, self.config.next_file("logs"))

if self.config.outputs:
output_config = self.config.next_file("output")
output_config.path = self.config.output_path or output_config.path
17 changes: 0 additions & 17 deletions antarest/study/storage/rawstudy/model/filesystem/root/logs.py

This file was deleted.

@@ -29,5 +29,6 @@ def build(self) -> TREE:
for i, s in self.config.outputs.items()
}

children["logs"] = BucketNode(self.context, self.config.next_file("logs"))
if (self.config.path / "logs").exists():
children["logs"] = BucketNode(self.context, self.config.next_file("logs"))
return children
13 changes: 13 additions & 0 deletions docs/developer-guide/install/1-CONFIG.md
@@ -388,6 +388,19 @@ it is instantiated on shared servers using `slurm`.

> NOTE: As you can see, you can use newer solver for older study version thanks to the solver retro-compatibility

### **xpress_dir**

- **Type:** str
- **Default value:** None
- **Description:** Path to your Xpress installation directory. Required if you want to launch a study with Xpress. If the
  environment variables `XPRESSDIR` and `XPRESS` are already set in your environment, it should work without setting this option.

### **local_workspace**

- **Type:** Path
- **Default value:** `./local_workspace`
- **Description:** Directory used by Antares Web to run simulations with the local launcher; launcher logs are written under its `LOGS/` subfolder (a configuration sketch follows below).
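
A minimal sketch (not part of this commit) of how these two options could be consumed through `LocalConfig.from_dict`, assuming the defaults shown in the diff above; the solver binary and Xpress paths are placeholders.

```python
from antarest.core.config import LocalConfig

# Hypothetical `launcher.local` section, e.g. parsed from the application
# YAML configuration file.
local_config = LocalConfig.from_dict(
    {
        "binaries": {"880": "/opt/antares/antares-8.8-solver"},  # placeholder path
        "enable_nb_cores_detection": True,
        "xpress_dir": "/opt/xpressmp",           # placeholder Xpress install dir
        "local_workspace": "./local_workspace",  # simulations and LOGS/ live here
    }
)

assert local_config.xpress_dir == "/opt/xpressmp"
assert local_config.local_workspace.name == "local_workspace"
```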

## **slurm**

SLURM (Simple Linux Utility for Resource Management) is used to interact with a remote environment (for Antares it's