Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TRT-571 - Update net2cog to process multiple variables. #38

Merged
merged 6 commits into from
Mar 4, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
### Changed
- [issues/32](https://github.com/podaac/net2cog/issues/32): Added capability to support multiple variable requests, both from explicitly requesting multiple variables, or requesting "all" variables. This also partially addresses [issues/35](https://github.com/podaac/net2cog/issues/35).

## [0.4.0]
### Changed
@@ -38,4 +39,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

[Unreleased]: https://github.com/podaac/net2cog/compare/v0.4.0...HEAD
[0.4.0]: https://github.com/podaac/net2cog/compare/v0.3.0...v0.4.0
[0.3.0]: https://github.com/podaac/net2cog/compare/eabb00704a6fc693aa4d79536dc5c5354c6de4d9...v0.3.0
[0.3.0]: https://github.com/podaac/net2cog/compare/eabb00704a6fc693aa4d79536dc5c5354c6de4d9...v0.3.0
11 changes: 10 additions & 1 deletion docker/Readme.md
Original file line number Diff line number Diff line change
@@ -39,9 +39,18 @@ and the `SOURCE` build arg should be set to the path to the wheel file.
Example:

```shell script
docker build -f docker/Dockerfile --build-arg SOURCE="dist/net2cog-1.1.0a1-py3-none-any.whl[harmony]" --build-arg DIST_PATH="dist/" .
docker build -f docker/Dockerfile -t ghcr.io/podaac/net2cog:SIT \
--build-arg SOURCE="dist/net2cog-1.1.0a1-py3-none-any.whl[harmony]" \
--build-arg DIST_PATH="dist/" .
```

To use with Harmony in a Box, the output image must be tagged, using the `-t`
flag, with a string that matches the `NET2COG_IMAGE` environment variable
used by Harmony. The default value for this environment variable in Harmony is
`ghcr.io/podaac/net2cog:SIT`, as specified in
[harmony/services/harmony/env-defaults](https://github.com/nasa/harmony/blob/main/services/harmony/env-defaults). This can be overwritten in your local `.env` file
for Harmony.

## Running

If given no arguments, running the docker image will invoke the [Harmony service](https://github.com/nasa/harmony-service-lib-py) CLI.
252 changes: 126 additions & 126 deletions net2cog/netcdf_convert.py
Original file line number Diff line number Diff line change
@@ -7,24 +7,22 @@
Functions related to converting a NetCDF file to other formats.
"""

import logging
import os
import pathlib
import subprocess
import tempfile
from os.path import join as pjoin, basename, dirname, exists, splitext
from subprocess import check_call
from logging import Logger
from os.path import join as path_join, basename
from tempfile import TemporaryDirectory
from typing import List

import rasterio
import rioxarray # noqa
import xarray as xr
from harmony_service_lib.util import generate_output_filename
from rasterio import CRS
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles
from rioxarray.exceptions import DimensionError

LOGGER = logging.getLogger(__name__)
EXCLUDE_VARS = ['lon', 'lat', 'longitude', 'latitude', 'time']


@@ -37,152 +35,144 @@ def __init__(self, msg):
super().__init__(msg)


def run_command(command, work_dir):
"""
A simple utility to execute a subprocess command.
"""
try:
out_call = check_call(command, stderr=subprocess.STDOUT, cwd=work_dir)
return out_call
except subprocess.CalledProcessError as err:
LOGGER.error("command '%s' return with error (code %s): %s",
err.cmd, err.returncode, err.output)
raise

def _rioxr_swapdims(netcdf_xarray):
netcdf_xarray.coords['y'] = ('lat', netcdf_xarray.lat)
netcdf_xarray.coords['x'] = ('lon', netcdf_xarray.lon)

def check_dir(fname):
"""
To return filename and path without file extension
"""
file_name = fname.split('/')
rel_path = pjoin(*file_name[-2:])
file_wo_extension, _ = splitext(rel_path)
return file_wo_extension


def get_gtiff_name(output_file):
"""
To create tmp filename to convert to COG and create a filename
just as source but without '.TIF' extension
"""
outf = os.path.basename(output_file)
dir_path = dirname(output_file)
rel_path = check_dir(outf)
out_fname = pjoin(dir_path, rel_path)
if not exists(out_fname):
os.makedirs(out_fname)
return pjoin(out_fname, rel_path)
return netcdf_xarray.swap_dims({'lat': 'y', 'lon': 'x'})


def _write_cogtiff(out_f_name, nc_xarray):
def _write_cogtiff(
output_directory: str,
nc_xarray: xr.Dataset,
variable_name: str,
input_file_basename: str,
logger: Logger,
) -> str | None:
"""
This function converts each variable inside a NetCDF file into a
This function converts a variable inside a NetCDF file into a
cloud optimized geotiff.

Parameters
----------
out_f_name : string
Path to temp gtiff filename excluding file extension
out_directory : str
Path to temporary directory where output GeoTIFFs will be stored before
being staged in S3.
example :/home/dockeruser/converter/podaac/netcdf_converter/temp/
netcdf_converter/
RSS_smap_SSS_L3_8day_running_2020_037_FNL_v04.0_test
nc_xarray : xarray dataset
nc_xarray : xarray.Dataset
xarray dataset loaded from NetCDF file
variable_name: str
Name of the variable as passed in to Harmony.
input_file_basename: str
String representation of the basename of the NetCDF-4 file input to
the service. This is used to construct the output file name.

Notes
-----
Assumption that 0 is always on the prime meridian/equator.
"""

cogs_generated = []
with tempfile.TemporaryDirectory() as tempdir:

# variables in netcdf
for var in nc_xarray.variables:
if var in EXCLUDE_VARS:
continue
LOGGER.debug("NetCDF Var: %s", var)

def rioxr_swapdims(netcdf_xarray):
netcdf_xarray.coords['y'] = ('lat', netcdf_xarray.lat)
netcdf_xarray.coords['x'] = ('lon', netcdf_xarray.lon)

return netcdf_xarray.swap_dims({'lat': 'y', 'lon': 'x'})

# copy to a tempfolder
out_fname = out_f_name + '_' + var + '.tif'
temp_fname = pjoin(tempdir, basename(out_fname))

logger.debug("NetCDF Var: %s", variable_name)

if variable_name in EXCLUDE_VARS:
logger.debug(f"Variable {variable_name} is excluded. Will not produce COG")
return None

output_basename = generate_output_filename(
input_file_basename,
ext='tif',
variable_subset=[variable_name],
is_reformatted=True,
)
output_file_name = path_join(output_directory, output_basename)

with TemporaryDirectory() as tempdir:
temp_file_name = path_join(tempdir, output_basename)

# copy to a tempfolder
# out_fname = out_f_name + '_' + var + '.tif'
# temp_fname = path_join(tempdir, basename(out_fname))

try:
nc_xarray[variable_name].rio.to_raster(temp_file_name)
except KeyError as error:
# Occurs when trying to locate a variable that is not in the Dataset
raise Net2CogError(error) from error
except LookupError as err:
logger.info("Variable %s cannot be converted to tif: %s", variable_name, err)
return None
except DimensionError as dmerr:
try:
nc_xarray[var].rio.to_raster(temp_fname)
except LookupError as err:
LOGGER.info("Variable %s cannot be converted to tif: %s", var, err)
continue
except DimensionError as dmerr:
try:
LOGGER.info("%s: No x or y xarray dimensions, adding them...", dmerr)
nc_xarray_tmp = rioxr_swapdims(nc_xarray)
nc_xarray_tmp[var].rio.to_raster(temp_fname)
except RuntimeError as runerr:
LOGGER.info("Variable %s cannot be converted to tif: %s", var, runerr)
continue
except Exception as aerr: # pylint: disable=broad-except
LOGGER.info("Variable %s cannot be converted to tif: %s", var, aerr)
continue

# Option to add additional GDAL config settings
# config = dict(GDAL_NUM_THREADS="ALL_CPUS", GDAL_TIFF_OVR_BLOCKSIZE="128")
# with rasterio.Env(**config):

LOGGER.info("Starting conversion... %s", out_fname)

# default CRS setting
# crs = rasterio.crs.CRS({"init": "epsg:3857"})

with rasterio.open(temp_fname, mode='r+') as src_dataset:
# if src_dst.crs is None:
# src_dst.crs = crs
src_dataset.crs = CRS.from_proj4(proj="+proj=latlong")
dst_profile = cog_profiles.get("deflate")
cog_translate(src_dataset, out_fname, dst_profile, use_cog_driver=True)

cogs_generated.append(out_fname)
LOGGER.info("Finished conversion, writing variable: %s", out_fname)
LOGGER.info("NetCDF conversion complete. Returning COGs generated.")
return cogs_generated


def netcdf_converter(input_nc_file: pathlib.Path, output_cog_pathname: pathlib.Path, var_list: list = ()) -> List[str]:
"""
Primary function for beginning NetCDF conversion using rasterio,
logger.info("%s: No x or y xarray dimensions, adding them...", dmerr)
nc_xarray_tmp = _rioxr_swapdims(nc_xarray)
nc_xarray_tmp[variable_name].rio.to_raster(temp_file_name)
except RuntimeError as runerr:
logger.info("Variable %s cannot be converted to tif: %s", variable_name, runerr)
return None
except Exception as aerr: # pylint: disable=broad-except
logger.info("Variable %s cannot be converted to tif: %s", variable_name, aerr)
return None

# Option to add additional GDAL config settings
# config = dict(GDAL_NUM_THREADS="ALL_CPUS", GDAL_TIFF_OVR_BLOCKSIZE="128")
# with rasterio.Env(**config):

logger.info("Starting conversion... %s", output_file_name)

# default CRS setting
# crs = rasterio.crs.CRS({"init": "epsg:3857"})

with rasterio.open(temp_file_name, mode='r+') as src_dataset:
# if src_dst.crs is None:
# src_dst.crs = crs
src_dataset.crs = CRS.from_proj4(proj="+proj=latlong")
dst_profile = cog_profiles.get("deflate")
cog_translate(
src_dataset,
output_file_name,
dst_profile,
use_cog_driver=True
)

logger.info("Finished conversion, writing variable: %s", output_file_name)
logger.info("NetCDF conversion complete. Returning COG generated.")
return output_file_name


def netcdf_converter(
input_nc_file: pathlib.Path,
output_directory: pathlib.Path,
var_list: list[str],
logger: Logger,
) -> List[str]:
"""Primary function for beginning NetCDF conversion using rasterio,
rioxarray and xarray

Parameters
----------
input_nc_file : pathlib.Path
Path to NetCDF file to process
output_cog_pathname : pathlib.Path
COG Output path and NetCDF filename, filename converted to cog variable
filename (.tif)
ex: tests/data/tmpygj2vgxf/
RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc
var_list : list
output_directory : pathlib.Path
Path to temporary directory into which results will be placed before
staging in S3.
var_list : str | None
List of variable names to be converted to various single band cogs,
ex: ['gland', 'fland', 'sss_smap']
ex: ['gland', 'fland', 'sss_smap']. If a Harmony request asks for "all"
variables, the input value will be an empty list.

Notes
-----
Currently uses local file paths, no s3 paths
"""
logger.info("Input file name: %s", input_nc_file)

netcdf_file = os.path.abspath(input_nc_file)
LOGGER.debug('NetCDF Path: %s', netcdf_file)

gtiff_fname = get_gtiff_name(output_cog_pathname)
logger.debug('NetCDF Path: %s', netcdf_file)

if netcdf_file.endswith('.nc'):
LOGGER.info("Reading %s", basename(netcdf_file))
LOGGER.info('Tmp GTiff filename: %s', gtiff_fname)
logger.info("Reading %s", basename(netcdf_file))

xds = xr.open_dataset(netcdf_file)

@@ -192,15 +182,25 @@ def netcdf_converter(input_nc_file: pathlib.Path, output_cog_pathname: pathlib.P
or ({"x", "y"}.issubset(set(xds.dims)))):
# used to invert y axis
# xds_reversed = xds.reindex(lat=xds.lat[::-1])
LOGGER.info("Writing COG to %s", basename(gtiff_fname))
if var_list:
try:
xds = xds[var_list]
except KeyError as error:
raise Net2CogError(f"Variable {error} not found in dataset") from error
return _write_cogtiff(gtiff_fname, xds)
LOGGER.error("%s: NetCDF file does not contain spatial dimensions such as lat / lon "

if not var_list:
# Empty list means "all" variables, so get all variables in
# the `xarray.Dataset`.
var_list = list(xds.data_vars.keys())

output_files = [
_write_cogtiff(output_directory, xds, variable_name, basename(netcdf_file), logger)
for variable_name in var_list
]
# Remove None returns, e.g., for excluded variables
return [
output_file
for output_file in output_files
if output_file is not None
]

logger.error("%s: NetCDF file does not contain spatial dimensions such as lat / lon "
"or x / y", netcdf_file)
return []
LOGGER.info("Not a NetCDF file; Skipped file: %s", netcdf_file)
logger.info("Not a NetCDF file; Skipped file: %s", netcdf_file)
return []
143 changes: 88 additions & 55 deletions net2cog/netcdf_convert_harmony.py
Original file line number Diff line number Diff line change
@@ -15,18 +15,21 @@
import shutil
import tempfile

import harmony
import harmony_service_lib
import pystac
from harmony.exceptions import HarmonyException
from pystac import Asset
from harmony_service_lib import BaseHarmonyAdapter
from harmony_service_lib.exceptions import HarmonyException
from harmony_service_lib.message import Source
from harmony_service_lib.util import download, stage
from pystac import Asset, Item

from net2cog import netcdf_convert
from net2cog.netcdf_convert import Net2CogError

DATA_DIRECTORY_ENV = "DATA_DIRECTORY"


class NetcdfConverterService(harmony.BaseHarmonyAdapter):
class NetcdfConverterService(BaseHarmonyAdapter):
"""
See https://github.com/nasa/harmony-service-lib-py
for documentation and examples.
@@ -41,61 +44,61 @@ def __init__(self, message, catalog=None, config=None):
# Create temp directory
self.job_data_dir = tempfile.mkdtemp(prefix=message.requestId, dir=self.data_dir)

def process_item(self, item: pystac.Item, source: harmony.message.Source) -> pystac.Item:
def process_item(self, input_item: pystac.Item, source: Source) -> pystac.Item:
"""
Performs net2cog on input STAC Item's data, returning
an output STAC item
Parameters
----------
item : pystac.Item
input_item : pystac.Item
the item that should be coggified
source : harmony.message.Source
source : harmony_service_lib.message.Source
the input source defining the item
Returns
-------
pystac.Item
a STAC item describing the output
"""
result = item.clone()
result.assets = {}
output_dir = self.job_data_dir
try:
self.logger.info('Input item: %s', json.dumps(item.to_dict()))
self.logger.info('Input item: %s', json.dumps(input_item.to_dict()))
self.logger.info('Input source: %s', source)
# Get the data file
asset = next(v for k, v in item.assets.items() if 'data' in (v.roles or []))
asset = next(v for k, v in input_item.assets.items() if 'data' in (v.roles or []))
self.logger.info('Downloading %s to %s', asset.href, output_dir)
input_filename = harmony.adapter.util.download(asset.href,
output_dir,
logger=self.logger,
access_token=self.message.accessToken,
cfg=self.config)

# Generate output filename
output_filename, output_file_ext = os.path.splitext(
harmony.adapter.util.generate_output_filename(asset.href, ext='tif'))
output_filename = f'{output_filename}_converted{output_file_ext}'
input_filename = download(
asset.href,
output_dir,
logger=self.logger,
access_token=self.message.accessToken,
cfg=self.config
)

# Rename the downloaded output from a SHA256 hash to the basename
# of the source data file
source_asset_basename = os.path.basename(asset.href)
os.rename(input_filename, os.path.join(output_dir, source_asset_basename))
input_filename = os.path.join(output_dir, source_asset_basename)

# Determine variables that need processing
self.logger.info('Generating COG(s) for %s output will be saved to %s', input_filename, output_filename)
var_list = source.process('variables')
if not isinstance(var_list, list):
var_list = [var_list]
if len(var_list) != 1:
raise HarmonyException(
'net2cog harmony adapter currently only supports processing one variable at a time. '
'Please specify a single variable in your Harmony request.')
var_list = list(map(lambda var: var.name, var_list))
self.logger.info('Processing variables %s', var_list)

if var_list:
var_list = list(map(lambda var: var.name, var_list))
self.logger.info('Processing variables %s', var_list)
else:
self.logger.info('Processing all variables.')

# Run the netcdf converter for the complete netcdf granule
try:
cog_generated = next(iter(netcdf_convert.netcdf_converter(pathlib.Path(input_filename),
pathlib.Path(output_dir).joinpath(
output_filename),
var_list=var_list)), [])
generated_cogs = netcdf_convert.netcdf_converter(
pathlib.Path(input_filename),
pathlib.Path(output_dir),
var_list,
self.logger,
)
except Net2CogError as error:
raise HarmonyException(
f'net2cog failed to convert {asset.title}: {error}') from error
@@ -104,28 +107,58 @@ def process_item(self, item: pystac.Item, source: harmony.message.Source) -> pys
f'Notify net2cog service provider. '
f'Message: {uncaught_exception}')) from uncaught_exception

# Stage the output file with a conventional filename
self.logger.info('Generated COG %s', cog_generated)
staged_filename = os.path.basename(cog_generated)
url = harmony.adapter.util.stage(cog_generated,
staged_filename,
pystac.MediaType.COG,
location=self.message.stagingLocation,
logger=self.logger,
cfg=self.config)
self.logger.info('Staged %s to %s', cog_generated, url)

# Update the STAC record
result.assets['visual'] = Asset(url, title=staged_filename, media_type=pystac.MediaType.COG,
roles=['visual'])

# Return the STAC record
self.logger.info('Processed item %s', json.dumps(result.to_dict()))
return result
return self.stage_output_and_create_output_stac(generated_cogs, input_item)
finally:
# Clean up any intermediate resources
shutil.rmtree(self.job_data_dir)

def stage_output_and_create_output_stac(self, output_files: list[str], input_stac_item: Item) -> Item:
    """Iterate through all generated COGs and stage the results in S3. Also
    add a unique pystac.Asset for each COG to the pystac.Item returned to
    Harmony.

    Parameters
    ----------
    output_files : list[str]
        the filenames of generated COGs to be staged
    input_stac_item : pystac.Item
        the input STAC for the request. This is the basis of the output
        STAC, which will replace the pystac.Assets with generated COGs.

    Returns
    -------
    pystac.Item
        a STAC item describing the output. If there are multiple variables,
        this STAC item will have multiple assets.
    """

    output_stac_item = input_stac_item.clone()
    # Drop the input item's assets: only the staged COGs belong on the output.
    output_stac_item.assets = {}

    for output_file in output_files:
        output_basename = os.path.basename(output_file)

        # Upload the COG to the Harmony staging location (S3 in deployment).
        staged_url = stage(
            output_file,
            output_basename,
            pystac.MediaType.COG,
            location=self.message.stagingLocation,
            logger=self.logger,
            cfg=self.config
        )
        self.logger.info('Staged %s to %s', output_file, staged_url)

        # Each asset needs a unique key, so the filename of the COG is used
        output_stac_item.assets[output_basename] = Asset(
            staged_url,
            title=output_basename,
            media_type=pystac.MediaType.COG,
            roles=['visual'],
        )

    return output_stac_item


def main():
"""Parse command line arguments and invoke the service to respond to
@@ -138,9 +171,9 @@ def main():
"""
parser = argparse.ArgumentParser(prog='net2cog_harmony',
description='Run the netcdf converter service')
harmony.setup_cli(parser)
harmony_service_lib.setup_cli(parser)
args = parser.parse_args()
if harmony.is_harmony_cli(args):
harmony.run_cli(parser, args, NetcdfConverterService)
if harmony_service_lib.is_harmony_cli(args):
harmony_service_lib.run_cli(parser, args, NetcdfConverterService)
else:
parser.error("Only --harmony CLIs are supported")
202 changes: 182 additions & 20 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ rio-cogeo = "^5.3.3"
rasterio = "^1.3.8"
rioxarray = "^0.17.0"
numpy = "^2.0.1"
harmony-service-lib = { version = "^1.0.22", optional = true }
harmony-service-lib = { version = "^2.4.0", optional = true }

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.0"
153 changes: 153 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""A pytest module containing test fixtures to be reused throughout multiple tests."""
import json
import os
from logging import getLogger
from os.path import dirname, join, realpath
from pathlib import Path
from shutil import copyfile, rmtree
from tempfile import mkdtemp

from pytest import fixture


@fixture(scope='session')
def logger():
    """A single logger instance shared by every test in the session."""
    return getLogger(__name__)


@fixture(scope='session')
def data_dir():
    """Location of the tests/data directory in the environment running the tests."""
    return join(dirname(realpath(__file__)), 'data')


@fixture(scope='session')
def smap_collection():
    """Short name of the SMAP collection; also the tests/data subdirectory name."""
    collection_short_name = 'SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4'
    return collection_short_name


@fixture(scope='session')
def smap_file_basename():
    """Filename (no directory) of the SMAP granule used as test input."""
    granule_basename = 'RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc'
    return granule_basename


@fixture(scope='function')
def temp_dir():
    """Yield a fresh temporary directory for each test, removed on teardown."""
    scratch_directory = mkdtemp()
    yield scratch_directory
    rmtree(scratch_directory)


@fixture(scope='function')
def smap_file(data_dir, temp_dir, smap_collection, smap_file_basename):
    """Copy the SMAP NetCDF-4 input granule into the per-test temporary directory."""
    granule_path = Path(temp_dir, smap_file_basename)
    copyfile(
        join(data_dir, smap_collection, smap_file_basename),
        granule_path,
    )
    return granule_path


@fixture(scope='function')
def smap_data_operation_message(data_dir, temp_dir, smap_collection, smap_file):
    """Message for a SMAP request. The JSON is scoped per function, to avoid the
    effects of mutability when updating the retrieved dictionary in some tests.

    The base message is updated for each test to include the path to the SMAP
    granule, as hosted in a per-test temporary directory.
    """
    message_path = Path(temp_dir, 'data_operation_message.json')
    copyfile(
        join(data_dir, smap_collection, 'data_operation_message.json'),
        message_path,
    )

    # Point the message's granule URL at the copy in the temporary directory.
    message_json = json.loads(message_path.read_text(encoding='utf-8'))
    message_json['sources'][0]['granules'][0]['url'] = f'file://{smap_file}'
    message_path.write_text(json.dumps(message_json, indent=2), encoding='utf-8')

    return message_path


@fixture(scope='function')
def smap_stac(data_dir, temp_dir, smap_collection, smap_item):
    """Main STAC file containing the catalog for SMAP data. While the smap_item
    fixture is not used in the body below, declaring it as a dependency ensures
    the file for the item is also populated in the temporary directory.
    """
    catalog_path = Path(temp_dir, 'catalog.json')
    copyfile(
        join(data_dir, smap_collection, 'catalog.json'),
        catalog_path,
    )
    return catalog_path


@fixture(scope='function')
def smap_item(data_dir, temp_dir, smap_collection, smap_file):
    """File for the STAC item representing the SMAP granule being processed in
    Harmony requests. The JSON object is updated for each test to include the
    path to the SMAP granule as hosted in the per-test temporary directory.
    """
    stac_item_basename = 'RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.json'
    item_path = Path(temp_dir, stac_item_basename)
    copyfile(
        join(data_dir, smap_collection, stac_item_basename),
        item_path,
    )

    # Rewrite the data asset href to reference the per-test granule copy.
    item_json = json.loads(item_path.read_text(encoding='utf-8'))
    item_json['assets']['data']['href'] = f'file://{smap_file}'
    item_path.write_text(json.dumps(item_json, indent=2), encoding='utf-8')

    return item_path


@fixture(scope='function')
def mock_environ(tmp_path):
    """
    Replace AWS env variables with fake values, to ensure no real AWS
    calls are executed. During fixture teardown, revert environment
    variables to their original values.
    """
    environment_variables = {
        'AWS_ACCESS_KEY_ID': 'foo',
        'AWS_SECRET_ACCESS_KEY': 'foo',
        'AWS_SECURITY_TOKEN': 'foo',
        'AWS_SESSION_TOKEN': 'foo',
        'AWS_REGION': 'us-west-2',
        'AWS_DEFAULT_REGION': 'us-west-2',
        'SHARED_SECRET_KEY': "shhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh",
        'ENV': "test",
        'DATA_DIRECTORY': str(tmp_path),
        'OAUTH_CLIENT_ID': '',
        'OAUTH_UID': '',
        'OAUTH_PASSWORD': '',
        'OAUTH_REDIRECT_URI': '',
        'STAGING_PATH': '',
        'STAGING_BUCKET': '',
    }

    # Snapshot any pre-existing values so teardown can restore them.
    # Note: the previous teardown used os.unsetenv, which does NOT update the
    # os.environ mapping, so the fake values leaked into later tests and any
    # real values set before the test were lost.
    previous_values = {
        variable_name: os.environ.get(variable_name)
        for variable_name in environment_variables
    }

    os.environ.update(environment_variables)

    yield

    for variable_name, previous_value in previous_values.items():
        if previous_value is None:
            # Deleting from os.environ also unsets the variable at the
            # process level.
            os.environ.pop(variable_name, None)
        else:
            os.environ[variable_name] = previous_value
Original file line number Diff line number Diff line change
@@ -10,8 +10,8 @@
},
{
"rel": "item",
"href": "./RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.json",
"href": "./RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.json",
"type": "application/json"
}
]
}
}
66 changes: 29 additions & 37 deletions tests/test_netcdf_convert.py
Original file line number Diff line number Diff line change
@@ -5,42 +5,28 @@
Test the netcdf conversion functionality.
"""
import os.path
import pathlib
import subprocess
from os import walk
from os.path import dirname, join, realpath

import pytest

from net2cog import netcdf_convert
from net2cog.netcdf_convert import Net2CogError
from net2cog.netcdf_convert import Net2CogError, netcdf_converter


@pytest.fixture(scope='session')
def data_dir():
test_dir = dirname(realpath(__file__))
test_data_dir = join(test_dir, 'data')
return test_data_dir


@pytest.fixture(scope="function")
def output_basedir(tmp_path):
return tmp_path


@pytest.mark.parametrize('data_file', [
'SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc'
])
def test_cog_generation(data_file, data_dir, output_basedir):
def test_single_cog_generation(smap_file, temp_dir, logger):
"""
Test that the conversion works and the output is a valid cloud optimized geotiff
"""
test_file = pathlib.Path(data_dir, data_file)
test_file = pathlib.Path(temp_dir, smap_file)

results = netcdf_convert.netcdf_converter(test_file, pathlib.Path(output_basedir, data_file), [])
results = netcdf_converter(
test_file,
pathlib.Path(temp_dir),
['sss_smap'],
logger,
)

assert len(results) > 0
assert len(results) == 1

for entry in results:
if pathlib.Path(entry).is_file():
@@ -59,41 +45,47 @@ def test_cog_generation(data_file, data_dir, output_basedir):
assert cog_test == valid_cog


@pytest.mark.parametrize(['data_file', 'in_bands'], [
['SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc', ['gland', 'fland', 'sss_smap']]
])
def test_band_selection(data_file, in_bands, data_dir, output_basedir):
@pytest.mark.parametrize(['in_bands'], [[['gland', 'fland', 'sss_smap']]])
def test_multiple_variable_selection(in_bands, temp_dir, smap_file, logger):
"""
Verify the correct bands asked for by the user are being converted
"""

in_bands = sorted(in_bands)
test_file = pathlib.Path(data_dir, data_file)
test_file = pathlib.Path(temp_dir, smap_file)

results = netcdf_convert.netcdf_converter(test_file, pathlib.Path(output_basedir, data_file), in_bands)
results = netcdf_converter(
test_file,
pathlib.Path(temp_dir),
in_bands,
logger
)

assert len(results) == 3

out_bands = []
for entry in results:
if pathlib.Path(entry).is_file():
band_completed = entry.split("4.0_")[-1].replace(".tif", "")
band_completed = entry.split('4.0_')[-1].replace('_reformatted.tif', '')
out_bands.append(band_completed)

out_bands.sort()
assert in_bands == out_bands


@pytest.mark.parametrize(['data_file', 'in_bands'], [
['SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc', ['waldo']]
])
def test_unknown_band_selection(data_file, in_bands, data_dir, output_basedir):
@pytest.mark.parametrize(['in_bands'], [[['waldo']]])
def test_unknown_band_selection(in_bands, temp_dir, smap_file, logger):
"""
Verify an incorrect band asked for by the user raises an exception
"""

in_bands = sorted(in_bands)
test_file = pathlib.Path(data_dir, data_file)
test_file = pathlib.Path(temp_dir, smap_file)

with pytest.raises(Net2CogError):
netcdf_convert.netcdf_converter(test_file, pathlib.Path(output_basedir, data_file), in_bands)
netcdf_converter(
test_file,
pathlib.Path(temp_dir),
in_bands,
logger
)
172 changes: 76 additions & 96 deletions tests/test_netcdf_convert_harmony.py
Original file line number Diff line number Diff line change
@@ -1,130 +1,110 @@
"""
==============
test_subset_harmony.py
test_netcdf_convert_harmony.py
==============
Test the harmony service
Test the Harmony service by invoking it as Harmony would.
"""
import json
import os.path
import pathlib
import shutil
import sys
from unittest.mock import patch

import pytest
from harmony.exceptions import HarmonyException
from pystac import Catalog
from harmony_service_lib.exceptions import HarmonyException

import net2cog.netcdf_convert_harmony


@pytest.fixture(scope='function')
def mock_environ(tmp_path):
def test_service_invoke(mock_environ, temp_dir, smap_data_operation_message, smap_stac):
"""Test service invocation, given an input granule, a Harmony message and
a path to a single-item STAC.
"""
Replace AWS env variables with fake values, to ensure no real AWS
calls are executed. During fixture shutdown, revert environment
variables to their original values.
test_args = [
net2cog.netcdf_convert_harmony.__file__,
"--harmony-action", "invoke",
"--harmony-input-file", str(smap_data_operation_message),
"--harmony-sources", str(smap_stac),
"--harmony-metadata-dir", temp_dir,
]

with patch.object(sys, 'argv', test_args):
net2cog.netcdf_convert_harmony.main()


def test_service_multiple_variables(mock_environ, temp_dir, smap_data_operation_message, smap_stac):
    """Test service invocation when including multiple variables."""
    # Load the baseline single-variable Harmony message fixture.
    with open(smap_data_operation_message, 'r', encoding='utf-8') as file_handler:
        smap_data_operation_json = json.load(file_handler)

    # Append a second requested variable so the request covers multiple
    # variables. The 'gland' variable exists in the SMAP test granule.
    smap_data_operation_json['sources'][0]['variables'].append({
        'id': 'V12345-ABC',
        'name': 'gland',
        'fullPath': 'gland',
    })

    # Write the modified message back for the service to consume.
    with open(smap_data_operation_message, 'w', encoding='utf-8') as file_handler:
        json.dump(smap_data_operation_json, file_handler, indent=2)

    # Invoke the service via its CLI entry point, exactly as Harmony would.
    test_args = [
        net2cog.netcdf_convert_harmony.__file__,
        "--harmony-action", "invoke",
        "--harmony-input-file", str(smap_data_operation_message),
        "--harmony-sources", str(smap_stac),
        "--harmony-metadata-dir", temp_dir,
    ]

    with patch.object(sys, 'argv', test_args):
        net2cog.netcdf_convert_harmony.main()


def test_service_all_variables(mock_environ, temp_dir, smap_data_operation_message, smap_stac):
    """Test service invocation when no variables are in the input message, which
    occurs when "all" variables are requested.
    """
    # Empty the variables list in the Harmony message: an empty list is how
    # Harmony represents an "all variables" request to the service.
    with open(smap_data_operation_message, 'r', encoding='utf-8') as file_handler:
        smap_data_operation_json = json.load(file_handler)

    smap_data_operation_json['sources'][0]['variables'] = []

    with open(smap_data_operation_message, 'w', encoding='utf-8') as file_handler:
        json.dump(smap_data_operation_json, file_handler, indent=2)

    # Invoke the Harmony service adapter exactly as the CLI would.
    test_args = [
        net2cog.netcdf_convert_harmony.__file__,
        "--harmony-action", "invoke",
        "--harmony-input-file", str(smap_data_operation_message),
        "--harmony-sources", str(smap_stac),
        "--harmony-metadata-dir", temp_dir,
    ]

    with patch.object(sys, 'argv', test_args):
        net2cog.netcdf_convert_harmony.main()

def test_service_error(mock_environ, temp_dir, smap_data_operation_message, smap_stac):
    """Test service invocation when an incorrect variable is supplied. This
    should trigger a HarmonyException containing the original xarray KeyError
    message.
    """
    # Rename the requested variable to one that does not exist in the granule
    # ('thor'), so the service raises when it tries to read the variable.
    with open(smap_data_operation_message, 'r', encoding='utf-8') as file_handler:
        smap_data_operation_json = json.load(file_handler)

    smap_data_operation_json['sources'][0]['variables'][0]['name'] = 'thor'

    with open(smap_data_operation_message, 'w', encoding='utf-8') as file_handler:
        json.dump(smap_data_operation_json, file_handler, indent=2)

    # Invoke the Harmony service adapter exactly as the CLI would.
    test_args = [
        net2cog.netcdf_convert_harmony.__file__,
        "--harmony-action", "invoke",
        "--harmony-input-file", str(smap_data_operation_message),
        "--harmony-sources", str(smap_stac),
        "--harmony-metadata-dir", temp_dir,
    ]

    with patch.object(sys, 'argv', test_args):
        # The exception surfaced to Harmony should carry the xarray error text.
        with pytest.raises(HarmonyException, match="No variable named 'thor'."):
            net2cog.netcdf_convert_harmony.main()