Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
60a0804
GTC-3451 Add TCL pipeline
manukala6 Jan 6, 2026
816b544
Merge branch 'main' into feature/tcl_pipeline
manukala6 Jan 6, 2026
ddc4959
GTC-3451 Sum carbon emissions with pixel area for TCL
manukala6 Jan 9, 2026
baac141
Merge branch 'feature/tcl_pipeline' of https://github.com/wri/project…
manukala6 Jan 9, 2026
519f202
Merge branch 'main' into feature/tcl_pipeline
manukala6 Jan 9, 2026
4ab5970
GTC-3451 Add ifl, drivers, primary forest contextual layers
manukala6 Jan 9, 2026
e2a69a3
GTC-3451 Correct contextual column names
manukala6 Jan 12, 2026
89ba809
GTC-3451 Align schema with TCL parquet
manukala6 Jan 14, 2026
a28d145
GTC-3451 TCL unit tests
manukala6 Jan 21, 2026
824be97
Make test dataset repository still use geometry mask
jterry64 Jan 22, 2026
b665440
Merge branch 'main' of https://github.com/wri/project-zeno-data-infra
jterry64 Jan 23, 2026
7b325b4
GTC-3451 TCL integration test
manukala6 Jan 23, 2026
3fabb91
Merge branch 'main' into feature/tcl_pipeline
manukala6 Jan 23, 2026
e513f48
Merge branch 'main' of https://github.com/wri/project-zeno-data-infra
jterry64 Jan 26, 2026
b865b7e
GTC-3451 Refactor to use xr.concat() for handling mismatched sparsity…
manukala6 Jan 26, 2026
2cc3d0b
Merge branch 'feature/tcl_pipeline' of https://github.com/wri/project…
manukala6 Jan 26, 2026
0f3c246
GTC-3451 Linting fixes
manukala6 Jan 27, 2026
44cae5c
Refactor TCL pipeline to decouple prefect from main flow
jterry64 Feb 2, 2026
d89b2e8
Fix gitignore
jterry64 Feb 2, 2026
41c3419
Add bbox test
jterry64 Feb 2, 2026
01bf793
Merge branch 'main' of https://github.com/wri/project-zeno-data-infra
jterry64 Feb 2, 2026
a66d101
Fix TCL test
jterry64 Feb 2, 2026
e6079db
Merge with main
jterry64 Feb 2, 2026
ba32126
Fix test for bbox to actually filter
jterry64 Feb 2, 2026
b26e833
Fix circular dependency
jterry64 Feb 2, 2026
de3cdd3
Make validation unit tests
jterry64 Feb 3, 2026
71fb389
Create unit test for sample statistics
jterry64 Feb 3, 2026
50d2b1a
Use Xee and GEE repository
jterry64 Feb 4, 2026
94c789f
Read GEE service account creds from github secrets
jterry64 Feb 4, 2026
0f9ff23
Do QC on a geojson
jterry64 Feb 6, 2026
7de0f1e
Fix circular dependency, get full flow tested
jterry64 Feb 6, 2026
ad6b7ff
Fix main flow test for TCL to use QC
jterry64 Feb 9, 2026
e6535e4
Add full integration test
jterry64 Feb 9, 2026
22ca1bf
Get real data test working
jterry64 Feb 9, 2026
24f2efe
Fix tests
jterry64 Feb 11, 2026
088e4c6
Add GEE service account creds to test runs
jterry64 Feb 11, 2026
63d5e3d
Add GEE service account creds to test runs
jterry64 Feb 11, 2026
efc8669
Use base64 encoded GEE account creds
jterry64 Feb 11, 2026
dba6e79
Fix GEE initialization, make lazy
jterry64 Feb 11, 2026
e0dadbf
Add additional features for QC set
jterry64 Feb 17, 2026
5a1c714
Use fixture for QC features in test
jterry64 Feb 18, 2026
ad4ec9f
Add natural forests
jterry64 Feb 18, 2026
0e19032
Fix tests
jterry64 Feb 19, 2026
fe09f29
Merge branch 'add_natural_forest' into validation_pipeline
jterry64 Feb 19, 2026
7222f2c
Add changes for natural forest class
jterry64 Feb 19, 2026
4ce1be5
Merge with main
jterry64 Feb 19, 2026
cd0c169
Update TCL test with real data
jterry64 Feb 19, 2026
9c75c43
Refactor drivers gee call
jterry64 Feb 19, 2026
1b6f71b
Stop swallowing errors
jterry64 Feb 19, 2026
53dc60f
Return multiple validation
jterry64 Feb 19, 2026
1837cc7
Return dict
jterry64 Feb 19, 2026
9f3a0a7
Use dict
jterry64 Feb 19, 2026
ce65efe
Validate natural forests intersection
jterry64 Feb 24, 2026
ea27204
Fix test
jterry64 Feb 24, 2026
47512b6
Move all postprocessing into postprocess stage for TCL
jterry64 Feb 25, 2026
b6c4edd
Use aoi_id instead of country/region/subregion for TCL
jterry64 Feb 25, 2026
1ef75ba
Get tests passing
jterry64 Feb 25, 2026
73237d8
isolate prefect resources by environment
solomon-negusse Mar 2, 2026
746945b
Create TCL update script
jterry64 Mar 2, 2026
47ae82b
Add prefect deployment for tcl update
jterry64 Mar 2, 2026
562f4e8
Create flow options for run_updates
jterry64 Mar 2, 2026
a5dad61
Write out QC file
jterry64 Mar 2, 2026
e66cc47
Deal with corner case
jterry64 Mar 2, 2026
ad8a2b7
Deal with PR comments
jterry64 Mar 3, 2026
f45eb62
Fix PR comments, get tests passing
jterry64 Mar 3, 2026
68d5aa2
Use update flows
jterry64 Mar 3, 2026
c89e72d
separate out new relic monitoring by environment
solomon-negusse Mar 4, 2026
7cf1737
fix container definition
solomon-negusse Mar 4, 2026
638cb10
add type hint for flow types and minor fixes
solomon-negusse Mar 5, 2026
cb8bba4
Merge branch 'add_natural_forest' into isolated-envs
solomon-negusse Mar 5, 2026
7ec9276
surface pipeline update parameters in prefect UI for test runs
solomon-negusse Mar 5, 2026
9ebbfad
remove redundant main function
solomon-negusse Mar 5, 2026
fc957e8
fix tcl results location
solomon-negusse Mar 5, 2026
d514b38
Merge branch 'add_natural_forest' into isolated-envs
solomon-negusse Mar 5, 2026
dae042d
move sbtn natural forests to gnw account
solomon-negusse Mar 5, 2026
2a49419
move sbtn natural forests zarr to gnw account
solomon-negusse Mar 5, 2026
2212ca6
Merge branch 'main' into add_natural_forest
solomon-negusse Mar 5, 2026
0217945
fix docstring; add parameter description
solomon-negusse Mar 5, 2026
47a035f
add type hint for flow name and minor fixes
solomon-negusse Mar 5, 2026
4dab956
prefect cli
solomon-negusse Mar 9, 2026
7d22fe7
Merge branch 'main' into isolated-envs
solomon-negusse Mar 9, 2026
70702d5
Merge branch 'main' into isolated-envs
solomon-negusse Mar 20, 2026
477800e
add local pipeline instructions
solomon-negusse Mar 20, 2026
e4e436a
add prefect cloud pipeline run option
solomon-negusse Mar 20, 2026
b502fff
remove tfvars thats not ready for operational use
solomon-negusse Mar 20, 2026
0f645b2
add doc on staging deployment
solomon-negusse Mar 24, 2026
4ffa5c9
Merge branch 'main' into isolated-envs
solomon-negusse Mar 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/newrelic.ini
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ monitor_mode = false

