Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,7 @@ venv.bak/
*.hdf5
*.nc
*.tif

*.log
notebooks/logs
notebooks/results
23 changes: 23 additions & 0 deletions environment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: h5cloud
channels:
- conda-forge
dependencies:
- jupyterlab
- boto3
- tqdm
- matplotlib-base
- pandas
- numpy
- s3fs
- xarray
- fsspec
- dask
- distributed
- geopandas
- h5py>3.9
- zarr
- kerchunk
- h5netcdf
- pip
- pip:
- h5coro
17 changes: 12 additions & 5 deletions h5tests/h5coro_arr_mean.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .h5test import H5Test, timer_decorator
from h5test import H5Test, timer_decorator
import numpy as np
import subprocess

Expand All @@ -12,15 +12,22 @@

from h5coro import h5coro, s3driver, filedriver
h5coro.config(errorChecking=True, verbose=False, enableAttributes=False)

driver = s3driver.S3Driver

class H5CoroArrMean(H5Test):
@timer_decorator
def run(self):
group = '/gt1l/heights'
variable = 'h_ph'
def run(self, dataset="/gt1l/heights", variable="h_ph"):
group = dataset
variable = variable
final_h5coro_array = []
if self.files[0].startswith("s3://cryo"):
credentials = {}
else:
credentials = {"region_name": "us-west-2",
"anon": True}
for file in self.files:
h5obj = h5coro.H5Coro(file.replace("s3://", ""), s3driver.S3Driver)
h5obj = h5coro.H5Coro(file.replace("s3://", ""), s3driver.S3Driver, credentials=credentials)
output = h5obj.readDatasets(datasets=[f'{group}/{variable}'], block=True)
data = h5obj[f'{group}/{variable}'].values
final_h5coro_array = np.insert(final_h5coro_array, len(final_h5coro_array), data, axis=None)
Expand Down
32 changes: 18 additions & 14 deletions h5tests/h5py_arr_mean.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
from .h5test import H5Test, timer_decorator
import h5py
import numpy as np

from h5test import H5Test, timer_decorator


class H5pyArrMean(H5Test):
@timer_decorator
def run(self):
final_h5py_array = []
# TODO: Do we need to make this configurable or consistent?
group = '/gt1l/heights'
variable = 'h_ph'
def run(self, io_params={}, dataset="/gt1l/heights", variable="h_ph"):
final_h5py_array = []
fsspec_params = {}
h5py_params = {}
if "fsspec_params" in io_params:
fsspec_params = io_params["fsspec_params"]
if "h5py_params" in io_params:
h5py_params = io_params["h5py_params"]
for file in self.files:
with h5py.File(self.s3_fs.open(file, 'rb')) as f:
data = f[f'{group}/{variable}'][:]
# Need to test if using concatenate is faster
final_h5py_array = np.insert(
final_h5py_array,
len(final_h5py_array),
data, axis=None
)
with self.s3_fs.open(file, mode="rb", **fsspec_params) as fo:
print("h5py params: ", h5py_params)
with h5py.File(fo, **h5py_params) as f:
data = f[f"{dataset}/{variable}"][:]
final_h5py_array = np.insert(
final_h5py_array, len(final_h5py_array), data, axis=None
)
return np.mean(final_h5py_array)
178 changes: 151 additions & 27 deletions h5tests/h5test.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,198 @@
import boto3
import csv
from io import StringIO
import logging
import os
import re
import sys
import time
from datetime import datetime
import os
from io import StringIO

import boto3
import s3fs
import sys

current = os.path.abspath('..')
current = os.path.abspath("..")
sys.path.append(current)
from helpers.links import S3Links

def generate_timestamp():
return datetime.now().strftime('%Y-%m-%d-%H%M%S')

import csv
import logging
import os
import pathlib
import re
import time
from datetime import datetime
from io import StringIO

import boto3
import fsspec
import h5py
import numpy as np
import pandas as pd
import s3fs
import xarray as xr
from tqdm import tqdm


class RegexFilter(logging.Filter):
def __init__(self, regex_pattern):
super(RegexFilter, self).__init__()
self.regex_pattern = re.compile(regex_pattern)

def filter(self, record):
# Apply the regex pattern to the log message
return not bool(self.regex_pattern.search(record.msg))


def timer_decorator(func):
"""
A decorator to measure the execution time of the wrapped function.
"""

def fsspec_stats(log_file):
with open(log_file, "r") as input_file:
num_requests = 0
total_requested_bytes = 0
for line in input_file:
            # Parse fsspec log entries of the form "read: <start> - <end>",
            # accumulating the request count and total requested bytes

try:
read_range = line.split("read:")[1].split(" - ")
request_size = int(read_range[1]) - int(read_range[0])
total_requested_bytes += request_size
num_requests += 1
except Exception:
pass
stats = {
"total_reqs": num_requests,
"total_reqs_bytes": total_requested_bytes,
"avg_req_size": int(round(total_requested_bytes / num_requests, 2)),
}
return stats

