Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7044.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Replace use of Met Office Datapoint (Switchoff December 2025) in tutorial with Met Office data via the Amazon Data Sustainability Initative.
2 changes: 2 additions & 0 deletions conda-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ dependencies:
#- pympler
#- matplotlib-base
#- sqlparse
#- h5py
#- requests
3 changes: 0 additions & 3 deletions cylc/flow/etc/tutorial/api-keys

This file was deleted.

2 changes: 0 additions & 2 deletions cylc/flow/etc/tutorial/cylc-forecasting-workflow/.validate
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

set -eux
APIKEY="$(head --lines 1 ../api-keys)"
FLOW_NAME="$(< /dev/urandom tr -dc A-Za-z | head -c6)"
cylc lint .
cylc install --workflow-name "$FLOW_NAME" --no-run-name
sed -i "s/DATAPOINT_API_KEY/$APIKEY/" "$HOME/cylc-run/$FLOW_NAME/flow.cylc"
cylc validate --check-circular --icp=2000 "$FLOW_NAME"
cylc play --no-detach --abort-if-any-task-fails "$FLOW_NAME"
cylc clean "$FLOW_NAME"
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def plot_wind_data(wind_x, wind_y, x_range, y_range, x_coords, y_coords,
[x[0] for x in z_coords],
[y[1] for y in z_coords],
color='red')
plt.savefig('wind.png')
plt.savefig(f'{os.environ["CYLC_TASK_LOG_DIR"]}/wind.png')


def get_wind_fields():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,12 @@ def push_rainfall(rainfall, wind_data, step, resolution, spline_level):
dim_x, dim_y, resolution, resolution,
spline_level)

domain = util.parse_domain(os.environ['DOMAIN'])

while True:
out_of_bounds = []
for itt in range(len(x_values)):
try:
domain = util.parse_domain(os.environ['DOMAIN'])
lng = domain['lng1'] + x_values[itt]
lat = domain['lat1'] + y_values[itt]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ Usage:
get-observations

Environment Variables:
SITE_ID: The four digit DataPoint site code identifying the weather station
we are to fetch results for.
API_KEY: The DataPoint API key, required for getting live weather data.
If un-specified then get-observations will fall back to archive data
from the workflow directory.
SITE_ID: The four digit WMO (World Meteorological Organization)
site code identifying the weather station we are to fetch results for.

"""

Expand All @@ -43,10 +40,6 @@ import requests
import util


BASE_URL = ('http://datapoint.metoffice.gov.uk/public/data/'
'val/wxobs/all/json/{site_id}'
'?res=hourly&time={time}&key={api_key}')

# Compass bearings for ordinate directions.
# NOTE: We measure direction by where the winds have come from!
ORDINATES = {
Expand All @@ -68,22 +61,52 @@ ORDINATES = {
'NNW': '157.5'
}

class Meteorology:
"""
Surface winds tend to be 20-30 degrees backed (vector anticlockwise)
from the winds which steer weather systems:
Friction with the ground surface tends to mean that land surface winds
are as slow as half the speed of the wind at 2000ft. This fudge factor is
a more conservative 1.5:

class NoDataException(Exception):
...
.. seealso::

[Source Book to the Forecaster's Reference Book](https://digital.nmla
.metoffice.gov.uk/IO_011f7cd4-50fc-4903-b556-d24480ea883d/), section
1.2
"""
SURFACE_BACKING = 2
SURFACE_FRICTION = .66
KT_TO_MPH = 1.15078

@staticmethod
def process_direction(direction: str) -> str:
"""Process raw wind direction:

* Convert direction from 10s of degrees to degrees.
* Convert from Oceanographic (wind to) to Meteorological (wind from)
convention.
* Surface Friction Correction
"""
return str(
((int(direction) + 18 - Meteorology.SURFACE_BACKING) % 36) * 10)

@staticmethod
def process_speed(speed: str) -> str:
"""Process Raw wind speed

