Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pipelines/run_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def create_cluster():
cluster = coiled.Cluster(
name="gnw_zonal_stat_count",
region="us-east-1",
n_workers=1,
n_workers=10,
tags={"project": "gnw_zonal_stat"},
scheduler_vm_types=["r7g.xlarge"],
worker_vm_types=["r7g.2xlarge"],
Expand Down
39 changes: 22 additions & 17 deletions pipelines/test/integration/tree_cover_loss/test_tcl_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,12 @@
import geopandas as gpd
import pytest
from prefect.testing.utilities import prefect_test_harness
from shapely.geometry import box, shape
from shapely.geometry import shape

from pipelines.test.integration.tree_cover_loss.conftest import (
ARG_1_28,
FakeQCRepository,
MatchingGoogleEarthEngineDatasetRepository,
)
from pipelines.tree_cover_loss.prefect_flows.tcl import (
compute_tree_cover_loss,
umd_tree_cover_loss,
)
from pipelines.tree_cover_loss.prefect_flows.tcl_flow import umd_tree_cover_loss_flow
from pipelines.tree_cover_loss.stages import TreeCoverLossTasks


@pytest.mark.integration
Expand Down Expand Up @@ -68,9 +61,15 @@ def test_tcl_flow_real_data(mock_qc_load, mock_qc_write_results, mock_save_parqu


@pytest.mark.integration
@patch(
"pipelines.tree_cover_loss.stages.qc_against_validation_source", return_value=True
)
@patch("pipelines.prefect_flows.common_stages._save_parquet")
@patch("pipelines.tree_cover_loss.stages._load_zarr")
def test_tcl_flow_with_new_contextual_layers(
mock_load_zarr,
mock_save_parquet,
mock_qc,
tcl_ds,
pixel_area_ds,
carbon_emissions_ds,
Expand All @@ -95,14 +94,12 @@ def test_tcl_flow_with_new_contextual_layers(
country_ds,
region_ds,
subregion_ds,
] * 2
]

result_df = umd_tree_cover_loss(
TreeCoverLossTasks(
gee_repository=MatchingGoogleEarthEngineDatasetRepository(),
qc_feature_repository=FakeQCRepository(),
)
)
with prefect_test_harness():
umd_tree_cover_loss_flow("test", overwrite=True)

result_df = mock_save_parquet.call_args[0][0]

# verify expected cols
expected_columns = {
Expand All @@ -127,9 +124,15 @@ def test_tcl_flow_with_new_contextual_layers(
assert result_df.size == 120


@patch(
"pipelines.tree_cover_loss.stages.qc_against_validation_source", return_value=True
)
@patch("pipelines.prefect_flows.common_stages._save_parquet")
@patch("pipelines.tree_cover_loss.stages._load_zarr")
def test_tcl_flow_with_bbox(
mock_load_zarr,
mock_save_parquet,
mock_qc,
tcl_ds,
pixel_area_ds,
carbon_emissions_ds,
Expand All @@ -142,7 +145,6 @@ def test_tcl_flow_with_bbox(
region_ds,
subregion_ds,
):

mock_load_zarr.side_effect = [
tcl_ds,
pixel_area_ds,
Expand All @@ -158,7 +160,10 @@ def test_tcl_flow_with_bbox(
]

# filter to bottom left pixel
result_df = compute_tree_cover_loss(TreeCoverLossTasks(), bbox=box(0, 0, 0, 0))
with prefect_test_harness():
umd_tree_cover_loss_flow("test", overwrite=True, bbox=(0, 0, 0, 0))

result_df = mock_save_parquet.call_args[0][0]

# verify expected cols
expected_columns = {
Expand Down
138 changes: 82 additions & 56 deletions pipelines/test/unit/tree_cover_loss/test_qc.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,119 @@
from unittest.mock import patch

import numpy as np
import pandas as pd
from shapely.geometry import box

from pipelines.test.integration.tree_cover_loss.conftest import (
FakeGoogleEarthEngineDatasetRepository,
FakeQCRepository,
)
from pipelines.tree_cover_loss.stages import TreeCoverLossTasks
from pipelines.tree_cover_loss import stages


def _make_result_df(area_ha, canopy_cover, driver, natural_forest_class):
return pd.DataFrame(
{
"area_ha": area_ha,
"tree_cover_loss_year": [2021, 2022],
"canopy_cover": canopy_cover,
"driver": driver,
"aoi_id": ["AFG.1.1", "AFG.1.1"],
"natural_forest_class": natural_forest_class,
}
)


def test_tcl_validation_flow():
result_df = _make_result_df(
[100.0, 200.0],
["30", "30"],
["Agriculture", "Permanent settlement"],
["Natural Forest", "Non-natural Forest"],
)

with patch(
"pipelines.tree_cover_loss.stages.TreeCoverLossTasks.get_sample_statistics"
) as mock_sample, patch(
"pipelines.tree_cover_loss.stages.TreeCoverLossTasks.get_validation_statistics"
"pipelines.tree_cover_loss.stages.get_validation_statistics"
) as mock_validation:
tasks = TreeCoverLossTasks(qc_feature_repository=FakeQCRepository())
mock_sample.return_value = pd.DataFrame(
{
"area_ha": [100.0, 200.0],
"tree_cover_loss_year": [2021, 2022],
"canopy_cover": ["30", "30"],
"driver": ["Agriculture", "Permanent settlement"],
"aoi_id": ["AFG.1.1", "AFG.1.1"],
"natural_forest_class": ["Natural Forest", "Non-natural Forest"],
}
)
mock_validation.return_value = {
"driver_results": pd.DataFrame({"area_ha": [100.0, 200.0]}),
"natural_forests_results": pd.DataFrame({"area_ha": [100.0, 200.0]}),
}

assert tasks.qc_against_validation_source() is True
assert (
stages.qc_against_validation_source(
result_df,
qc_feature_repository=FakeQCRepository(),
)
is True
)

mock_validation.return_value = {
"driver_results": pd.DataFrame({"area_ha": [100.0, 150.0]}),
"natural_forests_results": pd.DataFrame({"area_ha": [100.0, 150.0]}),
}
assert tasks.qc_against_validation_source() is False

mock_sample.return_value = pd.DataFrame(
{
"area_ha": [100.0, 200.0],
"tree_cover_loss_year": [2021, 2022],
"canopy_cover": ["10", "30"],
"driver": ["Agriculture", "Permanent settlement"],
"aoi_id": ["AFG.1.1", "AFG.1.1"],
"natural_forest_class": ["Natural Forest", "Non-natural Forest"],
}
)
assert tasks.qc_against_validation_source() is False

mock_sample.return_value = pd.DataFrame(
{
"area_ha": [100.0, 200.0],
"tree_cover_loss_year": [2021, 2022],
"canopy_cover": ["30", "30"],
"driver": [np.nan, "Permanent settlement"],
"aoi_id": ["AFG.1.1", "AFG.1.1"],
"natural_forest_class": ["Natural Forest", "Non-natural Forest"],
}
assert (
stages.qc_against_validation_source(
result_df,
qc_feature_repository=FakeQCRepository(),
)
is False
)
assert bool(tasks.qc_against_validation_source()) is False

# canopy_cover below 30 → driver filter yields less area → should fail
result_df_low_canopy = _make_result_df(
[100.0, 200.0],
["10", "30"],
["Agriculture", "Permanent settlement"],
["Natural Forest", "Non-natural Forest"],
)

def test_get_sample_statistics_accepts_injected_geometry_lookup():
geom = box(0, 0, 1, 1)
expected = pd.DataFrame({"area_ha": [123.0]})

with patch("pipelines.tree_cover_loss.stages.compute_tree_cover_loss") as mock_flow:
mock_flow.return_value = expected
tasks = TreeCoverLossTasks()

result = tasks.get_sample_statistics(geom)
with patch(
"pipelines.tree_cover_loss.stages.get_validation_statistics"
) as mock_validation:
mock_validation.return_value = {
"driver_results": pd.DataFrame({"area_ha": [100.0, 200.0]}),
"natural_forests_results": pd.DataFrame({"area_ha": [100.0, 200.0]}),
}
assert (
stages.qc_against_validation_source(
result_df_low_canopy,
qc_feature_repository=FakeQCRepository(),
)
is False
)

assert result is expected
# "Unknown" driver → filtered out by driver != "Unknown" check → should fail
result_df_unknown_driver = _make_result_df(
[100.0, 200.0],
["30", "30"],
["Unknown", "Permanent settlement"],
["Natural Forest", "Non-natural Forest"],
)

mock_flow.assert_called_once()
_, kwargs = mock_flow.call_args
assert kwargs["bbox"] is geom
with patch(
"pipelines.tree_cover_loss.stages.get_validation_statistics"
) as mock_validation:
mock_validation.return_value = {
"driver_results": pd.DataFrame({"area_ha": [100.0, 200.0]}),
"natural_forests_results": pd.DataFrame({"area_ha": [100.0, 200.0]}),
}
assert (
bool(
stages.qc_against_validation_source(
result_df_unknown_driver,
qc_feature_repository=FakeQCRepository(),
)
)
is False
)


def test_get_validation_statistics_with_fake_repo():
geom = box(0, 0, 1, 1)
tasks = TreeCoverLossTasks(gee_repository=FakeGoogleEarthEngineDatasetRepository())

result = tasks.get_validation_statistics(geom)["driver_results"]
result = stages.get_validation_statistics(
geom, gee_repository=FakeGoogleEarthEngineDatasetRepository()
)["driver_results"]

expected = pd.DataFrame({"driver": [1.0, 3.0], "area_ha": [2.0, 2.0]})
pd.testing.assert_frame_equal(result, expected, check_dtype=False)
11 changes: 4 additions & 7 deletions pipelines/test/unit/tree_cover_loss/test_result_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sparse
import xarray as xr

from pipelines.tree_cover_loss.stages import TreeCoverLossTasks
from pipelines.tree_cover_loss import stages


def _create_sparse_dataarray(
Expand Down Expand Up @@ -40,8 +40,7 @@ def test_create_result_dataframe_same_sparsity_pattern():
shape=(3, 2),
)

tasks = TreeCoverLossTasks()
df = tasks.create_result_dataframe(result_dataarray)
df = stages.create_result_dataframe(result_dataarray)

# verify correct shape and columns
assert len(df) == 3, f"expected 3 rows, got {len(df)}"
Expand Down Expand Up @@ -83,8 +82,7 @@ def test_create_result_dataframe_mismatched_sparsity():
shape=(3, 2),
)

tasks = TreeCoverLossTasks()
df = tasks.create_result_dataframe(result_dataarray)
df = stages.create_result_dataframe(result_dataarray)

assert len(df) == 3, f"expected 3 rows (union of all coordinates), got {len(df)}"

Expand Down Expand Up @@ -129,8 +127,7 @@ def test_create_result_dataframe_output_schema():
shape=(2, 2),
)

tasks = TreeCoverLossTasks()
df = tasks.create_result_dataframe(result_dataarray)
df = stages.create_result_dataframe(result_dataarray)

# verify column names and dtypes
expected_columns = {"tree_cover_loss_year", "country", "area_ha", "carbon_Mg_CO2e"}
Expand Down
8 changes: 3 additions & 5 deletions pipelines/test/unit/tree_cover_loss/test_setup_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import numpy as np
import xarray as xr

from pipelines.tree_cover_loss.stages import TreeCoverLossTasks
from pipelines.tree_cover_loss import stages


def _create_mock_datasets(shape=(2, 2)):
Expand Down Expand Up @@ -90,8 +90,7 @@ def _create_mock_datasets(shape=(2, 2)):
def test_setup_compute_groupby_schema_and_order():
"""Test that groupby has correct column names, order, and dtypes"""
datasets = _create_mock_datasets()
tasks = TreeCoverLossTasks()
mask, groupbys, _ = tasks.setup_compute(datasets, expected_groups=None)
mask, groupbys, _ = stages.setup_compute(datasets, expected_groups=None)

expected_schema = [
(0, "tree_cover_loss_year", np.uint8),
Expand Down Expand Up @@ -120,8 +119,7 @@ def test_setup_compute_groupby_schema_and_order():

def test_setup_compute_creates_concat_dataarray():
datasets = _create_mock_datasets()
tasks = TreeCoverLossTasks()
mask, groupbys, _ = tasks.setup_compute(datasets, expected_groups=None)
mask, groupbys, _ = stages.setup_compute(datasets, expected_groups=None)

# verify layer dimension exists
assert (
Expand Down
Loading
Loading