Skip to content

Commit 5f72ebe

Browse files
committed
Server-side compute: add 7day averaging
1 parent 868e5e5 commit 5f72ebe

File tree

3 files changed

+158
-3
lines changed

3 files changed

+158
-3
lines changed

integrations/server/test_covidcast_endpoints.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,33 @@ def test_derived_signals(self):
225225
expected_values = self._diff_rows(values1) + self._diff_rows(values2)
226226
self.assertAlmostEqual(out_values, expected_values)
227227

228+
with self.subTest("smoothed signal"):
229+
out = self._fetch("/", signal="jhu-csse:confirmed_7dav_incidence_num", geo=first.geo_pair, time="day:20200401-20200410")
230+
self.assertEqual(len(out["epidata"]), len(rows01))
231+
out_values = [row["value"] for row in out["epidata"]]
232+
values = [None] * 7 + [value for _, value in time_value_pairs]
233+
expected_values = self._smooth_rows(self._diff_rows(values))
234+
self.assertAlmostEqual(out_values, expected_values)
235+
236+
with self.subTest("diffed signal and smoothed signal in one request"):
237+
out = self._fetch("/", signal="jhu-csse:confirmed_incidence_num;jhu-csse:confirmed_7dav_incidence_num", geo=first.geo_pair, time="day:20200401-20200410")
238+
self.assertEqual(len(out["epidata"]), 2*len(rows01) + 6)
239+
out_values = [row["value"] for row in out["epidata"]]
240+
values = [None] * 7 + [value for _, value in time_value_pairs]
241+
expected_diff = self._diff_rows(values)
242+
expected_smoothed = self._smooth_rows(expected_diff)
243+
expected_values = list(interleave_longest(expected_smoothed, expected_diff))
244+
self.assertAlmostEqual(out_values, expected_values)
245+
246+
with self.subTest("smoothing with time window resizing"):
247+
# should fetch 6 extra days
248+
out = self._fetch("/", signal="jhu-csse:confirmed_7dav_incidence_num", geo=first.geo_pair, time="day:20200407-20200410")
249+
self.assertEqual(len(out["epidata"]), len(rows01) - 6)
250+
out_values = [row["value"] for row in out["epidata"]]
251+
# an extra None is added because the padding for DIFF_SMOOTH (pad_length = 8) is used
252+
values = [None] + [value for _, value in time_value_pairs]
253+
expected_values = self._smooth_rows(self._diff_rows(values))
254+
self.assertAlmostEqual(out_values, expected_values)
228255

229256
with self.subTest("diffing with time window resizing"):
230257
# should fetch 1 extra day
@@ -240,6 +267,15 @@ def test_derived_signals(self):
240267
first = rows[0]
241268
self._insert_rows(rows)
242269

270+
with self.subTest("smoothing with a time gap"):
271+
# should fetch 1 extra day
272+
out = self._fetch("/", signal="jhu-csse:confirmed_7dav_incidence_num", geo=first.geo_pair, time="day:20200401-20200420")
273+
self.assertEqual(len(out["epidata"]), len(rows) + 5)
274+
out_values = [row["value"] for row in out["epidata"]]
275+
values = [None] * 7 + [value for _, value in time_value_pairs][:10] + [None] * 5 + [value for _, value in time_value_pairs][10:]
276+
expected_values = self._smooth_rows(self._diff_rows(values))
277+
self.assertAlmostEqual(out_values, expected_values)
278+
243279
with self.subTest("diffing with a time gap"):
244280
# should fetch 1 extra day
245281
out = self._fetch("/", signal="jhu-csse:confirmed_incidence_num", geo=first.geo_pair, time="day:20200401-20200420")

src/server/endpoints/covidcast_utils/smooth_diff.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from more_itertools import windowed
66
from numpy import nan, nan_to_num, array, dot, isnan
77

8+
from delphi_utils.nancodes import Nans
89

