Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 108 additions & 20 deletions python/grass/temporal/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,22 @@
:authors: Soeren Gebbert
"""

from __future__ import annotations

import re
import sys
from multiprocessing import Process

import grass.script as gs
from grass.exceptions import CalledModuleError

from .abstract_map_dataset import AbstractMapDataset

from .core import (
SQLDatabaseInterfaceConnection,
get_current_mapset,
get_tgis_message_interface,
get_tgis_db_version,
)
from .datetime_math import (
create_numeric_suffix,
Expand All @@ -31,6 +36,82 @@
############################################################################


def compile_new_map_name(
sp,
base: str,
count: int,
map_id: str,
semantic_label: str | None,
time_suffix: str | None,
dbif: SQLDatabaseInterfaceConnection,
):
"""Compile new map name with suffix and semantic label.

:param sp: An open SpaceTimeDataSet (STDS)
:param count: Running number of the map to be used as numeric suffix (if not time suffix)
:param map_id: Map ID to compile new map name for
:param time_suffix: Type of time suffix to use (or None)
:param dbif: initialized TGIS database interface
"""
if semantic_label:
base = f"{base}_{semantic_label}"
if (
sp.get_temporal_type() != "absolute"
or not time_suffix
or time_suffix.startswith("num")
):
return create_numeric_suffix(base, count, time_suffix)
old_map = sp.get_new_map_instance(map_id)
old_map.select(dbif)
if time_suffix == "gran":
suffix = create_suffix_from_datetime(
old_map.temporal_extent.get_start_time(), sp.get_granularity()
)
else:
suffix = create_time_suffix(old_map)
return f"{base}_{suffix}"


def replace_stds_names(expression: str, simple_name: str, full_name: str) -> str:
"""Safely replace simple with full STDS names.

When users provide inconsistent input for STDS in the expression
(with and without mapset componenet) or if the STDS name is part
of the name of other raster maps in the expression, the final
mapcalc expression may become invalid when the STDS name later is
replaced with the name of the individual maps in the time series.
The replacement with the fully qualified STDS names avoids that
confusion.

:param expression: The mapcalc expression to replace names in
:param simple_name: STDS name *without* mapset component
:param full_name: STDS name *with* mapset component
"""
separators = r""" ,-+*/^:&|"'`()<>#^"""
name_matches = re.finditer(simple_name, expression)
new_expression = ""
old_idx = 0
for match in name_matches:
# Fill-in expression component between matches
new_expression += expression[old_idx : match.start()]
# Only replace STDS name if pre- and succeeded by a separator
# Match is either at the start or preceeeded by a separator
if match.start() == 0 or expression[match.start() - 1] in separators:
# Match is either at the end or succeeded by a separator
if (
match.end() + 1 > len(expression)
or expression[match.end()] in separators
):
new_expression += full_name
else:
new_expression += simple_name
else:
new_expression += simple_name
old_idx = match.end()
new_expression += expression[old_idx:]
return new_expression