* Convert to KT to MPH
* Surface Friction Correction
"""
return str(
(
int(speed) * Meteorology.KT_TO_MPH
) / Meteorology.SURFACE_FRICTION
)


def get_datapoint_data(site_id, time, api_key):
"""Get weather data from the DataPoint service."""
# The URL required to get the data.
time = datetime.strptime(time, '%Y%m%dT%H%MZ').strftime('%Y-%m-%dT%H:%MZ')
url = BASE_URL.format(time=time, site_id=site_id, api_key=api_key)
req = requests.get(url)
if req.status_code != 200:
raise Exception(f'{url} returned exit code {req.status_code}')
# Get the data and parse it as JSON.
print('Opening URL: %s' % url)
return req.json()['SiteRep']['DV']['Location']
class NoDataException(Exception):
...


def get_archived_data(site_id, time):
Expand Down Expand Up @@ -152,11 +175,12 @@ def synop_grab(site_id, time):
raise NoDataException(
f'Request for data failed, raw request was {req.text}')

# Parse the direction from 10's of degrees to degrees:
data['direction'] = str(int(data['direction']) * 10)
# * Parse the direction from 10's of degrees to degrees
# * Convert direction from to direction it's blowing to
data['direction'] = Meteorology.process_direction(data['direction'])

# Convert data in KT to MPH:
data['speed'] = str(int(data['speed']) * 1.15078)
data['speed'] = Meteorology.process_speed(data['speed'])

# Get lat and long from MO Data:
lat, lon = reference_lat_long(site_id)
Expand Down Expand Up @@ -185,7 +209,7 @@ def get_nearby_site(site_id, badsites):
return int(result[0]), dist


def main(site_id, api_key=None):
def main(site_id):
cycle_point = os.environ['CYLC_TASK_CYCLE_POINT']

# Try to get the information from SYNOPS.
Expand All @@ -202,13 +226,8 @@ def main(site_id, api_key=None):
site_id, dist = get_nearby_site(site_id, badsites)

if obs is None:
if api_key:
print('Attempting to get weather data from DataPoint...')
data = get_datapoint_data(site_id, cycle_point, api_key)
else:
print('No API key provided, falling back to archived data')
data = get_archived_data(site_id, cycle_point)

print('Obs unavailable, falling back to archived data')
data = get_archived_data(site_id, cycle_point)
obs = extract_observations(data)

# Write observations.
Expand All @@ -218,5 +237,4 @@ def main(site_id, api_key=None):

if __name__ == '__main__':
util.sleep()
main(os.environ['SITE_ID'],
os.environ.get('API_KEY'))
main(os.environ['SITE_ID'])
144 changes: 82 additions & 62 deletions cylc/flow/etc/tutorial/cylc-forecasting-workflow/bin/get-rainfall
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ Usage:
get-rainfall

Environment Variables:
API_KEY: The DataPoint API key, required for getting live weather data.
If un-specified then get-observations will fall back to archive data
from the workflow directory.
DOMAIN: The area in which to generate forecasts for in the format
(lng1, lat1, lng2, lat2).
RESOLUTION: The length/width of each grid cell in degrees.
Expand All @@ -38,20 +35,20 @@ import math
import os
import shutil

import requests
import h5py
from pathlib import Path
import urllib
import requests # noqa: F401 - required implicitly by urllib.

try:
from PIL import Image
except ModuleNotFoundError:
# not all PIL installations are created equal
# sometimes we must import like this
import Image
from mercator import get_offset, get_scale, pos_to_coord
import util


URL = ('http://datapoint.metoffice.gov.uk/public/data/layer/wxobs/'
'RADAR_UK_Composite_Highres/png?TIME={time}&key={api_key}')
S3URL = (
'https://met-office-radar-obs-data.s3-eu-west-2.amazonaws.com/radar/'
'{Y}/{m}/{d}/{YYYYmmddHHMM}_ODIM_ng_radar_rainrate_composite_1km_UK.h5'
)
DEBUG=os.environ['CYLC_DEBUG']=='true'
CYLC_TASK_LOG_DIR=os.environ['CYLC_TASK_LOG_DIR']


class Rainfall(object):
Expand All @@ -62,16 +59,6 @@ class Rainfall(object):
resolution (float): The length of each grid cell in degrees.

"""
VALUE_MAP = {
(0, 0, 254, 255): 1,
(50, 101, 254, 255): 2,
(127, 127, 0, 255): 3,
(254, 203, 0, 255): 4,
(254, 152, 0, 255): 5,
(254, 0, 0, 255): 6,
(254, 0, 254, 255): 7
}

def __init__(self, domain, resolution):
self.resolution = resolution
self.domain = domain
Expand All @@ -98,10 +85,23 @@ class Rainfall(object):
"""
itt_x, itt_y = util.get_grid_coordinates(lng, lat, self.domain,
self.resolution)
try:
self.data[itt_y][itt_x].append(self.VALUE_MAP[value])
except KeyError:
pass

self.data[itt_y][itt_x].append(self.value_map(value))

@staticmethod
def value_map(v):
"""Convert rainfall rate values into colour space values.

Checks if rainfall value above each threshold in turn.

