feat(raw): add endpoint for matrix download (#1906)
skamril committed Feb 13, 2024
2 parents 188099c + 511df69 commit 4509af4
Showing 10 changed files with 819 additions and 62 deletions.
42 changes: 24 additions & 18 deletions antarest/core/filetransfer/service.py
@@ -43,6 +43,8 @@ def request_download(
         filename: str,
         name: Optional[str] = None,
         owner: Optional[JWTUser] = None,
+        use_notification: bool = True,
+        expiration_time_in_minutes: int = 0,
     ) -> FileDownload:
         fh, path = tempfile.mkstemp(dir=self.tmp_dir, suffix=filename)
         os.close(fh)
@@ -55,36 +57,40 @@ def request_download(
             path=str(tmpfile),
             owner=owner.impersonator if owner is not None else None,
             expiration_date=datetime.datetime.utcnow()
-            + datetime.timedelta(minutes=self.download_default_expiration_timeout_minutes),
+            + datetime.timedelta(
+                minutes=expiration_time_in_minutes or self.download_default_expiration_timeout_minutes
+            ),
         )
         self.repository.add(download)
-        self.event_bus.push(
-            Event(
-                type=EventType.DOWNLOAD_CREATED,
-                payload=download.to_dto(),
-                permissions=PermissionInfo(owner=owner.impersonator)
-                if owner
-                else PermissionInfo(public_mode=PublicMode.READ),
-            )
-        )
+        if use_notification:
+            self.event_bus.push(
+                Event(
+                    type=EventType.DOWNLOAD_CREATED,
+                    payload=download.to_dto(),
+                    permissions=PermissionInfo(owner=owner.impersonator)
+                    if owner
+                    else PermissionInfo(public_mode=PublicMode.READ),
+                )
+            )
         return download
 
-    def set_ready(self, download_id: str) -> None:
+    def set_ready(self, download_id: str, use_notification: bool = True) -> None:
         download = self.repository.get(download_id)
         if not download:
             raise FileDownloadNotFound()
 
         download.ready = True
         self.repository.save(download)
-        self.event_bus.push(
-            Event(
-                type=EventType.DOWNLOAD_READY,
-                payload=download.to_dto(),
-                permissions=PermissionInfo(owner=download.owner)
-                if download.owner
-                else PermissionInfo(public_mode=PublicMode.READ),
-            )
-        )
+        if use_notification:
+            self.event_bus.push(
+                Event(
+                    type=EventType.DOWNLOAD_READY,
+                    payload=download.to_dto(),
+                    permissions=PermissionInfo(owner=download.owner)
+                    if download.owner
+                    else PermissionInfo(public_mode=PublicMode.READ),
+                )
+            )
 
     def fail(self, download_id: str, reason: str = "") -> None:
         download = self.repository.get(download_id)
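The two new keyword arguments make request_download usable from synchronous endpoints: use_notification=False skips the DOWNLOAD_CREATED / DOWNLOAD_READY events, and expiration_time_in_minutes shortens the lifetime of the temporary file, with 0 falling back to the configured default. A minimal sketch of the fallback semantics, assuming a default timeout of 1440 minutes (the real value comes from the service configuration, not from this commit):

import datetime

DEFAULT_EXPIRATION_MINUTES = 1440  # stand-in for download_default_expiration_timeout_minutes


def compute_expiration(expiration_time_in_minutes: int = 0) -> datetime.datetime:
    # Mirror the fallback used above: 0 (falsy) means "use the service default".
    minutes = expiration_time_in_minutes or DEFAULT_EXPIRATION_MINUTES
    return datetime.datetime.utcnow() + datetime.timedelta(minutes=minutes)


print(compute_expiration())    # default lifetime
print(compute_expiration(15))  # short-lived download, e.g. for a one-off matrix export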
46 changes: 46 additions & 0 deletions antarest/study/service.py
@@ -12,6 +12,7 @@
 from uuid import uuid4
 
 import numpy as np
+import pandas as pd
 from fastapi import HTTPException, UploadFile
 from markupsafe import escape
 from starlette.responses import FileResponse, Response
@@ -20,6 +21,7 @@
 from antarest.core.exceptions import (
     BadEditInstructionException,
     CommandApplicationError,
+    IncorrectPathError,
     NotAManagedStudyException,
     StudyDeletionNotAllowed,
     StudyNotFoundError,
@@ -54,6 +56,7 @@
 from antarest.study.business.areas.thermal_management import ThermalManager
 from antarest.study.business.binding_constraint_management import BindingConstraintManager
 from antarest.study.business.config_management import ConfigManager
+from antarest.study.business.correlation_management import CorrelationManager
 from antarest.study.business.district_manager import DistrictManager
 from antarest.study.business.general_management import GeneralManager
 from antarest.study.business.link_management import LinkInfoDTO, LinkManager
@@ -93,6 +96,7 @@
     StudySimResultDTO,
 )
 from antarest.study.repository import StudyFilter, StudyMetadataRepository, StudyPagination, StudySortBy
+from antarest.study.storage.matrix_profile import adjust_matrix_columns_index
 from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfigDTO
 from antarest.study.storage.rawstudy.model.filesystem.folder_node import ChildNotFoundError
 from antarest.study.storage.rawstudy.model.filesystem.ini_file_node import IniFileNode
@@ -268,6 +272,7 @@ def __init__(
         self.xpansion_manager = XpansionManager(self.storage_service)
         self.matrix_manager = MatrixManager(self.storage_service)
         self.binding_constraint_manager = BindingConstraintManager(self.storage_service)
+        self.correlation_manager = CorrelationManager(self.storage_service)
         self.cache_service = cache_service
         self.config = config
         self.on_deletion_callbacks: t.List[t.Callable[[str], None]] = []
@@ -2379,3 +2384,44 @@ def get_disk_usage(self, uuid: str, params: RequestParameters) -> int:
         study_path = self.storage_service.raw_study_service.get_study_path(study)
         # If the study is a variant, it's possible that it only exists in DB and not on disk. If so, we return 0.
         return get_disk_usage(study_path) if study_path.exists() else 0
+
+    def get_matrix_with_index_and_header(
+        self, *, study_id: str, path: str, with_index: bool, with_header: bool, parameters: RequestParameters
+    ) -> pd.DataFrame:
+        matrix_path = Path(path)
+        study = self.get_study(study_id)
+
+        if matrix_path.parts in [("input", "hydro", "allocation"), ("input", "hydro", "correlation")]:
+            all_areas = t.cast(
+                t.List[AreaInfoDTO],
+                self.get_all_areas(study_id, area_type=AreaType.AREA, ui=False, params=parameters),
+            )
+            if matrix_path.parts[-1] == "allocation":
+                hydro_matrix = self.allocation_manager.get_allocation_matrix(study, all_areas)
+            else:
+                hydro_matrix = self.correlation_manager.get_correlation_matrix(all_areas, study, [])  # type: ignore
+            return pd.DataFrame(data=hydro_matrix.data, columns=hydro_matrix.columns, index=hydro_matrix.index)
+
+        matrix_obj = self.get(study_id, path, depth=3, formatted=True, params=parameters)
+        if set(matrix_obj) != {"data", "index", "columns"}:
+            raise IncorrectPathError(f"The provided path does not point to a valid matrix: '{path}'")
+        if not matrix_obj["data"]:
+            return pd.DataFrame()
+
+        df_matrix = pd.DataFrame(**matrix_obj)
+        if with_index:
+            matrix_index = self.get_input_matrix_startdate(study_id, path, parameters)
+            time_column = pd.date_range(
+                start=matrix_index.start_date, periods=len(df_matrix), freq=matrix_index.level.value[0]
+            )
+            df_matrix.index = time_column
+
+        adjust_matrix_columns_index(
+            df_matrix,
+            path,
+            with_index=with_index,
+            with_header=with_header,
+            study_version=int(study.version),
+        )
+
+        return df_matrix
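get_matrix_with_index_and_header returns a plain pandas DataFrame: hydro allocation/correlation matrices are rebuilt from their managers, any other matrix is read from the study tree, optionally given a datetime index derived from the simulation start date, and then passed through adjust_matrix_columns_index. The endpoint code that serializes this DataFrame is in other files of this commit and is not shown in the hunks above; the sketch below only illustrates how such a frame could be exported, and the TSV format is an assumption:

import pandas as pd

# Small stand-in for a frame returned with with_index=True and the default "TS-n" headers.
df = pd.DataFrame(
    [[1.0, 2.0], [3.0, 4.0]],
    index=pd.date_range("2018-01-01", periods=2, freq="D"),
    columns=["TS-1", "TS-2"],
)

# One possible serialization for a download response (separator and float format are assumptions).
tsv = df.to_csv(sep="\t", float_format="%.6f")
print(tsv)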
211 changes: 211 additions & 0 deletions antarest/study/storage/matrix_profile.py
@@ -0,0 +1,211 @@
import calendar
import copy
import fnmatch
import typing as t
from pathlib import Path

import pandas as pd


class _MatrixProfile(t.NamedTuple):
    """
    Matrix profile for time series or specific matrices.
    """

    cols: t.Sequence[str]
    rows: t.Sequence[str]

    def process_dataframe(
        self,
        df: pd.DataFrame,
        matrix_path: str,
        *,
        with_index: bool,
        with_header: bool,
    ) -> None:
        """
        Adjust the column names and index of a dataframe according to the matrix profile.
        *NOTE:* The modification is done in place.
        Args:
            df: The dataframe to process.
            matrix_path: The path of the matrix file, relative to the study directory.
            with_index: Whether to set the index of the dataframe.
            with_header: Whether to set the column names of the dataframe.
        """
        if with_header:
            if Path(matrix_path).parts[1] == "links":
                cols = self._process_links_columns(matrix_path)
            else:
                cols = self.cols
            if cols:
                df.columns = pd.Index(cols)
            else:
                df.columns = pd.Index([f"TS-{i}" for i in range(1, len(df.columns) + 1)])

        if with_index and self.rows:
            df.index = pd.Index(self.rows)

    def _process_links_columns(self, matrix_path: str) -> t.Sequence[str]:
        """Process column names specific to the links matrices."""
        path_parts = Path(matrix_path).parts
        area1_id = path_parts[2]
        area2_id = path_parts[3]
        result = list(self.cols)
        for k, col in enumerate(result):
            if col == "Hurdle costs direct":
                result[k] = f"{col} ({area1_id}->{area2_id})"
            elif col == "Hurdle costs indirect":
                result[k] = f"{col} ({area2_id}->{area1_id})"
        return result


_SPECIFIC_MATRICES: t.Dict[str, _MatrixProfile]
"""
The dictionary ``_SPECIFIC_MATRICES`` maps file patterns to ``_MatrixProfile`` objects,
representing non-time series matrices.
It's used in the `adjust_matrix_columns_index` method to fetch matrix profiles based on study versions.
"""


# noinspection SpellCheckingInspection
_SPECIFIC_MATRICES = {
    "input/hydro/common/capacity/creditmodulations_*": _MatrixProfile(
        cols=[str(i) for i in range(101)],
        rows=["Generating Power", "Pumping Power"],
    ),
    "input/hydro/common/capacity/maxpower_*": _MatrixProfile(
        cols=[
            "Generating Max Power (MW)",
            "Generating Max Energy (Hours at Pmax)",
            "Pumping Max Power (MW)",
            "Pumping Max Energy (Hours at Pmax)",
        ],
        rows=[],
    ),
    "input/hydro/common/capacity/reservoir_*": _MatrixProfile(
        # Values are displayed in % in the UI, but the actual values are in p.u. (per unit)
        cols=["Lev Low (p.u)", "Lev Avg (p.u)", "Lev High (p.u)"],
        rows=[],
    ),
    "input/hydro/common/capacity/waterValues_*": _MatrixProfile(
        cols=[f"{i}%" for i in range(101)],
        rows=[],
    ),
    "input/hydro/series/*/mod": _MatrixProfile(cols=[], rows=[]),
    "input/hydro/series/*/ror": _MatrixProfile(cols=[], rows=[]),
    "input/hydro/common/capacity/inflowPattern_*": _MatrixProfile(cols=["Inflow Pattern (X)"], rows=[]),
    "input/hydro/prepro/*/energy": _MatrixProfile(
        cols=["Expectation (MWh)", "Std Deviation (MWh)", "Min. (MWh)", "Max. (MWh)", "ROR Share"],
        rows=calendar.month_name[1:],
    ),
    "input/thermal/prepro/*/*/modulation": _MatrixProfile(
        cols=["Marginal cost modulation", "Market bid modulation", "Capacity modulation", "Min gen modulation"],
        rows=[],
    ),
    "input/thermal/prepro/*/*/data": _MatrixProfile(
        cols=["FO Duration", "PO Duration", "FO Rate", "PO Rate", "NPO Min", "NPO Max"],
        rows=[],
    ),
    "input/reserves/*": _MatrixProfile(
        cols=["Primary Res. (draft)", "Strategic Res. (draft)", "DSM", "Day Ahead"],
        rows=[],
    ),
    "input/misc-gen/miscgen-*": _MatrixProfile(
        cols=["CHP", "Bio Mass", "Bio Gaz", "Waste", "GeoThermal", "Other", "PSP", "ROW Balance"],
        rows=[],
    ),
    "input/bindingconstraints/*": _MatrixProfile(cols=["<", ">", "="], rows=[]),
    "input/links/*/*": _MatrixProfile(
        cols=[
            "Capacités de transmission directes",
            "Capacités de transmission indirectes",
            "Hurdle costs direct",
            "Hurdle costs indirect",
            "Impedances",
            "Loop flow",
            "P.Shift Min",
            "P.Shift Max",
        ],
        rows=[],
    ),
}

_SPECIFIC_MATRICES_820 = copy.deepcopy(_SPECIFIC_MATRICES)
"""Specific matrices for study version 8.2."""

_SPECIFIC_MATRICES_820["input/links/*/*"] = _MatrixProfile(
    cols=[
        "Hurdle costs direct",
        "Hurdle costs indirect",
        "Impedances",
        "Loop flow",
        "P.Shift Min",
        "P.Shift Max",
    ],
    rows=[],
)

# Specific matrices for study version 8.6
_SPECIFIC_MATRICES_860 = copy.deepcopy(_SPECIFIC_MATRICES_820)
"""Specific matrices for study version 8.6."""

# noinspection SpellCheckingInspection
#
_SPECIFIC_MATRICES_860["input/hydro/series/*/mingen"] = _MatrixProfile(cols=[], rows=[])

_SPECIFIC_MATRICES_870 = copy.deepcopy(_SPECIFIC_MATRICES_820)
"""Specific matrices for study version 8.7."""

# noinspection SpellCheckingInspection
# Scenarized RHS for binding constraints
_SPECIFIC_MATRICES_870["input/bindingconstraints/*"] = _MatrixProfile(cols=[], rows=[])


def adjust_matrix_columns_index(
    df: pd.DataFrame, matrix_path: str, with_index: bool, with_header: bool, study_version: int
) -> None:
    """
    Adjust the column names and index of a dataframe according to the matrix profile.
    *NOTE:* The modification is done in place.
    Args:
        df: The dataframe to process.
        matrix_path: The path of the matrix file, relative to the study directory.
        with_index: Whether to set the index of the dataframe.
        with_header: Whether to set the column names of the dataframe.
        study_version: The version of the study.
    """
    # Get the matrix profiles for a given study version
    if study_version < 820:
        matrix_profiles = _SPECIFIC_MATRICES
    elif study_version < 860:
        matrix_profiles = _SPECIFIC_MATRICES_820
    elif study_version < 870:
        matrix_profiles = _SPECIFIC_MATRICES_860
    else:
        matrix_profiles = _SPECIFIC_MATRICES_870

    # Apply the matrix profile to the dataframe to adjust the column names and index
    for pattern, matrix_profile in matrix_profiles.items():
        if fnmatch.fnmatch(matrix_path, pattern):
            matrix_profile.process_dataframe(
                df,
                matrix_path,
                with_index=with_index,
                with_header=with_header,
            )
            return

    if fnmatch.fnmatch(matrix_path, "output/*"):
        # Outputs already have their own column names
        return

    # The matrix may be a time series, in which case we don't need to adjust anything
    # (the "Time" columns is already the index)
    # Column names should be Monte-Carlo years: "TS-1", "TS-2", ...
    df.columns = pd.Index([f"TS-{i}" for i in range(1, len(df.columns) + 1)])

    return None
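A short usage sketch of the helper defined above; the area id "fr" and the 8760x4 shape are illustrative only, not taken from this commit:

import pandas as pd

from antarest.study.storage.matrix_profile import adjust_matrix_columns_index

# An 8760x4 reserves matrix, as it comes out of the study tree (no header, no index).
df = pd.DataFrame([[0.0] * 4] * 8760)

adjust_matrix_columns_index(
    df,
    "input/reserves/fr",
    with_index=False,
    with_header=True,
    study_version=860,
)

print(df.columns.tolist())
# ['Primary Res. (draft)', 'Strategic Res. (draft)', 'DSM', 'Day Ahead']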