Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions eodag/plugins/search/data_request_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def __init__(self, provider: str, config: PluginConfig) -> None:
self.config.pagination["next_page_url_key_path"] = string_to_jsonpath(
self.config.pagination.get("next_page_url_key_path", None)
)
self.download_info: Dict[str, Any] = {}

self.data_request_id = None

def discover_product_types(self) -> Optional[Dict[str, Any]]:
Expand Down Expand Up @@ -386,15 +386,6 @@ def _convert_result_data(
p.properties["orderLink"] = p.properties["orderLink"].replace(
"requestJobId", str(data_request_id)
)
if self.config.products[product_type].get("storeDownloadUrl", False):
# store download information to retrieve it later in case search by id
# is not possible
self.download_info[p.properties["id"]] = {
"requestJobId": data_request_id,
"orderLink": p.properties["orderLink"],
"downloadLink": p.properties["downloadLink"],
"provider": self.provider,
}
return products, total_items_nb

def _check_uses_custom_filters(self, product_type: str) -> bool:
Expand Down
1 change: 1 addition & 0 deletions eodag/resources/providers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1914,6 +1914,7 @@
S2_MSI_L1C:
productType: sentinel-s2-l1c
metadata_mapping:
id: '$.properties."sentinel:product_id"'
title: '$.properties."sentinel:product_id"'
platformSerialIdentifier: '$.id.`split(_, 0, -1)`'
polarizationMode: '$.id.`sub(/.{14}([A-Z]{2}).*/, \\1)`'
Expand Down
156 changes: 145 additions & 11 deletions eodag/rest/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import datetime
import logging
import os
import re
from typing import Any, Callable, Dict, List, Optional, cast
from typing import Any, Callable, Deque, Dict, List, Optional, cast

import boto3
import dateutil
from fastapi import Request
from fastapi.responses import StreamingResponse
Expand All @@ -46,7 +48,12 @@
get_next_link,
)
from eodag.rest.utils.rfc3339 import rfc3339_str_to_datetime
from eodag.utils import StreamResponse, _deprecated, dict_items_recursive_apply
from eodag.utils import (
DEFAULT_DEQUE_MAX_LEN,
StreamResponse,
_deprecated,
dict_items_recursive_apply,
)
from eodag.utils.exceptions import (
MisconfiguredError,
NoMatchingProductType,
Expand All @@ -67,6 +74,33 @@
}


class CacheRecord:
"""cache record"""

provider: str
product_type: str
results: SearchResult


max_len = int(
os.getenv("EODAG_SEARCH_RESULTS_CACHE_MAXLEN", str(DEFAULT_DEQUE_MAX_LEN))
)
cached_search_results: Deque[CacheRecord] = collections.deque(maxlen=max_len)
use_redis = False
if os.getenv("REDIS_HOST", None):
use_redis = True

if use_redis:
import pickle

import redis

# more config parameters could be added depending on redis used
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PORT = os.getenv("REDIS_PORT", 6379)
redis_instance = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)


@_deprecated(reason="No more needed with STAC API + Swagger", version="2.6.1")
def get_home_page_content(base_url: str, ipp: Optional[int] = None) -> str:
"""Compute eodag service home page content
Expand Down Expand Up @@ -100,6 +134,34 @@ def format_product_types(product_types: List[Dict[str, Any]]) -> str:
return "\n".join(sorted(result))


def _add_to_cache(product_type: str, provider: Optional[str], products: SearchResult):
if len(products) == 0:
return
if use_redis:
for product in products:
auth = getattr(product, "downloader_auth", None)
if auth and getattr(auth, "s3_client", None):
auth.s3_client = None # cannot be pickled
try:
# remove register_downloader method type because it causes problems at cache retrieval
delattr(product, "register_downloader")
except AttributeError:
logger.debug("no register downloader to remove")
pickled_product = pickle.dumps(product)
if not provider:
provider = product.provider
cache_key = f"{product_type}_{provider}_{product.properties['id']}"
redis_instance.set(cache_key, pickled_product)
else:
cache_record = CacheRecord()
if not provider:
provider = products[0].provider
cache_record.provider = provider
cache_record.product_type = product_type
cache_record.results = products
cached_search_results.append(cache_record)


def search_stac_items(
request: Request,
search_request: SearchPostRequest,
Expand Down Expand Up @@ -183,6 +245,8 @@ def search_stac_items(
search_results = SearchResult([])
total = 0

_add_to_cache(eodag_args.productType, eodag_args.provider, search_results)

for record in search_results:
record.product_type = eodag_api.get_alias_from_product_type(record.product_type)

Expand All @@ -207,6 +271,73 @@ def search_stac_items(
return items


def _find_best_provider(providers: List[str], product_type) -> str:
if len(providers) == 1:
return providers[0]
plugins = eodag_api._plugins_manager.get_search_plugins(product_type)
for plugin in plugins:
if plugin.provider in providers:
return plugin.provider


def _recreate_s3_client(product: EOProduct):
auth = getattr(product, "auth", None)
if auth and hasattr(auth, "s3_client"):
base_uri = getattr(product, "downloader").config.base_uri
auth_dict = auth.authenticate()
auth.s3_client = boto3.client(
"s3",
endpoint_url=base_uri,
**auth_dict,
)


def _retrieve_from_cache(
provider: Optional[str], product_type: str, item_id: str
) -> Optional[EOProduct]:
if use_redis:
if provider:
cache_key = f"{product_type}_{provider}_{item_id}"
if redis_instance.exists(cache_key):
logger.debug("product %s retrieved from redis cache", item_id)
product = pickle.loads(redis_instance.get(cache_key))
_recreate_s3_client(product)
return product
else:
return None
else:
cache_key_pattern = f"{product_type}_*_{item_id}"
keys = redis_instance.scan_iter(cache_key_pattern)
providers = [
key.decode().replace(f"{product_type}_", "").replace(f"_{item_id}", "")
for key in keys
]
provider = _find_best_provider(providers, product_type)
cache_key = f"{product_type}_{provider}_{item_id}"
logger.debug(
"product %s of provider %s retrieved from redis cache",
item_id,
provider,
)
product = pickle.loads(redis_instance.get(cache_key))
_recreate_s3_client(product)
return product
else:
if not provider:
providers = [result.provider for result in cached_search_results]
provider = _find_best_provider(providers, product_type)
for stored_record in reversed(cached_search_results):
if (
stored_record.provider == provider
and product_type == stored_record.product_type
):
for p in stored_record.results:
if p.properties["id"] == item_id:
logger.debug("product %s retrieved from cache", item_id)
return p
return None


def download_stac_item(
catalogs: List[str],
item_id: str,
Expand All @@ -229,16 +360,19 @@ def download_stac_item(
"""
product_type = catalogs[0]