[newrelic:staging]
app_name = GNW Data API (Staging)
monitor_mode = false
distributed_tracing.enabled = false
monitor_mode = true
distributed_tracing.enabled = true

[newrelic:production]
monitor_mode = true
Expand Down
182 changes: 162 additions & 20 deletions pipelines/README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,162 @@
===================================================

To run tests:

`cd pipelines`
`uv sync` [if not already done]
`source .venv/bin/activate` [if not already activated]
`AWS_PROFILE=<GFW Data API production profile> pytest test`

===================================================

If you want to run the prefect server and worker tasks locally (but with
xarray.reduce tasks still in coiled), from the top level of the repo do:
`cd pipelines`
`uv sync` [if not already done]
`source .venv/bin/activate` [if not already activated]
`coiled login`
`cd ..`
`prefect server start` (in a separate window, also activated)
`AWS_PROFILE=<zeno profile> python -m pipelines.run_updates`
# Pipelines

## Running Tests

```bash
cd pipelines
uv sync # if not already done
source .venv/bin/activate # if not already activated
AWS_PROFILE=<GFW Data API production profile> pytest test
```

## Running Pipeline Code Locally

### AWS Permissions

Pipelines interact with S3 resources in the Zeno AWS account. Make sure you have a valid AWS profile configured with the necessary permissions before running any pipeline:

