feat(matrices): allow other formats for internal matrices storage #2113

Open · wants to merge 47 commits into dev from feat/store-matrices-as-hdf5

Commits (47)

32d73ed
first draft
MartinBelthle Aug 7, 2024
cdb9cf9
handle empty file scenario
MartinBelthle Aug 7, 2024
5d64b98
adapt test documentation
MartinBelthle Aug 7, 2024
218a67c
fix some tests
MartinBelthle Aug 7, 2024
bd443b6
fix code
MartinBelthle Aug 7, 2024
0f0a6eb
fix code
MartinBelthle Aug 7, 2024
2893f4f
rebase with dev
MartinBelthle Sep 20, 2024
693056d
first version without parallelization
MartinBelthle Sep 20, 2024
9bc29f2
final version with parallelization
MartinBelthle Sep 20, 2024
b784936
add header to new file
MartinBelthle Sep 20, 2024
9a35536
merge with dev
MartinBelthle Nov 18, 2024
695532c
add format inside config
MartinBelthle Nov 18, 2024
b09d574
make migration script general
MartinBelthle Nov 18, 2024
09cd6be
remove unused import
MartinBelthle Nov 18, 2024
bf09fe7
remove mypy issue
MartinBelthle Nov 18, 2024
b29ea6b
revert change inside cli
MartinBelthle Nov 18, 2024
f623e84
remove unused imports
MartinBelthle Nov 18, 2024
cd378ff
think
MartinBelthle Nov 18, 2024
a597a7a
remove unsued import
MartinBelthle Nov 18, 2024
3d15dcc
adapt matrix repo
MartinBelthle Nov 18, 2024
f5a1b40
fix tests
MartinBelthle Nov 18, 2024
a431b12
add migration on the fly
MartinBelthle Nov 18, 2024
1bb608f
fix failing test
MartinBelthle Nov 18, 2024
14d8438
Merge branch 'dev' into feat/store-matrices-as-hdf5
MartinBelthle Nov 18, 2024
7709e37
Merge branch 'dev' into feat/store-matrices-as-hdf5
MartinBelthle Nov 18, 2024
5608a8e
make code work
MartinBelthle Nov 18, 2024
fc95962
make script work and add pyarrow to requirements
MartinBelthle Nov 18, 2024
c4c6aa4
change parquet compression
MartinBelthle Nov 18, 2024
ead70ab
remove old matrix after migration
MartinBelthle Nov 18, 2024
1f5f1ce
raise error when matrix doesn't exist
MartinBelthle Nov 18, 2024
84370b1
refactor tests
MartinBelthle Nov 18, 2024
b3e4ae5
add complex test case
MartinBelthle Nov 18, 2024
52a06cf
update doc
MartinBelthle Nov 18, 2024
9d3c304
fix test
MartinBelthle Nov 18, 2024
25e55d2
try to make test work on windows
MartinBelthle Nov 18, 2024
6282552
try to remove with suffix
MartinBelthle Nov 18, 2024
0c4b4bf
put back with suffix
MartinBelthle Nov 19, 2024
60342e9
remove tests on lock files as they are removed on windows
MartinBelthle Nov 19, 2024
9fbd5ca
Merge branch 'dev' into feat/store-matrices-as-hdf5
MartinBelthle Dec 3, 2024
47f5519
Merge branch 'dev' into feat/store-matrices-as-hdf5
MartinBelthle Jan 3, 2025
737b8a6
Merge branch 'dev' into feat/store-matrices-as-hdf5
MartinBelthle Jan 9, 2025
0b91706
add feather as it seems it's the best option
MartinBelthle Jan 9, 2025
95843d3
Merge branch 'dev' into feat/store-matrices-as-hdf5
TheoPascoli Jan 15, 2025
3a0ffeb
merge with dev
MartinBelthle Feb 6, 2025
cbe8027
fix linting with new deps version
MartinBelthle Feb 6, 2025
a64a7fb
fix linting
MartinBelthle Feb 6, 2025
411d014
fix license header
MartinBelthle Feb 6, 2025
44 changes: 39 additions & 5 deletions antarest/core/config.py
@@ -15,8 +15,11 @@
from dataclasses import asdict, dataclass, field
from enum import StrEnum
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, List, Optional, cast

import numpy as np
import numpy.typing as npt
import pandas as pd
import yaml

from antarest.core.model import JSON
@@ -31,6 +34,38 @@ class Launcher(StrEnum):
DEFAULT = "default"


class InternalMatrixFormat(StrEnum):
TSV = "tsv"
HDF = "hdf"
PARQUET = "parquet"
FEATHER = "feather"

def load_matrix(self, path: Path) -> npt.NDArray[np.float64]:
if self == InternalMatrixFormat.TSV or path.stat().st_size == 0:
return np.loadtxt(path, delimiter="\t", dtype=np.float64, ndmin=2)
elif self == InternalMatrixFormat.HDF:
df = cast(pd.DataFrame, pd.read_hdf(path))
return df.to_numpy(dtype=np.float64)
elif self == InternalMatrixFormat.PARQUET:
return pd.read_parquet(path).to_numpy(dtype=np.float64)
elif self == InternalMatrixFormat.FEATHER:
return pd.read_feather(path).to_numpy(dtype=np.float64)
else:
raise NotImplementedError(f"Internal matrix format '{self}' is not implemented")

def save_matrix(self, dataframe: pd.DataFrame, path: Path) -> None:
if self == InternalMatrixFormat.TSV:
np.savetxt(path, dataframe.to_numpy(), delimiter="\t", fmt="%.18f")
elif self == InternalMatrixFormat.HDF:
dataframe.to_hdf(str(path), key="data")
elif self == InternalMatrixFormat.PARQUET:
dataframe.to_parquet(path, compression=None)
elif self == InternalMatrixFormat.FEATHER:
dataframe.to_feather(path)
else:
raise NotImplementedError(f"Internal matrix format '{self}' is not implemented")


@dataclass(frozen=True)
class ExternalAuthConfig:
"""
@@ -156,6 +191,7 @@ class StorageConfig:
auto_archive_sleeping_time: int = 3600
auto_archive_max_parallel: int = 5
snapshot_retention_days: int = 7
matrixstore_format: InternalMatrixFormat = InternalMatrixFormat.TSV

@classmethod
def from_dict(cls, data: JSON) -> "StorageConfig":
@@ -185,10 +221,8 @@ def from_dict(cls, data: JSON) -> "StorageConfig":
auto_archive_dry_run=data.get("auto_archive_dry_run", defaults.auto_archive_dry_run),
auto_archive_sleeping_time=data.get("auto_archive_sleeping_time", defaults.auto_archive_sleeping_time),
auto_archive_max_parallel=data.get("auto_archive_max_parallel", defaults.auto_archive_max_parallel),
snapshot_retention_days=data.get(
"snapshot_retention_days",
defaults.snapshot_retention_days,
),
snapshot_retention_days=data.get("snapshot_retention_days", defaults.snapshot_retention_days),
matrixstore_format=InternalMatrixFormat(data.get("matrixstore_format", defaults.matrixstore_format)),
)


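The new `InternalMatrixFormat` enum pairs a `save_matrix` and a `load_matrix` implementation for each supported backend. Below is a minimal round-trip sketch of the parquet path, not part of the diff: it assumes `pandas` plus the `pyarrow` dependency added in `requirements.txt`, and the temporary path and string column names are illustrative choices.

```python
import tempfile
from pathlib import Path

import pandas as pd

from antarest.core.config import InternalMatrixFormat

fmt = InternalMatrixFormat.PARQUET

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "matrix.parquet"
    # A small 2x3 matrix; string column names are used to keep the parquet writer happy.
    df = pd.DataFrame([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], columns=["0", "1", "2"])

    fmt.save_matrix(df, path)     # delegates to DataFrame.to_parquet
    data = fmt.load_matrix(path)  # returns a float64 NumPy array

    assert data.shape == (2, 3)
```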
4 changes: 1 addition & 3 deletions antarest/matrixstore/main.py
@@ -12,8 +12,6 @@

from typing import Optional

from fastapi import APIRouter, FastAPI

from antarest.core.application import AppBuildContext
from antarest.core.config import Config
from antarest.core.filetransfer.service import FileTransferManager
@@ -48,7 +46,7 @@ def build_matrix_service(
"""
if service is None:
repo = MatrixRepository()
content = MatrixContentRepository(config.storage.matrixstore)
content = MatrixContentRepository(config.storage.matrixstore, config.storage.matrixstore_format)
dataset_repo = MatrixDataSetRepository()

service = MatrixService(
50 changes: 50 additions & 0 deletions antarest/matrixstore/migration_script.py
@@ -0,0 +1,50 @@
# Copyright (c) 2025, RTE (https://www.rte-france.com)
#
# See AUTHORS.txt
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# SPDX-License-Identifier: MPL-2.0
#
# This file is part of the Antares project.

import logging
from functools import partial
from pathlib import Path

import pandas as pd

from antarest.core.config import InternalMatrixFormat

logger = logging.getLogger(__name__)


def migrate_matrix(matrix_path: Path, matrix_format: InternalMatrixFormat) -> None:
new_path = matrix_path.parent.joinpath(matrix_path.stem + f".{matrix_format.value}")
old_format = InternalMatrixFormat(matrix_path.suffix[1:]) # remove the "."
data = old_format.load_matrix(matrix_path)
data = data.reshape((1, 0)) if data.size == 0 else data
df = pd.DataFrame(data=data)
matrix_format.save_matrix(df, new_path)
matrix_path.unlink()


def migrate_matrixstore(matrix_store_path: Path, format: InternalMatrixFormat) -> None:
"""
Migrates all matrices inside the matrixstore to the given format.
Does nothing if all files are already in that format.
"""
matrices = [f for f in matrix_store_path.glob("*") if f.suffixes[-1] != ".lock" and f.suffix != f".{format.value}"]
if matrices:
logger.info("Matrix store migration starts")

import multiprocessing
from multiprocessing import Pool

migrate_with_format = partial(migrate_matrix, matrix_format=format)
with Pool(processes=multiprocessing.cpu_count()) as pool:
pool.map(migrate_with_format, matrices)

logger.info("Matrix store migration ended successfully")
85 changes: 56 additions & 29 deletions antarest/matrixstore/repository.py
@@ -16,11 +16,13 @@
from pathlib import Path

import numpy as np
import pandas as pd
from filelock import FileLock
from numpy import typing as npt
from sqlalchemy import exists # type: ignore
from sqlalchemy.orm import Session # type: ignore

from antarest.core.config import InternalMatrixFormat
from antarest.core.utils.fastapi_sqlalchemy import db
from antarest.matrixstore.model import Matrix, MatrixContent, MatrixData, MatrixDataSet

@@ -138,16 +140,17 @@ class MatrixContentRepository:

This class provides methods to get, check existence,
save, and delete the content of matrices stored in a directory.
The matrices are stored as tab-separated values (TSV) files and
The matrices are stored in one of the formats described in `InternalMatrixFormat` and
are accessed and modified using their SHA256 hash as their unique identifier.

Attributes:
bucket_dir: The directory path where the matrices are stored.
format: The `InternalMatrixFormat` used to write new matrices.
"""

def __init__(self, bucket_dir: Path) -> None:
def __init__(self, bucket_dir: Path, format: InternalMatrixFormat) -> None:
self.bucket_dir = bucket_dir
self.bucket_dir.mkdir(parents=True, exist_ok=True)
self.format = format

def get(self, matrix_hash: str) -> MatrixContent:
"""
@@ -159,9 +162,15 @@ def get(self, matrix_hash: str) -> MatrixContent:
Returns:
The matrix content. A `FileNotFoundError` is raised if no matrix file exists for the given hash.
"""

matrix_file = self.bucket_dir.joinpath(f"{matrix_hash}.tsv")
matrix = np.loadtxt(matrix_file, delimiter="\t", dtype=np.float64, ndmin=2)
storage_format: t.Optional[InternalMatrixFormat] = None
for internal_format in InternalMatrixFormat:
matrix_path = self.bucket_dir.joinpath(f"{matrix_hash}.{internal_format}")
if matrix_path.exists():
storage_format = internal_format
break
if not storage_format:
raise FileNotFoundError(str(matrix_path.with_suffix("")))
matrix = storage_format.load_matrix(matrix_path)
matrix = matrix.reshape((1, 0)) if matrix.size == 0 else matrix
data = matrix.tolist()
index = list(range(matrix.shape[0]))
@@ -178,15 +187,15 @@ def exists(self, matrix_hash: str) -> bool:
Returns:
`True` if the matrix exists, `False` otherwise.
"""
matrix_file = self.bucket_dir.joinpath(f"{matrix_hash}.tsv")
return matrix_file.exists()
for internal_format in InternalMatrixFormat:
matrix_path = self.bucket_dir.joinpath(f"{matrix_hash}.{internal_format}")
if matrix_path.exists():
return True
return False

def save(self, content: t.Union[t.List[t.List[MatrixData]], npt.NDArray[np.float64]]) -> str:
"""
Saves the content of a matrix as a TSV file in the bucket directory
and returns its SHA256 hash.

The matrix content will be saved in a TSV file format, where each row represents
The matrix content will be saved in the repository's configured format
(see `InternalMatrixFormat`). The file will be saved
in the bucket directory using a unique filename. The SHA256 hash of the NumPy array
is returned as a string.
@@ -197,7 +206,7 @@ def save(self, content: t.Union[t.List[t.List[MatrixData]], npt.NDArray[np.float
or a NumPy array of type np.float64.

Returns:
The SHA256 hash of the saved TSV file.
The SHA256 hash of the saved matrix file.

Raises:
ValueError:
@@ -215,18 +224,31 @@ def save(self, content: t.Union[t.List[t.List[MatrixData]], npt.NDArray[np.float
# for a non-mutable NumPy Array.
matrix = content if isinstance(content, np.ndarray) else np.array(content, dtype=np.float64)
matrix_hash = hashlib.sha256(matrix.data).hexdigest()
matrix_file = self.bucket_dir.joinpath(f"{matrix_hash}.tsv")
# Avoid having to save the matrix again (that's the whole point of using a hash).
if not matrix_file.exists():
# Ensure exclusive access to the matrix file between multiple processes (or threads).
lock_file = matrix_file.with_suffix(".tsv.lock")
with FileLock(lock_file, timeout=15):
if matrix.size == 0:
# If the array or dataframe is empty, create an empty file instead of
# traditional saving to avoid unwanted line breaks.
open(matrix_file, mode="wb").close()
else:
np.savetxt(matrix_file, matrix, delimiter="\t", fmt="%.18f")
matrix_path = self.bucket_dir.joinpath(f"{matrix_hash}.{self.format}")
if matrix_path.exists():
# Avoid having to save the matrix again (that's the whole point of using a hash).
return matrix_hash

lock_file = matrix_path.with_suffix(".tsv.lock") # use tsv lock to stay consistent with old data
for internal_format in InternalMatrixFormat:
matrix_in_another_format_path = self.bucket_dir.joinpath(f"{matrix_hash}.{internal_format}")
if matrix_in_another_format_path.exists():
# We want to migrate the old matrix to the given repository format.
# Ensure exclusive access to the matrix file between multiple processes (or threads).
with FileLock(lock_file, timeout=15):
data = internal_format.load_matrix(matrix_in_another_format_path)
df = pd.DataFrame(data)
self.format.save_matrix(df, matrix_path)
matrix_in_another_format_path.unlink()
return matrix_hash

# Ensure exclusive access to the matrix file between multiple processes (or threads).
with FileLock(lock_file, timeout=15):
if matrix.size == 0:
matrix_path.touch()
else:
df = pd.DataFrame(matrix)
self.format.save_matrix(df, matrix_path)

# IMPORTANT: Deleting the lock file under Linux can make locking unreliable.
# See https://github.com/tox-dev/py-filelock/issues/31
@@ -237,21 +259,26 @@ def save(self, content: t.Union[t.List[t.List[MatrixData]], npt.NDArray[np.float

def delete(self, matrix_hash: str) -> None:
"""
Deletes the TSV file containing the content of a matrix with the given SHA256 hash.
Deletes the matrix file containing the content of a matrix with the given SHA256 hash.

Parameters:
matrix_hash: The SHA256 hash of the matrix.

Raises:
FileNotFoundError: If the TSV file does not exist.
FileNotFoundError: If the matrix file does not exist.

Note:
This method also deletes any abandoned lock file.
"""
matrix_file = self.bucket_dir.joinpath(f"{matrix_hash}.tsv")
matrix_file.unlink()
possible_paths = [self.bucket_dir.joinpath(f"{matrix_hash}.{f}") for f in InternalMatrixFormat]
if not any(path.exists() for path in possible_paths):
raise FileNotFoundError(f"The matrix {matrix_hash} does not exist.")

for internal_format in InternalMatrixFormat:
matrix_path = self.bucket_dir.joinpath(f"{matrix_hash}.{internal_format}")
matrix_path.unlink(missing_ok=True)

# IMPORTANT: Deleting the lock file under Linux can make locking unreliable.
# Abandoned lock files are deleted here to maintain consistent behavior.
lock_file = matrix_file.with_suffix(".tsv.lock")
lock_file = matrix_path.with_suffix(".tsv.lock")
lock_file.unlink(missing_ok=True)
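For reviewers, here is a short usage sketch of the reworked repository. It uses the default TSV backend so it runs with nothing beyond NumPy; pointing `format` at another `InternalMatrixFormat` value exercises the same code paths, and `save` now also converts a matrix it finds stored in a different format.

```python
import tempfile
from pathlib import Path

from antarest.core.config import InternalMatrixFormat
from antarest.matrixstore.repository import MatrixContentRepository

with tempfile.TemporaryDirectory() as tmp:
    repo = MatrixContentRepository(bucket_dir=Path(tmp), format=InternalMatrixFormat.TSV)

    matrix_hash = repo.save([[1.0, 2.0], [3.0, 4.0]])  # written as <sha256>.tsv
    assert repo.exists(matrix_hash)

    content = repo.get(matrix_hash)  # MatrixContent with data, index and columns
    assert content.data == [[1.0, 2.0], [3.0, 4.0]]

    repo.delete(matrix_hash)  # removes the matrix file and any abandoned lock file
    assert not repo.exists(matrix_hash)
```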
12 changes: 3 additions & 9 deletions antarest/service_creator.py
@@ -127,15 +127,9 @@ def create_event_bus(app_ctxt: t.Optional[AppBuildContext], config: Config) -> t
)


def create_core_services(app_ctxt: t.Optional[AppBuildContext], config: Config) -> t.Tuple[
ICache,
IEventBus,
ITaskService,
FileTransferManager,
LoginService,
MatrixService,
StudyService,
]:
def create_core_services(
app_ctxt: t.Optional[AppBuildContext], config: Config
) -> t.Tuple[ICache, IEventBus, ITaskService, FileTransferManager, LoginService, MatrixService, StudyService]:
event_bus, redis_client = create_event_bus(app_ctxt, config)
cache = build_cache(config=config, redis_client=redis_client)
filetransfer_service = build_filetransfer_service(app_ctxt, event_bus, config)
9 changes: 9 additions & 0 deletions docs/developer-guide/install/1-CONFIG.md
@@ -330,6 +330,15 @@ default:
- **Description:** Minutes before your study download will be cleared. The value could be less than the default one as a
user should download his study pretty soon after the download becomes available.

## **matrixstore_format**

- **Type:** String, possible values: `tsv`, `hdf`, `parquet` or `feather`
- **Default value:** `tsv`
- **Description:** Internal storage format of the matrix store. `tsv` is the native Antares study format, but to improve
performance and reduce the disk space used by these matrices, you can choose any of the other supported formats.
This setting is purely internal: matrices are displayed to users in the same way regardless of the storage format.
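
The value is validated through `InternalMatrixFormat` when `StorageConfig.from_dict` reads the `storage` section, so an unsupported string is rejected at configuration load time. A minimal sketch of that parsing step, with a dictionary standing in for the yaml excerpt below:

```python
from antarest.core.config import InternalMatrixFormat

storage_section = {"matrixstore_format": "parquet"}  # excerpt of the storage settings

fmt = InternalMatrixFormat(storage_section.get("matrixstore_format", InternalMatrixFormat.TSV))
assert fmt is InternalMatrixFormat.PARQUET

# An invalid value such as "xlsx" would raise a ValueError instead of being silently ignored.
```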


```yaml
# example for storage settings
storage:
1 change: 1 addition & 0 deletions requirements.txt
@@ -37,6 +37,7 @@ pandas~=2.2.3
paramiko~=3.4.1
plyer~=2.0.0
psycopg2-binary~=2.9.9
pyarrow~=18.1.0
py7zr~=0.20.6
python-json-logger~=2.0.7
PyYAML~=5.3.1
6 changes: 2 additions & 4 deletions tests/conftest_services.py
@@ -24,7 +24,7 @@

import pytest

from antarest.core.config import Config, StorageConfig, WorkspaceConfig
from antarest.core.config import Config, InternalMatrixFormat, StorageConfig, WorkspaceConfig
from antarest.core.interfaces.cache import ICache
from antarest.core.interfaces.eventbus import IEventBus
from antarest.core.requests import RequestParameters
@@ -145,9 +145,7 @@ def simple_matrix_service_fixture(bucket_dir: Path) -> SimpleMatrixService:
Returns:
An instance of the SimpleMatrixService class representing the matrix service.
"""
matrix_content_repository = MatrixContentRepository(
bucket_dir=bucket_dir,
)
matrix_content_repository = MatrixContentRepository(bucket_dir=bucket_dir, format=InternalMatrixFormat.TSV)
return SimpleMatrixService(matrix_content_repository=matrix_content_repository)


10 changes: 4 additions & 6 deletions tests/matrixstore/conftest.py
@@ -14,9 +14,12 @@

import pytest

from antarest.core.config import InternalMatrixFormat
from antarest.matrixstore.repository import MatrixContentRepository, MatrixDataSetRepository, MatrixRepository
from antarest.matrixstore.service import MatrixService

DEFAULT_INTERNAL_FORMAT = InternalMatrixFormat.TSV


@pytest.fixture(name="matrix_repo")
def matrix_repo_fixture() -> MatrixRepository:
@@ -30,7 +33,7 @@ def dataset_repo_fixture() -> MatrixDataSetRepository:

@pytest.fixture(name="content_repo")
def content_repo_fixture(tmp_path) -> MatrixContentRepository:
yield MatrixContentRepository(tmp_path.joinpath("content_repo"))
yield MatrixContentRepository(tmp_path.joinpath("content_repo"), format=DEFAULT_INTERNAL_FORMAT)


@pytest.fixture(name="matrix_service")
@@ -44,8 +47,3 @@ def matrix_service_fixture(matrix_repo, dataset_repo, content_repo) -> MatrixSer
config=unittest.mock.Mock(),
user_service=unittest.mock.Mock(),
)


@pytest.fixture(name="matrix_content_repo")
def matrix_content_repo_fixture(tmp_path) -> MatrixContentRepository:
yield MatrixContentRepository(bucket_dir=tmp_path.joinpath("matrix-store"))