def __setup_logging(self, tstamp):
pathlib.Path(f"./logs").mkdir(exist_ok=True)
self.log_filename = f"logs/{self.data_format}-{tstamp}.log"
logger = logging.getLogger("fsspec")
logger.setLevel(logging.DEBUG)
self.regex_filter = RegexFilter(self.logs_regex)
    # attach the regex filter to the "fsspec" logger
logging.getLogger("fsspec").addFilter(self.regex_filter)
self._file_handler = logging.FileHandler(self.log_filename)
self._file_handler.setLevel(logging.DEBUG)
# Add the handler to the root logger
logging.getLogger("fsspec").addHandler(self._file_handler)

def __turnoff_logging(self):
logging.getLogger("fsspec").removeFilter(self.regex_filter)
logging.getLogger("fsspec").removeHandler(self._file_handler)
self._file_handler.close()

def wrapper(self, *args, **kwargs):
tstamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
if self.logs_regex:
__setup_logging(self, tstamp)
start_time = time.time()
result = func(self, *args, **kwargs)
end_time = time.time()
if self.logs_regex:
__turnoff_logging(self)
execution_time = end_time - start_time
# Call the store method here
self.io_stats = fsspec_stats(self.log_filename)
if self.store_results:
results_key = f"{generate_timestamp()}_{self.name}_{self.data_format}_results.csv"
s3_key = f"{self.results_directory}/{results_key}"
self.store(run_time=execution_time, result=result, bucket=self.bucket, s3_key=s3_key)
return result, execution_time
results_key = f"{tstamp}_{self.name}_{self.data_format}_results.csv"
self.store(run_time=execution_time, result=result, file_name=results_key)
return result, execution_time, self.log_filename, self.io_stats

return wrapper


class H5Test:
def __init__(self, data_format: str, files=None, store_results=True):
def __init__(
self,
data_format: str,
files=[],
store_results=True,
logs_regex=r"<File-like object S3FileSystem, .*?>\s*(read: \d+ - \d+)",
):
self.name = self.__class__.__name__
self.io_stats = {}
self.log_filename = ""
self.data_format = data_format
if files:
self.logs_regex = logs_regex
if len(files) > 0:
self.files = files
else:
self.files = S3Links().get_links_by_format(data_format)
self.s3_client = boto3.client('s3') # Ensure AWS credentials are configured
self.s3_fs = s3fs.S3FileSystem(anon=False)
raise ValueError("We need at least 1 ATL03 granule URL hosted in S3")

self.store_results = store_results
self.bucket = "nasa-cryo-scratch"
self.results_directory = "h5cloud/benchmark_results"

if files[0].startswith("s3://nasa-cryo-persistent"):
self.s3_client = boto3.client("s3") #
self.annon_access = False
self.results_bucket = "s3://nasa-cryo-persistent/"
self.results_directory = "h5cloud/benchmark_results"
self.results_store_type = "S3"
else:
self.annon_access = True
self.results_path = "results"
pathlib.Path(f"./{self.results_path}").mkdir(exist_ok=True)
self.results_store_type = "Local"

self.s3_fs = s3fs.S3FileSystem(anon=self.annon_access)

@timer_decorator
def run(self):
def run(self, io_params, dataset, variable):
raise NotImplementedError("The run method has not been implemented")

def store(self, run_time: float, result: str, bucket: str, s3_key: str):
def store(self, run_time: float, result: str, file_name: str):
"""
Store test results to an S3 bucket as a CSV file.

:param run_time: The runtime of the test
:param result: The result of the test
:param bucket: The name of the S3 bucket where the CSV will be uploaded
:param s3_key: The S3 key (filename) where the CSV will be stored
:param file_name: file to store the results
"""
# Create a CSV in-memory
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer)
csv_writer.writerow(['Name', 'Data Format', 'Run Time', 'Result']) # Headers
csv_writer.writerow([self.name, self.data_format, run_time, result])
csv_writer.writerow(
[
"Name",
"Data Format",
"Run Time",
"Result",
"Access Log",
"Total Bytes Tranferred",
"Total Requests",
]
) # Headers
csv_writer.writerow(
[
self.name,
self.data_format,
run_time,
result,
self.log_filename,
self.io_stats["total_reqs_bytes"],
self.io_stats["total_reqs"],
]
)

# Reset the buffer's position to the beginning
csv_buffer.seek(0)

# Upload the CSV to S3
self.s3_client.put_object(Bucket=bucket, Key=s3_key, Body=csv_buffer.getvalue())
if self.results_store_type == "S3":
            # assumes the S3 client has write access to the results bucket
self.s3_client.put_object(
Bucket=self.results_bucket,
Key=f"{self.results_directory}/{file_name}",
Body=csv_buffer.getvalue(),
)
else:
with open(f"{self.results_path}/{file_name}", "w", newline="") as csv_file:
csv_file.write(csv_buffer.getvalue())


## Example subclass
# class SampleTest(H5Test):
Expand Down
Loading