910
class PadFillValue(str, Enum):
1011
first = "first"
@@ -16,6 +17,27 @@ class SmootherKernelValue(str, Enum):
1617
average = "average"
1718

1819

20+
def _smoother(values: List[Number], kernel: Optional[Union[List[Number], SmootherKernelValue]] = None) -> Number:
21+
"""Basic smoother.
22+
23+
If kernel passed, uses the kernel as summation weights. If something is wrong,
24+
defaults to the mean.
25+
"""
26+
try:
27+
if kernel and isinstance(kernel, list):
28+
kernel = array(kernel, copy=False)
29+
values = array(values, copy=False)
30+
smoothed_value = dot(values, kernel)
31+
return smoothed_value
32+
else:
33+
raise ValueError
34+
except (ValueError, TypeError):
35+
try:
36+
smoothed_value = array(values).mean()
37+
except (ValueError, TypeError):
38+
smoothed_value = None
39+
40+
return smoothed_value
1941

2042

2143
def generate_smooth_rows(
@@ -52,8 +74,25 @@ def generate_smooth_rows(
5274
**kwargs:
5375
Container for non-shared parameters with other computation functions.
5476
"""
55-
for window in windowed(rows, smoother_window_length):
56-
yield smoothed_row(window)
77+
# Validate params.
78+
if not isinstance(smoother_window_length, int) or (isinstance(smoother_window_length, int) and smoother_window_length < 1):
79+
smoother_window_length = 7
80+
if isinstance(smoother_kernel, list):
81+
smoother_window_length = len(smoother_kernel)
82+
if not isinstance(nan_fill_value, Number):
83+
nan_fill_value = nan
84+
if not isinstance(smoother_kernel, (list, SmootherKernelValue)):
85+
smoother_kernel = SmootherKernelValue.average
86+
87+
for window in windowed(rows, smoother_window_length): # Iterable[List[Dict]]
88+
if all(e is None for e in window):
89+
continue
90+
value = _smoother(nan_to_num([e.get("value") if e else nan_fill_value for e in window], nan=nan_fill_value), smoother_kernel)
91+
value = float(round(value, 8)) if value and not isnan(value) else None
92+
last_item = list(filter(lambda e: e is not None, window))[-1]
93+
item = last_item.copy() # inherit values of last time stamp
94+
item.update({"value": value, "stderr": None, "sample_size": None, "missing_value": Nans.NOT_MISSING if value is not None else Nans.NOT_APPLICABLE, "missing_stderr": Nans.NOT_APPLICABLE, "missing_sample_size": Nans.NOT_APPLICABLE})
95+
yield item
5796

5897

5998
def generate_row_diffs(rows: Iterable[Dict], nan_fill_value: Number = nan, **kwargs) -> Iterable[Dict]:

tests/server/endpoints/covidcast_utils/test_smooth_diff.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,90 @@
55
from more_itertools import windowed
66

77
from delphi.epidata.server.utils import date_to_time_value, CovidcastRecords
8-
from delphi.epidata.server.endpoints.covidcast_utils.smooth_diff import generate_row_diffs, generate_smooth_rows
8+
from delphi.epidata.server.endpoints.covidcast_utils.smooth_diff import generate_row_diffs, generate_smooth_rows, _smoother
99

1010

1111
class TestStreaming:
12+
def test__smoother(self):
13+
assert _smoother(list(range(7)), [1] * 7) == sum(range(7))
14+
assert _smoother([1] * 6, list(range(7))) == sum([1] * 6) / 6
15+
16+
17+
def test_generate_smooth_rows(self):
18+
# an empty dataframe should return an empty dataframe
19+
data = DataFrame({})
20+
smoothed_df = DataFrame.from_records(generate_smooth_rows(data.to_dict(orient='records')))
21+
expected_df = DataFrame({})
22+
assert_frame_equal(smoothed_df, expected_df)
23+
24+
# a dataframe with a single entry should return a single nan value
25+
data = CovidcastRecords(
26+
time_values=[20210501],
27+
values=[1.0]
28+
).as_dataframe()
29+
smoothed_df = DataFrame.from_records(generate_smooth_rows(data.to_dict(orient='records')))
30+
expected_df = CovidcastRecords(
31+
time_values=[20210501],
32+
values=[None],
33+
stderrs=[None],
34+
sample_sizes=[None]
35+
).as_dataframe()
36+
assert_frame_equal(smoothed_df, expected_df)
37+
38+
data = CovidcastRecords(
39+
time_values=date_range("2021-05-01", "2021-05-10"),
40+
values=chain(range(7), [None, 2., 1.])
41+
).as_dataframe()
42+
43+
# regular window, nan fill
44+
smoothed_df = DataFrame.from_records(generate_smooth_rows(data.to_dict(orient='records')))
45+
expected_df = CovidcastRecords(
46+
time_values=date_range("2021-05-07", "2021-05-10"),
47+
values=(sum(x)/len(x) if None not in x else None for x in windowed(chain(range(7), [None, 2., 1.]), 7)),
48+
stderrs=[None]*4,
49+
sample_sizes=[None]*4,
50+
).as_dataframe()
51+
assert_frame_equal(smoothed_df, expected_df)
52+
53+
# regular window, 0 fill
54+
smoothed_df = DataFrame.from_records(generate_smooth_rows(data.to_dict(orient='records'), nan_fill_value=0.))
55+
expected_df = CovidcastRecords(
56+
time_values=date_range("2021-05-07", "2021-05-10"),
57+
values=(sum(x)/len(x) if None not in x else None for x in windowed(chain(range(7), [0., 2., 1.]), 7)),
58+
stderrs=[None]*4,
59+
sample_sizes=[None]*4,
60+
).as_dataframe()
61+
assert_frame_equal(smoothed_df, expected_df)
62+
63+
# regular window, different window length
64+
smoothed_df = DataFrame.from_records(generate_smooth_rows(data.to_dict(orient='records'), smoother_window_length = 8))
65+
expected_df = CovidcastRecords(
66+
time_values=date_range("2021-05-08", "2021-05-10"),
67+
values=(sum(x)/len(x) if None not in x else None for x in windowed(chain(range(7), [None, 2., 1.]), 8)),
68+
stderrs=[None]*3,
69+
sample_sizes=[None]*3,
70+
).as_dataframe()
71+
assert_frame_equal(smoothed_df, expected_df)
72+
73+
# regular window, different kernel
74+
smoothed_df = DataFrame.from_records(generate_smooth_rows(data.to_dict(orient='records'), smoother_kernel = list(range(8))))
75+
expected_df = CovidcastRecords(
76+
time_values=date_range("2021-05-08", "2021-05-10"),
77+
values=(sum([i * j for i, j in zip(x, range(8))])/len(x) if None not in x else None for x in windowed(chain(range(7), [None, 2., 1.]), 8)),
78+
stderrs=[None]*3,
79+
sample_sizes=[None]*3,
80+
).as_dataframe()
81+
assert_frame_equal(smoothed_df, expected_df)
82+
83+
# conflicting smoother args validation
84+
smoothed_df = DataFrame.from_records(generate_smooth_rows(data.to_dict(orient='records'), smoother_kernel=[1/7.]*7, smoother_window_length=10))
85+
expected_df = CovidcastRecords(
86+
time_values=date_range("2021-05-07", "2021-05-10"),
87+
values=(sum([i * j for i, j in zip(x, [1/7.]*7)]) if None not in x else None for x in windowed(chain(range(7), [None, 2., 1.]), 7)),
88+
stderrs=[None]*4,
89+
sample_sizes=[None]*4,
90+
).as_dataframe()
91+
assert_frame_equal(smoothed_df, expected_df)
1292

1393

1494
def test_generate_row_diffs(self):

0 commit comments

Comments
 (0)