def extract_dataset(
input,
output,
Expand Down Expand Up @@ -82,13 +163,24 @@ def extract_dataset(
dbif = SQLDatabaseInterfaceConnection()
dbif.connect()

tgis_version = get_tgis_db_version()

sp = open_old_stds(input, type, dbif)
has_semantic_labels = bool(
tgis_version > 2 and type == "raster" and sp.metadata.semantic_labels
)

# Check the new stds
new_sp = check_new_stds(output, type, dbif, gs.overwrite())
if type == "vector":
rows = sp.get_registered_maps("id,name,mapset,layer", where, "start_time", dbif)
else:
rows = sp.get_registered_maps("id", where, "start_time", dbif)
rows = sp.get_registered_maps(
f"id{',semantic_label' if has_semantic_labels else ''}",
where,
"start_time",
dbif,
)

new_maps = {}
if rows:
Expand All @@ -102,33 +194,30 @@ def extract_dataset(
proc_count = 0
proc_list = []

# Make sure STRDS is in the expression referenced with fully qualified name
expression = replace_stds_names(
expression, sp.base.get_name(), sp.base.get_map_id()
)
for row in rows:
count += 1

if count % 10 == 0:
msgr.percent(count, num_rows, 1)

if sp.get_temporal_type() == "absolute" and time_suffix == "gran":
old_map = sp.get_new_map_instance(row["id"])
old_map.select(dbif)
suffix = create_suffix_from_datetime(
old_map.temporal_extent.get_start_time(), sp.get_granularity()
)
map_name = "{ba}_{su}".format(ba=base, su=suffix)
elif sp.get_temporal_type() == "absolute" and time_suffix == "time":
old_map = sp.get_new_map_instance(row["id"])
old_map.select(dbif)
suffix = create_time_suffix(old_map)
map_name = "{ba}_{su}".format(ba=base, su=suffix)
else:
map_name = create_numeric_suffix(base, count, time_suffix)
map_name = compile_new_map_name(
sp,
base,
count,
row["id"],
row["semantic_label"] if has_semantic_labels else None,
time_suffix,
dbif,
)

# We need to modify the r(3).mapcalc expression
if type != "vector":
expr = expression
expr = expr.replace(sp.base.get_map_id(), row["id"])
expr = expr.replace(sp.base.get_name(), row["id"])

expr = "%s = %s" % (map_name, expr)

# We need to build the id
Expand Down Expand Up @@ -273,9 +362,8 @@ def extract_dataset(

if type == "raster":
# Set the semantic label
semantic_label = old_map.metadata.get_semantic_label()
if semantic_label is not None:
new_map.set_semantic_label(semantic_label)
if has_semantic_labels:
new_map.set_semantic_label(row["semantic_label"])

# Insert map in temporal database
new_map.insert(dbif)
Expand Down
199 changes: 199 additions & 0 deletions temporal/t.rast.extract/tests/test_t_rast_extract_pytest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""Test t.rast.extract.

This test checks that t.rast.extract works also when
a) a fully qualified name is used in the expression,
b) it is run with a STRDS from another mapset as input and
c) the STRDS contains maps with identical temporal extent but with
different semantic labels

(C) 2025 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
for details.

@author Stefan Blumentrath
"""

import os
import shutil
from pathlib import Path

import grass.script as gs
import pytest
from grass.tools import Tools


@pytest.fixture(scope="module")
def session(tmp_path_factory):
"""Pytest fixture to create a temporary GRASS project with a STRDS.

This setup initializes the GRASS environment, sets the computational region,
creates a series of raster maps with semantic labels, and rgisters
the maps in a new SpaceTimeRasterDataSet (STRDS).

Yields:
session: active GRASS session with the prepared project and STRDS.

"""
# Create a single temporary project directory once per module
tmp_path = tmp_path_factory.mktemp("raster_time_series")

# Remove if exists (defensive cleanup)
if tmp_path.exists():
shutil.rmtree(tmp_path)

gs.create_project(tmp_path)

with gs.setup.init(
tmp_path.parent,
location=tmp_path.name,
env=os.environ.copy(),
) as session:
tools = Tools(session=session)
tools.g_mapset(mapset="perc", flags="c")
tools.g_gisenv(set="TGIS_USE_CURRENT_MAPSET=1")
tools.g_region(s=0, n=80, w=0, e=120, b=0, t=50, res=10, res3=10)
register_strings = []
for i in range(1, 4):
for label in ["A", "B"]:
mapname = f"perc_{label}_{i}"
semantic_label = f"S2{label}_{i}"
tools.r_mapcalc(expression=f"{mapname} = {i}00", overwrite=True)
tools.r_semantic_label(
map=mapname,
semantic_label=semantic_label,
overwrite=True,
)
register_strings.append(f"{mapname}|2025-08-0{i}|{semantic_label}\n")

tools.t_create(
type="strds",
temporaltype="absolute",
output="perc",
title="A test",
description="A test",
overwrite=True,
)
tmp_file = tools.g_tempfile(pid=os.getpid()).text
Path(tmp_file).write_text("".join(register_strings), encoding="UTF8")
tools.t_register(
type="raster",
input="perc",
file=tmp_file,
overwrite=True,
)
yield session


def test_selection(session):
"""Perform a simple selection by datetime."""
tools = Tools(session=session)
tools.t_rast_extract(
input="perc",
output="perc_1",
where="start_time > '2025-08-01'",
verbose=True,
overwrite=True,
)


def test_selection_and_expression(session):
"""Perform a selection and a r.mapcalc expression with full name."""
result = "perc_2"
tools = Tools(session=session)
tools.t_rast_extract(
input="perc",
output=result,
where="start_time > '2025-08-01'",
expression=" if(perc@perc>=200,perc@perc,null())",
basename=result,
nprocs=2,
verbose=True,
overwrite=True,
)
strds_info = tools.t_info(input=result, flags="g").keyval
expected_info = {
"start_time": "'2025-08-02 00:00:00'",
"end_time": "'2025-08-03 00:00:00'",
"name": result,
"min_min": 200.0,
"min_max": 300.0,
"max_min": 200.0,
"max_max": 300.0,
"aggregation_type": "None",
"number_of_semantic_labels": 4,
"semantic_labels": "S2A_2,S2A_3,S2B_2,S2B_3",
"number_of_maps": 4,
}
for k, v in expected_info.items():
assert strds_info[k] == v, (
f"Expected value for key '{k}' is {v}. Got: {strds_info[k]}"
)


def test_inconsistent_selection_and_expression(session):
"""Perform a selection and a r.mapcalc expression with simple and full name."""
result = "perc_3"
tools = Tools(session=session)
tools.t_rast_extract(
input="perc",
output=result,
where="start_time > '2025-08-01'",
expression=" if(perc>=200,perc@perc,null())",
basename=result,
nprocs=2,
verbose=True,
overwrite=True,
)
strds_info = tools.t_info(input=result, flags="g").keyval
expected_info = {
"start_time": "'2025-08-02 00:00:00'",
"end_time": "'2025-08-03 00:00:00'",
"name": result,
"min_min": 200.0,
"min_max": 300.0,
"max_min": 200.0,
"max_max": 300.0,
"aggregation_type": "None",
"number_of_semantic_labels": 4,
"semantic_labels": "S2A_2,S2A_3,S2B_2,S2B_3",
"number_of_maps": 4,
}
for k, v in expected_info.items():
assert strds_info[k] == v, (
f"Expected value for key '{k}' is {v}. Got: {strds_info[k]}"
)


def test_selection_and_expression_simple_name(session):
"""Perform a selection and a r.mapcalc expression with simple name."""
result = "perc_4"
tools = Tools(session=session)
tools.t_rast_extract(
input="perc",
output=result,
where="start_time > '2025-08-01'",
expression=" if(perc>=200,perc@perc,null())",
basename=result,
nprocs=2,
verbose=True,
overwrite=True,
)
strds_info = tools.t_info(input=result, flags="g").keyval
expected_info = {
"start_time": "'2025-08-02 00:00:00'",
"end_time": "'2025-08-03 00:00:00'",
"name": result,
"min_min": 200.0,
"min_max": 300.0,
"max_min": 200.0,
"max_max": 300.0,
"aggregation_type": "None",
"number_of_semantic_labels": 4,
"semantic_labels": "S2A_2,S2A_3,S2B_2,S2B_3",
"number_of_maps": 4,
}
for k, v in expected_info.items():
assert strds_info[k] == v, (
f"Expected value for key '{k}' is {v}. Got: {strds_info[k]}"
)
Loading