Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 50 additions & 23 deletions meteofetch/_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
Expand All @@ -15,6 +16,11 @@

logger = logging.getLogger(__name__)

# 🔴 FIX: verrou global pour protéger la modification de ECCODES_DEFINITION_PATH
# en contexte multi-thread (set_grib_defs() modifie os.environ, qui est partagé
# entre tous les threads du processus).
_grib_defs_lock = threading.Lock()


class ForecastNotAvailableError(RuntimeError):
"""Raised when no valid forecast run is found among the recent past runs."""
Expand Down Expand Up @@ -62,18 +68,22 @@ class ForecastNotAvailableError(RuntimeError):

def geo_encode_cf(da: xr.DataArray) -> xr.DataArray:
"""
Rend une DataArray conforme aux conventions CF (Climate and Forecast).
Retourne une copie de la DataArray conforme aux conventions CF (Climate and Forecast).

Cette fonction ajoute les attributs et encodages nécessaires pour que la DataArray
soit compatible avec les outils respectant les conventions CF. Elle inclut la compression,
les informations de référence spatiale, et les coordonnées géographiques.

Args:
da (xr.DataArray): La DataArray à modifier pour la rendre conforme aux conventions CF.
da (xr.DataArray): La DataArray source.

Returns:
xr.DataArray: La DataArray modifiée avec les attributs et encodages CF ajoutés.
xr.DataArray: Une copie de la DataArray avec les attributs et encodages CF ajoutés.
"""
# 🟡 FIX: on travaille sur une copie pour éviter la mutation silencieuse de l'objet
# original. L'ancienne version mutait `da` en place ET la retournait, ce qui pouvait
# induire l'appelant en erreur en lui faisant croire qu'il recevait un nouvel objet.
da = da.copy()
da.encoding.update(
{
"zlib": True,
Expand All @@ -98,24 +108,40 @@ def set_grib_defs(source: Literal["eccodes", "meteofrance"]) -> None:
source: "eccodes" to use the bundled eccodes definitions (default upstream behaviour),
or "meteofrance" to use the Météo-France-specific definitions shipped with
this package (required for some MeteoFrance model fields).
"""
current_path = os.environ.get("ECCODES_DEFINITION_PATH")

if source == "eccodes":
required_path = None
elif source == "meteofrance":
required_path = str(Path(__file__).parent / "gribdefs")
else:
raise ValueError(f"Source inconnue : {source}")
Raises:
ValueError: If *source* is not one of the accepted values.

Note:
This function modifies a process-wide environment variable and is protected
by a threading lock. It is safe to call from multiple threads, but concurrent
calls will serialize. Avoid calling this function while GRIB reads are in
progress in other threads, as the definition path change takes effect
immediately for all subsequent eccodes operations.
"""
# 🔴 FIX: acquisition du verrou pour gérer le multithread (si deux threads
# appellent set_grib_defs() simultanément ou si un thread lit des GRIBs pendant
# qu'un autre change le path).
with _grib_defs_lock:
current_path = os.environ.get("ECCODES_DEFINITION_PATH")

if current_path != required_path:
if source == "eccodes":
os.environ.pop("ECCODES_DEFINITION_PATH", None)
required_path = None
elif source == "meteofrance":
required_path = str(Path(__file__).parent / "gribdefs")
else:
assert isinstance(required_path, str)
os.environ["ECCODES_DEFINITION_PATH"] = required_path
print(f"Définitions GRIB mises à jour : {source}")
eccodes.codes_context_delete()
raise ValueError(f"Source inconnue : {source!r}. Valeurs acceptées : 'eccodes', 'meteofrance'.")

if current_path != required_path:
if source == "eccodes":
os.environ.pop("ECCODES_DEFINITION_PATH", None)
else:
assert isinstance(required_path, str)
os.environ["ECCODES_DEFINITION_PATH"] = required_path
# 🟡 FIX: utilisation de logger à la place de print() pour uniformiser
# avec le reste du package .
logger.info("Définitions GRIB mises à jour : %s", source)
eccodes.codes_context_delete()


def set_test_mode() -> None:
Expand All @@ -127,7 +153,8 @@ def set_test_mode() -> None:
downloading or storing real meteorological data.
"""
os.environ["METEOFETCH_TEST_MODE"] = "1"
print("Test mode enabled. DataArray values are replaced with isnull() booleans.")
# 🟡 FIX: logger.info au lieu de print()
logger.info("Test mode enabled. DataArray values are replaced with isnull() booleans.")


def is_downloadable(url: str, return_date: bool = False) -> Union[bool, datetime]:
Expand All @@ -143,6 +170,11 @@ def is_downloadable(url: str, return_date: bool = False) -> Union[bool, datetime

Returns:
``True`` / ``datetime`` on success, ``False`` otherwise.

Note:
A HEAD 200 response only confirms the resource exists on the server;
it does not guarantee the file is complete or uncorrupted. Full integrity
checking (e.g. checksum) must be performed after the actual download.
"""
logger.debug("Checking availability of %s", url)
try:
Expand Down Expand Up @@ -176,18 +208,13 @@ def are_downloadable(urls: List[str], return_date: bool = False) -> Union[bool,
``True`` / ``datetime`` if all URLs are downloadable, ``False`` otherwise.
"""
with ThreadPoolExecutor() as executor:
# Utiliser executor.map pour appliquer la fonction is_downloadable à chaque URL
results = list(executor.map(lambda url: is_downloadable(url, return_date), urls))

if return_date:
# Filtrer les résultats pour obtenir uniquement les dates valides
valid_dates = [result for result in results if isinstance(result, datetime)]
# Vérifier si toutes les URLs sont téléchargeables et si des dates valides sont présentes
if len(valid_dates) == len(urls):
# Renvoie la date maximale
return max(valid_dates)
else:
return False
else:
# Renvoie True si toutes les URLs sont téléchargeables, False sinon
return all(results)
Loading