```bash
export AWS_PROFILE=<zeno-profile>
```

### Coiled (Dask Cluster)

Pipeline zonal stats computations run on a Dask cluster managed by [Coiled](https://cloud.coiled.io/). If not already logged in, authenticate from the terminal:

```bash
coiled login
```

Coiled syncs your local Python environment to the remote Dask scheduler and workers. Before running a pipeline, make sure to do the following:

1. **Activate the pipelines virtual environment** — the Coiled cluster is provisioned using packages from your active environment.
2. **Keep the environment up to date** — run `uv sync` so the packages Coiled syncs to the cluster match what the pipeline code expects.

```bash
cd pipelines
uv sync
source .venv/bin/activate
```

> **⚠ Architecture note:** The Coiled cluster in `run_updates.py` uses ARM-based instances (`r7g` family). If your local machine is not ARM (e.g. Intel/AMD x86_64), you may encounter package incompatibilities when Coiled attempts to sync your environment. Running from an Apple Silicon Mac or another ARM host avoids this issue.

### Prefect

To monitor pipeline flows via the Prefect UI, there are two options depending on what's configured in `~/.prefect/profiles.toml`:

#### Option 1: Local Prefect Server

Spin up a local Prefect server in a separate terminal session:

```bash
prefect server start
```

The Prefect dashboard will be available at http://127.0.0.1:4200 and local pipeline runs will show up there.

#### Option 2: Prefect Cloud

If the active profile in `~/.prefect/profiles.toml` points to a Prefect Cloud workspace, runs will appear in the cloud dashboard instead. Example `profiles.toml`:

```toml
active = "local"

[profiles.local]
PREFECT_API_URL = "https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>"
PREFECT_API_KEY = "*********"
```

### Running a Pipeline

Pipelines are launched via the Click CLI in `run_updates.py`. Run with `--help` to see all available options:

```bash
python -m pipelines.run_updates --help
```

```
Usage: python -m pipelines.run_updates [OPTIONS]

Options:
--flow [dist_update|tcl_update]
Which update flow to run.
--version TEXT Dataset version (required for tcl_update).
--overwrite Overwrite existing outputs.
--is-latest Mark this version as latest.
--help Show this message and exit.
```

Example — run the DIST alerts update:

```bash
python -m pipelines.run_updates --flow dist_update --version v20260301
```

## Running Pipeline from Prefect Cloud UI

### Architecture Overview

![Pipeline Architecture](pipeline_arch.png)

Pipelines are deployed to **Prefect Cloud** and execute on **AWS ECS Fargate**. The infrastructure is managed via Terraform (see `terraform/prefect.tf`). Here's how the pieces fit together:

1. A **Deployment** (defined in Terraform) schedules a job or a user triggers a run from the Dashboard UI.
2. The job is submitted to an **ECS Work Pool**, which defines the Fargate task template (CPU, memory, container image, IAM role).
3. A **Worker** running continuously in the ECS cluster polls the work pool for pending jobs and spawns an ephemeral **Flow Runner Task** on Fargate.
4. The Flow Runner executes the pipeline code and reports status and logs back to Prefect Cloud, where they are visible in the Dashboard UI.

### Pipeline Docker Image

Both the ECS Fargate task and the Coiled Dask cluster require a Docker image containing the pipeline code and dependencies. This image is configured via the `PIPELINES_IMAGE` environment variable:

- **ECS (Prefect):** The deployment's job variables in Terraform set `image` to the value of `var.pipelines_image`, which is also passed as the `PIPELINES_IMAGE` env var inside the container.
- **Coiled:** The `create_cluster()` task in `run_updates.py` reads `os.getenv("PIPELINES_IMAGE")` to specify the container image for the Dask scheduler and workers.

Make sure the image is up to date and pushed to the container registry before triggering a run.

### Triggering a Run from the Dashboard

There are two deployments available:

| Deployment | Purpose |
|---|---|
| **gnw-zonal-stats-update** | Production — runs against production resources. |
| **gnw-zonal-stats-update-staging** | Staging — runs against staging resources. Allows you to override the Docker image used by the flow runner and Coiled cluster (see below). |

> **⚠ Warning:** The staging deployment is **not fully isolated** from production. It shares the same AWS account and can access production S3 buckets and other resources. Take care when running staging flows — use the **overwrite** and **is_latest** flags with caution, as they can modify or replace production data. Always double-check parameters before submitting a run.

1. Open the [Prefect Cloud Dashboard](https://app.prefect.cloud/).
2. Navigate to **Deployments** and find **gnw-zonal-stats-update** (production) or **gnw-zonal-stats-update-staging** (staging).
3. Click **Run** → **Custom Run** to configure parameters:

![Find the deployment and trigger a custom run](prefect_cloud_1.png)

- **flow_name** — `dist_update` or `tcl_update`
- **version** — the dataset version (e.g. `v20260301`; required for `tcl_update`)
- **overwrite** — set to `true` to re-run over existing outputs
- **is_latest** — set to `true` to mark this version as the latest served by the analytics API

![Configure run parameters](prefect_cloud_2.png)

4. Submit the run. The ECS worker will pick it up and execute it on Fargate.

#### Updating the Docker Image for Staging

The **gnw-zonal-stats-update-staging** deployment lets you override the ECR image used by both the ECS flow runner task and the Coiled Dask cluster. This is useful for testing a new image before promoting it to production.

To change the image:

1. Navigate to **Deployments** and find **gnw-zonal-stats-update-staging**.
2. Click the **three-dot menu (⋯)** in the top-right corner and select **Edit**.

![Edit the deployment](prefect_cloud_3.png)

3. In the **Job Variables** section, update the following values with the desired ECR image URI:
- **`image`** — the container image for the ECS flow runner task.
- **`PIPELINES_IMAGE`** — the container image passed to the Coiled Dask cluster.

4. Save the deployment. Subsequent runs will use the updated image for both the ECS task and the Coiled cluster.

### Automatic Triggers

A **webhook + automation** is configured so that when a new DIST alerts version is published, the `dist_update` flow is triggered automatically with the version from the event payload. This is defined in `terraform/prefect.tf` as the `run-gnw-zonal-stats-on-dist-update` automation.
Binary file added pipelines/pipeline_arch.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added pipelines/prefect_cloud_1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added pipelines/prefect_cloud_2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added pipelines/prefect_cloud_3.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ description = "Data analytics pipelines for Zeno"
requires-python = ">=3.12"
dependencies = [
"boto3",
"click",
"coiled",
"dask[array,dataframe,distributed,diagnostics]",
"fastparquet",
Expand All @@ -26,7 +27,7 @@ dependencies = [
"xarray",
"zarr",
"earthengine-api",
"xee"
"xee",
]

[dependency-groups]
Expand Down
25 changes: 24 additions & 1 deletion pipelines/run_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
from enum import Enum

import click
import coiled
from prefect import flow, task
from prefect.logging import get_run_logger
Expand Down Expand Up @@ -127,5 +128,27 @@ def run_updates(
return result_uris


@click.command()
@click.option(
    "--flow",
    "flow_name",
    type=click.Choice([e.value for e in UpdateFlow], case_sensitive=False),
    default=UpdateFlow.DIST_UPDATE.value,
    help="Which update flow to run.",
)
@click.option(
    "--version", default=None, help="Dataset version (required for tcl_update)."
)
@click.option("--overwrite", is_flag=True, help="Overwrite existing outputs.")
@click.option("--is-latest", is_flag=True, help="Mark this version as latest.")
def cli(flow_name, version, overwrite, is_latest):
    """Command-line entry point that dispatches to the selected update flow.

    Parameters map one-to-one onto ``run_updates``:
      flow_name -- one of the ``UpdateFlow`` enum values (string form).
      version   -- dataset version string; required for the tcl_update flow.
      overwrite -- re-run over existing outputs when set.
      is_latest -- mark this version as the latest served version when set.

    Raises click.UsageError if --version is omitted for a flow that needs it.
    """
    selected_flow = UpdateFlow(flow_name)
    # The --version help text promises it is required for tcl_update; enforce
    # that here so the run fails fast with a clear message instead of erroring
    # partway through the pipeline.
    if selected_flow.value == "tcl_update" and version is None:
        raise click.UsageError("--version is required when --flow is tcl_update.")
    run_updates(
        version=version,
        overwrite=overwrite,
        is_latest=is_latest,
        flow_name=selected_flow,
    )


if __name__ == "__main__":
run_updates(overwrite=False)
cli()
2 changes: 2 additions & 0 deletions pipelines/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ module "analytics" {
name = "NEW_RELIC_LICENSE_KEY"
value = var.new_relic_license_key
},
{
name = "NEW_RELIC_ENVIRONMENT"
value = var.new_relic_environment
},
{
name = "LOCAL_CLUSTER_AREA_THRESHOLD_HA"
value = tostring(var.local_cluster_area_threshold_ha)
Expand Down
10 changes: 5 additions & 5 deletions terraform/prefect.tf
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ resource "prefect_deployment" "gnw_zonal_stats_update" {


resource "prefect_webhook" "dist_update_event" {
name = "dist-updated-event"
name = "dist-updated-event${local.name_suffix}"
description = "Fire an event when a new DIST alerts version is published."
enabled = true
template = jsonencode({
event = "dist_updated"
event = "dist_updated${local.name_suffix}"
payload = {
dataset = "{{body.dataset}}"
version = "{{body.version}}"
Expand All @@ -185,13 +185,13 @@ resource "prefect_webhook" "dist_update_event" {
}

resource "prefect_automation" "run_pipelines_on_dist_update" {
name = "run-gnw-zonal-stats-on-dist-update"
enabled = true
name = "run-gnw-zonal-stats-on-dist-update${local.name_suffix}"
enabled = terraform.workspace == "default"

trigger = {
event = {
posture = "Reactive"
expect = ["dist_updated"]
expect = ["dist_updated${local.name_suffix}"]
threshold = 1
within = 0
}
Expand Down
6 changes: 6 additions & 0 deletions terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ variable "new_relic_license_key" {
description = "New Relic License Key"
}

variable "new_relic_environment" {
type = string
description = "New Relic agent environment (development, test, staging, production)"
default = "production"
}

variable "prefect_api_key" {
description = "Prefect Cloud API key"
type = string
Expand Down
Loading