search_results, _ = eodag_api.search(
id=item_id, productType=product_type, provider=provider, **kwargs
)
if len(search_results) > 0:
product = cast(EOProduct, search_results[0])
else:
raise NotAvailableError(
f"Could not find {item_id} item in {product_type} collection"
+ (f" for provider {provider}" if provider else "")
product = _retrieve_from_cache(provider, product_type, item_id)
if not product:
logger.debug("product %s not found in cache, executing search by id", item_id)
search_results, _ = eodag_api.search(
id=item_id, productType=product_type, provider=provider, **kwargs
)
if len(search_results) > 0:
product = cast(EOProduct, search_results[0])
else:
raise NotAvailableError(
f"Could not find {item_id} item in {product_type} collection"
+ (f" for provider {provider}" if provider else "")
)

try:
download_stream = cast(
Expand Down
1 change: 1 addition & 0 deletions eodag/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
# Default maximum number of items per page requested by search_all. 50 instead of 20
# (DEFAULT_ITEMS_PER_PAGE) to increase it to the known and current minimum value (mundi)
DEFAULT_MAX_ITEMS_PER_PAGE = 50
DEFAULT_DEQUE_MAX_LEN = 100


def _deprecated(reason: str = "", version: Optional[str] = None) -> Callable[..., Any]:
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ install_requires =
annotated-types
setuptools
pygeofilter
redis

[options.extras_require]
dev =
Expand Down
38 changes: 38 additions & 0 deletions tests/units/test_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from fastapi.testclient import TestClient
from shapely.geometry import box

from eodag import EOProduct
from eodag.config import PluginConfig
from eodag.plugins.authentication.base import Authentication
from eodag.plugins.download.base import Download
Expand Down Expand Up @@ -1377,6 +1378,43 @@ def test_cql_post_search(self):
},
)

@mock.patch(
"eodag.plugins.search.qssearch.QueryStringSearch.query",
autospec=True,
)
@mock.patch(
"eodag.plugins.authentication.generic.GenericAuth.authenticate",
autospec=True,
)
@mock.patch(
"eodag.plugins.download.http.HTTPDownload._stream_download_dict",
autospec=True,
)
def test_download_without_search_by_id(self, mock_download, mock_auth, mock_plugin):
product_id = "a_nice_product"
properties = {
"id": product_id,
"downloadLink": "https://bla.bli",
"orderLink": "https://bla.bli/blu",
"geometry": "-180 -90 180 90",
"title": product_id,
}
product = EOProduct("onda", properties, productType=self.tested_product_type)
mock_plugin.return_value = [product], 1
# search products
self.app.get(
f"search?collections={self.tested_product_type}&provider=onda&"
f"bbox=0,43,1,44&datetime=2018-01-20/2018-01-25",
follow_redirects=True,
)

mock_download.return_value = StreamResponse(content=iter([b""]))
# check that download of returned product is working without attempt to execute search
self._request_valid_raw(
f"collections/{self.tested_product_type}/items/a_nice_product/download?provider=onda",
search_call_count=0,
)

@mock.patch("eodag.rest.core.eodag_api.list_product_types", autospec=True)
@mock.patch("eodag.rest.core.eodag_api.guess_product_type", autospec=True)
def test_collection_free_text_search(self, guess_pt: Mock, list_pt: Mock):
Expand Down
Loading