TODO:
- Unit test this
"""
thresholds = {32, 16, 8, 4, 2, 1, .5, .2}
for i, threshold in enumerate(sorted(thresholds, reverse=True)):
if v > threshold:
return 8 - i
return 0

def compute_bins(self):
"""Return this dataset as a 2D matrix."""
Expand All @@ -114,25 +114,6 @@ class Rainfall(object):
return self.data


def get_datapoint_radar_image(filename, time, api_key):
"""Retrieve a png image of rainfall from the DataPoint service.

Args:
filename (str): The path to write the image file to.
time (str): The datetime of the image to retrieve in ISO8601 format.
api_key (str): Datapoint API key.

"""
time = datetime.strptime(time, '%Y%m%dT%H%MZ').strftime(
'%Y-%m-%dT%H:%M:%SZ')
url = URL.format(time=time, api_key=api_key)
req = requests.get(url)
if req.status_code != 200:
raise Exception(f'{url} returned exit code {req.status_code}')
with open(filename, 'bw') as png_file:
png_file.write(req.content)


def get_archived_radar_image(filename, time):
"""Retrieve a png image from the archived data in the workflow directory.

Expand All @@ -147,8 +128,23 @@ def get_archived_radar_image(filename, time):
filename)


def get_amazon_radar_data(filename, time):
time = datetime.strptime(time, '%Y%m%dT%H%MZ')
url = S3URL.format(
Y=time.strftime('%Y'),
m=time.strftime('%m'),
d=time.strftime('%d'),
YYYYmmddHHMM=time.strftime('%Y%m%d%H%M')
)
print(f'[INFO] Getting data from {url=}')
data = urllib.request.urlopen(url).read()
with open(filename, 'wb') as fh:
fh.write(data)


# def process_rainfall_data(filename, resolution, domain):
def process_rainfall_data(filename, resolution, domain):
"""Generate a 2D matrix of data from the rainfall data in the image.
"""get_amazon_radar_dataGenerate a 2D matrix of data from the rainfall data in the image.

Args:
filename (str): Path to the png image to process.
Expand All @@ -160,37 +156,61 @@ def process_rainfall_data(filename, resolution, domain):
list - A 2D matrix of rainfall data.

"""
print(f'[INFO] Analysing data from {filename}')
data = h5py.File(filename)['dataset1']['data1']['data']
rainfall = Rainfall(domain, resolution)

image = Image.open(filename)
scale = get_scale(domain, image.width)
offset = get_offset(domain, scale)
_height, width = data.shape

scale = get_scale(domain, width)
offset = (-1100.8461538461539, 1400.6953225710452)

if DEBUG:
print(f'[INFO] {scale=}, {offset=}')
from matplotlib import pyplot as plt
Path(CYLC_TASK_LOG_DIR).mkdir(parents=True, exist_ok=True)
plt.imshow(data)
plt.savefig(f'{CYLC_TASK_LOG_DIR}/raw.data.png')

for itt_x in range(image.width):
for itt_y in range(image.height):
for itt_y, row in enumerate(data):
for itt_x, col in enumerate(row):
lng, lat = pos_to_coord(
itt_x,
itt_y * (2. / 3.), # Counter aspect ratio.
offset, scale)
rainfall.add(lng, lat, image.getpixel((itt_x, itt_y)))
offset,
scale
)
val = float(col)
# Original data uses -1 to indicate radar mask
val = 0 if val == -1 else val
rainfall.add(lng, lat, val)
data = rainfall.compute_bins()

return rainfall.compute_bins()
if DEBUG:
plt.imshow(data)
plt.savefig(f'{CYLC_TASK_LOG_DIR}/processed.data.png')

return data

def main():
time = os.environ['CYLC_TASK_CYCLE_POINT']
resolution = float(os.environ['RESOLUTION'])
domain = util.parse_domain(os.environ['DOMAIN'])
api_key = os.environ.get('API_KEY')

if api_key:
print('Attempting to get weather data from the DataPoint service.')
get_datapoint_radar_image('rainfall-radar.png', time, api_key)
# Acts as a switch - if a file-name is given, use that file-name
canned_data = os.environ.get('CANNED_DATA', 'fetch')

if canned_data == 'fetch':
print(f'[INFO] Attempting to get rainfall data from S3 bucket')
get_amazon_radar_data('radardata.h5', time)
canned_data = 'radardata.h5'
else:
print('No API key provided, falling back to archived data')
get_archived_radar_image('rainfall-radar.png', time)
print(
f'[INFO] Canned data provided: {canned_data}')
get_archived_radar_data(canned_data, time)

data = process_rainfall_data(canned_data, resolution, domain)

data = process_rainfall_data('rainfall-radar.png', resolution, domain)
util.write_csv('rainfall.csv', data)


Expand Down
Loading
Loading