
Commit 7457a2c

Merge branch 'dev' into leonlu2/test_signal_wildcard
2 parents 1ee8e64 + b823f09

23 files changed: +568 −401 lines changed

.bumpversion.cfg

+1 −1

@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.4.1
+current_version = 0.4.2
 commit = False
 tag = False

dev/local/Makefile

+14 −0

@@ -139,6 +139,20 @@ test:
 		--env "FLASK_SECRET=abc" \
 		delphi_web_python python -m pytest --import-mode importlib $(pdb) $(test) | tee test_output_$(NOW).log
 
+.PHONY=bash
+bash:
+	@docker run -it --rm --network delphi-net \
+		--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata,target=/usr/src/app/repos/delphi/delphi-epidata,readonly \
+		--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata/src,target=/usr/src/app/delphi/epidata,readonly \
+		--env "SQLALCHEMY_DATABASE_URI=mysql+mysqldb://user:pass@delphi_database_epidata:3306/epidata" \
+		--env "FLASK_SECRET=abc" \
+		delphi_web_python bash
+
+.PHONY=sql
+sql:
+	@docker run --rm -it --network delphi-net --cap-add=sys_nice \
+		percona mysql --user=user --password=pass --port 3306 --host delphi_database_epidata epidata
+
 .PHONY=clean
 clean:
 	@docker images -f "dangling=true" -q | xargs docker rmi >/dev/null 2>&1
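Both new targets are developer conveniences built from the same pieces as the `test` target above: `make bash` opens an interactive shell inside the `delphi_web_python` container with identical mounts and environment, and `make sql` opens a `mysql` client (via the `percona` image) against the local `delphi_database_epidata` instance.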

integrations/acquisition/covid_hosp/facility/test_scenarios.py

+27 −10

@@ -76,7 +76,7 @@ def test_acquire_dataset(self):
       response = Epidata.covid_hosp_facility(
         '450822', Epidata.range(20200101, 20210101))
       self.assertEqual(response['result'], 1)
-      self.assertEqual(len(response['epidata']), 1)
+      self.assertEqual(len(response['epidata']), 2)
       row = response['epidata'][0]
       for k,v in expected_spotchecks.items():
         self.assertTrue(
@@ -101,9 +101,9 @@ def test_acquire_dataset(self):
       response = Epidata.covid_hosp_facility(
         '450822', Epidata.range(20200101, 20210101))
       self.assertEqual(response['result'], 1)
-      self.assertEqual(len(response['epidata']), 1)
+      self.assertEqual(len(response['epidata']), 2)
 
-  @freeze_time("2021-03-16")
+  @freeze_time("2021-03-17")
   def test_facility_lookup(self):
     """Lookup facilities using various filters."""
 
@@ -120,7 +120,7 @@ def test_facility_lookup(self):
       self.assertTrue(acquired)
 
     # texas ground truth, sorted by `hospital_pk`
-    # see sample data at testdata/acquisition/covid_hosp/facility/dataset_old.csv
+    # see sample data at testdata/acquisition/covid_hosp/facility/dataset.csv
    texas_hospitals = [{
       'hospital_pk': '450771',
       'state': 'TX',
@@ -139,7 +139,7 @@ def test_facility_lookup(self):
       'hospital_name': 'MEDICAL CITY LAS COLINAS',
       'address': '6800 N MACARTHUR BLVD',
       'city': 'IRVING',
-      'zip': '75039',
+      'zip': '77777',  # most-recent collection week should take precedence
       'hospital_subtype': 'Short Term',
       'fips_code': '48113',
       'is_metro_micro': 1,
@@ -150,7 +150,7 @@ def test_facility_lookup(self):
       'hospital_name': 'RANKIN HOSPITAL MEDICAL CLINIC',
       'address': '1611 SPUR 576',
       'city': 'RANKIN',
-      'zip': '79778',
+      'zip': '99999',  # most-recent collection week should take precedence
       'hospital_subtype': 'Critical Access Hospitals',
       'fips_code': '48461',
       'is_metro_micro': 0,
@@ -160,16 +160,16 @@ def test_facility_lookup(self):
       response = Epidata.covid_hosp_facility_lookup(state='tx')
       self.assertEqual(response['epidata'], texas_hospitals)
 
-    with self.subTest(name='by ccn'):
-      response = Epidata.covid_hosp_facility_lookup(ccn='450771')
+    with self.subTest(name='by zip'):
+      response = Epidata.covid_hosp_facility_lookup(zip='75093')
       self.assertEqual(response['epidata'], texas_hospitals[0:1])
 
     with self.subTest(name='by city'):
       response = Epidata.covid_hosp_facility_lookup(city='irving')
       self.assertEqual(response['epidata'], texas_hospitals[1:2])
 
-    with self.subTest(name='by zip'):
-      response = Epidata.covid_hosp_facility_lookup(zip='79778')
+    with self.subTest(name='by ccn'):
+      response = Epidata.covid_hosp_facility_lookup(ccn='451329')
       self.assertEqual(response['epidata'], texas_hospitals[2:3])
 
     with self.subTest(name='by fips_code'):
@@ -179,3 +179,20 @@ def test_facility_lookup(self):
     with self.subTest(name='no results'):
       response = Epidata.covid_hosp_facility_lookup(state='not a state')
       self.assertEqual(response['result'], -2)
+
+    # update facility info
+    mock_network = MagicMock()
+    mock_network.fetch_metadata.return_value = \
+      self.test_utils.load_sample_metadata('metadata_update_facility.csv')
+    mock_network.fetch_dataset.return_value = \
+      self.test_utils.load_sample_dataset('dataset_update_facility.csv')
+
+    # acquire sample data into local database
+    with self.subTest(name='second acquisition'):
+      acquired = Update.run(network=mock_network)
+      self.assertTrue(acquired)
+
+    texas_hospitals[1]['zip'] = '88888'
+    with self.subTest(name='by city after update'):
+      response = Epidata.covid_hosp_facility_lookup(city='irving')
+      self.assertEqual(response['epidata'], texas_hospitals[1:2])
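Taken together, these test changes track the refreshed sample data: facility 450822 now spans two collection weeks (hence the row-count assertions moving from 1 to 2), and the deliberately implausible sentinel ZIPs (77777, 99999, and 88888 after the second acquisition) verify that facility metadata from the most recent collection week takes precedence in the lookup results.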

src/acquisition/covid_hosp/common/database.py

+57 −9

@@ -1,6 +1,7 @@
 """Common database code used by multiple `covid_hosp` scrapers."""
 
 # standard library
+from collections import namedtuple
 from contextlib import contextmanager
 import math
 
@@ -11,13 +12,15 @@
 # first party
 import delphi.operations.secrets as secrets
 
+Columndef = namedtuple("Columndef", "csv_name sql_name dtype")
 
 class Database:
 
   def __init__(self,
                connection,
                table_name=None,
                columns_and_types=None,
+               key_columns=None,
                additional_fields=None):
     """Create a new Database object.
 
@@ -39,7 +42,11 @@ def __init__(self,
     self.table_name = table_name
     self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' else \
       'publication_date'
-    self.columns_and_types = columns_and_types
+    self.columns_and_types = {
+      c.csv_name: c
+      for c in (columns_and_types if columns_and_types is not None else [])
+    }
+    self.key_columns = key_columns if key_columns is not None else []
     self.additional_fields = additional_fields if additional_fields is not None else []
 
   @classmethod
@@ -151,26 +158,67 @@ def insert_dataset(self, publication_date, dataframe):
       The dataset.
     """
     dataframe_columns_and_types = [
-      x for x in self.columns_and_types if x[0] in dataframe.columns
+      x for x in self.columns_and_types.values() if x.csv_name in dataframe.columns
     ]
+
+    def nan_safe_dtype(dtype, value):
+      if isinstance(value, float) and math.isnan(value):
+        return None
+      return dtype(value)
+
+    # first convert keys and save the results; we'll need them later
+    for csv_name in self.key_columns:
+      dataframe.loc[:, csv_name] = dataframe[csv_name].map(self.columns_and_types[csv_name].dtype)
+
     num_columns = 2 + len(dataframe_columns_and_types) + len(self.additional_fields)
     value_placeholders = ', '.join(['%s'] * num_columns)
-    columns = ', '.join(f'`{i[1]}`' for i in dataframe_columns_and_types + self.additional_fields)
+    columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields)
     sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
       f'VALUES ({value_placeholders})'
     id_and_publication_date = (0, publication_date)
     with self.new_cursor() as cursor:
       for _, row in dataframe.iterrows():
         values = []
-        for name, _, dtype in dataframe_columns_and_types:
-          if isinstance(row[name], float) and math.isnan(row[name]):
-            values.append(None)
-          else:
-            values.append(dtype(row[name]))
+        for c in dataframe_columns_and_types:
+          values.append(nan_safe_dtype(c.dtype, row[c.csv_name]))
         cursor.execute(sql,
                        id_and_publication_date +
                        tuple(values) +
-                       tuple(i[0] for i in self.additional_fields))
+                       tuple(i.csv_name for i in self.additional_fields))
+
+    # deal with non/seldomly updated columns used like a fk table (if this database needs it)
+    if hasattr(self, 'AGGREGATE_KEY_COLS'):
+      ak_cols = self.AGGREGATE_KEY_COLS
+
+      # restrict data to just the key columns and remove duplicate rows;
+      # sort by key columns to ensure that the last ON DUPLICATE KEY overwrite
+      # uses the most-recent aggregate key information
+      ak_data = (dataframe[set(ak_cols + self.key_columns)]
+                 .sort_values(self.key_columns)[ak_cols]
+                 .drop_duplicates())
+      # cast types
+      for col in ak_cols:
+        ak_data[col] = ak_data[col].map(
+          lambda value: nan_safe_dtype(self.columns_and_types[col].dtype, value)
+        )
+      # fix NULLs
+      ak_data = ak_data.to_numpy(na_value=None).tolist()
+
+      # create string of tick-quoted and comma-separated column list
+      ak_cols_str = ','.join(f'`{col}`' for col in ak_cols)
+      # ...and ticked and comma-sep'd "column=column" list for ON UPDATE (to keep only the most recent values for each pk)
+      ak_updates_str = ','.join(f'`{col}`=v.{col}' for col in ak_cols)
+      # ...and string of VALUES placeholders
+      values_str = ','.join(['%s'] * len(ak_cols))
+      # use aggregate key table alias
+      ak_table = self.table_name + '_key'
+      # assemble full SQL statement
+      ak_insert_sql = f'INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) AS v ON DUPLICATE KEY UPDATE {ak_updates_str}'
+
+      # commit the data
+      with self.new_cursor() as cur:
+        cur.executemany(ak_insert_sql, ak_data)
+
 
   def get_max_issue(self):
     """Fetch the most recent issue.

src/acquisition/covid_hosp/common/utils.py

+8 −2

@@ -35,8 +35,9 @@ def int_from_date(date):
     int
       Date in YYYYMMDD format.
     """
-
-    return int(date[:10].replace('/', '').replace('-', ''))
+    if isinstance(date, str):
+      return int(date[:10].replace('/', '').replace('-', ''))
+    return date
 
   def parse_bool(value):
     """Convert a string to a boolean.
@@ -86,6 +87,10 @@ def issues_to_fetch(metadata, newer_than, older_than):
       for issues after newer_than and before older_than
     """
     daily_issues = {}
+    n_beyond = 0
     for index in sorted(set(metadata.index)):
       day = index.date()
       if day > newer_than and day < older_than:
@@ -95,6 +97,10 @@ def issues_to_fetch(metadata, newer_than, older_than):
           daily_issues[day] = urls_list
         else:
           daily_issues[day] += urls_list
+      elif day >= older_than:
+        n_beyond += 1
+    if n_beyond > 0:
+      print(f"{n_beyond} issues available on {older_than} or newer")
     return daily_issues
 
   @staticmethod
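The `n_beyond` counter is a small observability improvement: issues dated on or after `older_than` were previously skipped silently, whereas now a single summary line reports how many newer revisions exist beyond the requested window.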
