Skip to content

Commit

Permalink
perf(variant): improve performances and correct snapshot generation (#…
Browse files Browse the repository at this point in the history
…1854)

Cette PR permet de corriger (en partie) le problème de lenteur de la
génération des variants en évitant de générer le snapshot s'il existe
déjà.

En effet, lorsqu'un variant est modifié, son snapshot n'est plus à jour
et il suffit d'appliquer les nouvelles commandes pour le mettre à jour.

Dans de rares cas, par exemple si l'utilisateur modifie l'historique des
commandes, le snapshot ne sera pas mis à jour correctement. Cette
situation n'est pour l'instant pas gérée automatiquement.
  • Loading branch information
laurent-laporte-pro authored and skamril committed Dec 13, 2023
1 parent ece239f commit 9b0d27c
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 143 deletions.
2 changes: 1 addition & 1 deletion antarest/study/storage/variantstudy/model/dbmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def snapshot_dir(self) -> Path:
"""Get the path of the snapshot directory."""
return Path(self.path) / "snapshot"

def is_snapshot_recent(self) -> bool:
def is_snapshot_up_to_date(self) -> bool:
"""Check if the snapshot exists and is up-to-date."""
return (
(self.snapshot is not None)
Expand Down
163 changes: 95 additions & 68 deletions antarest/study/storage/variantstudy/snapshot_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import datetime
import logging
import shutil
import tempfile
import typing as t
from pathlib import Path

Expand Down Expand Up @@ -51,8 +50,6 @@ def __init__(
self.study_factory = study_factory
self.patch_service = patch_service
self.repository = repository
# Temporary directory used to generate the snapshot
self._tmp_dir: Path = Path()

def generate_snapshot(
self,
Expand All @@ -75,32 +72,29 @@ def generate_snapshot(

root_study, descendants = self._retrieve_descendants(variant_study_id)
assert_permission_on_studies(jwt_user, [root_study, *descendants], StudyPermissionType.READ, raising=True)
ref_study, cmd_blocks = search_ref_study(root_study, descendants, from_scratch=from_scratch)
search_result = search_ref_study(root_study, descendants, from_scratch=from_scratch)

# We are going to generate the snapshot in a temporary directory which will be renamed
# at the end of the process. This prevents incomplete snapshots in case of error.
ref_study = search_result.ref_study
cmd_blocks = search_result.cmd_blocks

# Get snapshot directory and prepare a temporary directory next to it.
# Get snapshot directory
variant_study = descendants[-1]
snapshot_dir = variant_study.snapshot_dir
snapshot_dir.parent.mkdir(parents=True, exist_ok=True)
self._tmp_dir = Path(tempfile.mkdtemp(dir=snapshot_dir.parent, prefix=f"~{snapshot_dir.name}", suffix=".tmp"))

try:
logger.info(f"Exporting the reference study '{ref_study.id}' to '{self._tmp_dir.name}'...")
self._export_ref_study(ref_study)
if search_result.force_regenerate or not snapshot_dir.exists():
logger.info(f"Exporting the reference study '{ref_study.id}' to '{snapshot_dir.name}'...")
shutil.rmtree(snapshot_dir, ignore_errors=True)
self._export_ref_study(snapshot_dir, ref_study)

logger.info(f"Applying commands to the reference study '{ref_study.id}'...")
results = self._apply_commands(variant_study, ref_study, cmd_blocks)

if (snapshot_dir / "user").exists():
logger.info("Keeping previous unmanaged user config...")
shutil.copytree(snapshot_dir / "user", self._tmp_dir / "user", dirs_exist_ok=True)
results = self._apply_commands(snapshot_dir, variant_study, cmd_blocks)

# The snapshot is generated, we also need to de-normalize the matrices.
file_study = self.study_factory.create_from_fs(
self._tmp_dir,
snapshot_dir,
study_id=variant_study_id,
output_path=self._tmp_dir / OUTPUT_RELATIVE_PATH,
output_path=snapshot_dir / OUTPUT_RELATIVE_PATH,
use_cache=False, # Avoid saving the study config in the cache
)
if denormalize:
Expand All @@ -112,26 +106,20 @@ def generate_snapshot(
variant_study.snapshot = VariantStudySnapshot(
id=variant_study_id,
created_at=datetime.datetime.utcnow(),
last_executed_command=cmd_blocks[-1].id if cmd_blocks else None,
last_executed_command=variant_study.commands[-1].id if variant_study.commands else None,
)

logger.info(f"Reading additional data from files for study {file_study.config.study_id}")
variant_study.additional_data = self._read_additional_data(file_study)
self.repository.save(variant_study)

# Store the study config in the cache (with adjusted paths).
file_study.config.study_path = file_study.config.path = snapshot_dir
file_study.config.output_path = snapshot_dir / OUTPUT_RELATIVE_PATH
self._update_cache(file_study)

except Exception:
shutil.rmtree(self._tmp_dir, ignore_errors=True)
shutil.rmtree(snapshot_dir, ignore_errors=True)
raise

else:
# Rename the temporary directory to the final snapshot directory
shutil.rmtree(snapshot_dir, ignore_errors=True)
self._tmp_dir.rename(snapshot_dir)
try:
notifier(results.json())
except Exception as exc:
Expand All @@ -149,20 +137,20 @@ def _retrieve_descendants(self, variant_study_id: str) -> t.Tuple[RawStudy, t.Se
root_study = self.repository.one(descendant_ids[0])
return root_study, descendants

def _export_ref_study(self, ref_study: t.Union[RawStudy, VariantStudy]) -> None:
self._tmp_dir.rmdir() # remove the temporary directory for shutil.copytree
def _export_ref_study(self, snapshot_dir: Path, ref_study: t.Union[RawStudy, VariantStudy]) -> None:
if isinstance(ref_study, VariantStudy):
snapshot_dir.parent.mkdir(parents=True, exist_ok=True)
export_study_flat(
ref_study.snapshot_dir,
self._tmp_dir,
snapshot_dir,
self.study_factory,
denormalize=False, # de-normalization is done at the end
outputs=False, # do NOT export outputs
)
elif isinstance(ref_study, RawStudy):
self.raw_study_service.export_study_flat(
ref_study,
self._tmp_dir,
snapshot_dir,
denormalize=False, # de-normalization is done at the end
outputs=False, # do NOT export outputs
)
Expand All @@ -171,15 +159,15 @@ def _export_ref_study(self, ref_study: t.Union[RawStudy, VariantStudy]) -> None:

def _apply_commands(
self,
snapshot_dir: Path,
variant_study: VariantStudy,
ref_study: t.Union[RawStudy, VariantStudy],
cmd_blocks: t.Sequence[CommandBlock],
) -> GenerationResultInfoDTO:
commands = [self.command_factory.to_command(cb.to_dto()) for cb in cmd_blocks]
generator = VariantCommandGenerator(self.study_factory)
results = generator.generate(
commands,
self._tmp_dir,
snapshot_dir,
variant_study,
delete_on_failure=False, # Not needed: generate_snapshot() removes the snapshot directory itself on failure
notifier=None,
Expand Down Expand Up @@ -208,12 +196,22 @@ def _update_cache(self, file_study: FileStudy) -> None:
)


class RefStudySearchResult(t.NamedTuple):
    """
    Outcome of the lookup of the reference study used for snapshot generation.

    Attributes:
        ref_study: Study used as the starting point (a raw study or a variant study).
        cmd_blocks: Command blocks that remain to be applied on top of the reference study.
        force_regenerate: Whether the snapshot directory must be re-exported from
            the reference study rather than updated in place (``False`` by default).
    """

    ref_study: t.Union[RawStudy, VariantStudy]
    cmd_blocks: t.Sequence[CommandBlock]
    force_regenerate: bool = False


def search_ref_study(
root_study: t.Union[RawStudy, VariantStudy],
descendants: t.Sequence[VariantStudy],
*,
from_scratch: bool = False,
) -> t.Tuple[t.Union[RawStudy, VariantStudy], t.Sequence[CommandBlock]]:
) -> RefStudySearchResult:
"""
Search for the reference study and the commands to use for snapshot generation.
Expand All @@ -225,6 +223,9 @@ def search_ref_study(
Returns:
The reference study, the commands to use for snapshot generation, and a flag
indicating whether the snapshot must be re-exported from the reference study.
"""
if not descendants:
# Edge case where the list of studies is empty.
return RefStudySearchResult(ref_study=root_study, cmd_blocks=[], force_regenerate=True)

# The reference study is the root study or a variant study with a valid snapshot
ref_study: t.Union[RawStudy, VariantStudy]
Expand All @@ -236,42 +237,68 @@ def search_ref_study(
# In the case of a from scratch generation, the root study will be used as the reference study.
# We need to retrieve all commands from the descendants of variants in order to apply them
# on the reference study.
ref_study = root_study
cmd_blocks = [c for v in descendants for c in v.commands]
return RefStudySearchResult(
ref_study=root_study,
cmd_blocks=[c for v in descendants for c in v.commands],
force_regenerate=True,
)

else:
# To generate the last variant of a descendant of variants, we must search for
# the most recent snapshot in order to use it as a reference study.
# If no snapshot is found, we use the root study as a reference study.

snapshot_vars = [v for v in descendants if v.is_snapshot_recent()]

if snapshot_vars:
# We use the most recent snapshot as a reference study
ref_study = max(snapshot_vars, key=lambda v: v.snapshot.created_at)

# This variant's snapshot corresponds to the commands actually generated
# at the time of the snapshot. However, we need to retrieve the remaining commands,
# because the snapshot generation may be incomplete.
last_exec_cmd = ref_study.snapshot.last_executed_command # ID of the command
if not last_exec_cmd:
# It is unlikely that this case will occur, but it means that
# the snapshot is not correctly generated (corrupted database).
# It is better to use all commands to force snapshot re-generation.
cmd_blocks = ref_study.commands[:]
else:
command_ids = [c.id for c in ref_study.commands]
last_exec_index = command_ids.index(last_exec_cmd)
cmd_blocks = ref_study.commands[last_exec_index + 1 :]

# We need to add all commands from the descendants of variants
# starting at the first descendant of reference study.
index = descendants.index(ref_study)
cmd_blocks.extend([c for v in descendants[index + 1 :] for c in v.commands])
# To reuse the snapshot of the current variant, the last executed command
# must be one of the commands of the current variant.
curr_variant = descendants[-1]
if curr_variant.snapshot:
last_exec_cmd = curr_variant.snapshot.last_executed_command
command_ids = [c.id for c in curr_variant.commands]
# If the variant has no command, we can reuse the snapshot if it is up-to-date
if not last_exec_cmd and not command_ids and curr_variant.is_snapshot_up_to_date():
return RefStudySearchResult(
ref_study=curr_variant,
cmd_blocks=[],
force_regenerate=False,
)
elif last_exec_cmd and last_exec_cmd in command_ids:
# We can reuse the snapshot of the current variant
last_exec_index = command_ids.index(last_exec_cmd)
return RefStudySearchResult(
ref_study=curr_variant,
cmd_blocks=curr_variant.commands[last_exec_index + 1 :],
force_regenerate=False,
)

# We cannot reuse the snapshot of the current variant.
# To generate the last variant of a descendant of variants, we must search for
# the most recent snapshot in order to use it as a reference study.
# If no snapshot is found, we use the root study as a reference study.

snapshot_vars = [v for v in descendants if v.is_snapshot_up_to_date()]

if snapshot_vars:
# We use the most recent snapshot as a reference study
ref_study = max(snapshot_vars, key=lambda v: v.snapshot.created_at)

# This variant's snapshot corresponds to the commands actually generated
# at the time of the snapshot. However, we need to retrieve the remaining commands,
# because the snapshot generation may be incomplete.
last_exec_cmd = ref_study.snapshot.last_executed_command # ID of the command
command_ids = [c.id for c in ref_study.commands]
if not last_exec_cmd or last_exec_cmd not in command_ids:
# The last executed command may be missing (probably caused by a bug)
# or may reference a removed command.
# This requires regenerating the snapshot from scratch,
# with all commands from the reference study.
cmd_blocks = ref_study.commands[:]
else:
# We use the root study as a reference study
ref_study = root_study
cmd_blocks = [c for v in descendants for c in v.commands]
last_exec_index = command_ids.index(last_exec_cmd)
cmd_blocks = ref_study.commands[last_exec_index + 1 :]

# We need to add all commands from the descendants of variants
# starting at the first descendant of reference study.
index = descendants.index(ref_study)
cmd_blocks.extend([c for v in descendants[index + 1 :] for c in v.commands])

else:
# We use the root study as a reference study
ref_study = root_study
cmd_blocks = [c for v in descendants for c in v.commands]

return ref_study, cmd_blocks
return RefStudySearchResult(ref_study=ref_study, cmd_blocks=cmd_blocks, force_regenerate=True)
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ def generate(
if variant_study.parent_id is None:
raise NoParentStudyError(variant_study_id)

return self.generate_task(variant_study, denormalize)
return self.generate_task(variant_study, denormalize, from_scratch=from_scratch)

def generate_study_config(
self,
Expand Down
4 changes: 2 additions & 2 deletions tests/study/storage/variantstudy/model/test_dbmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def test_init__without_snapshot(self, db_session: Session, raw_study_id: str, us

# check Variant-specific properties
assert obj.snapshot_dir == Path(variant_study_path).joinpath("snapshot")
assert obj.is_snapshot_recent() is False
assert obj.is_snapshot_up_to_date() is False

@pytest.mark.parametrize(
"created_at, updated_at, study_antares_file, expected",
Expand Down Expand Up @@ -294,4 +294,4 @@ def test_is_snapshot_recent(

# Check the is_snapshot_up_to_date() method
obj: VariantStudy = db_session.query(VariantStudy).filter(VariantStudy.id == variant_id).one()
assert obj.is_snapshot_recent() == expected
assert obj.is_snapshot_up_to_date() == expected
Loading

0 comments on commit 9b0d27c

Please sign in to comment.