Skip to content

Commit 158d8aa

Browse files
committed
[covid_hosp:f] Fix geocode acquisition and running time.
* Switch to executemany * Add new limited_geocode datatype * Fix test prep missing from #1030
1 parent a55cc10 commit 158d8aa

File tree

10 files changed

+71
-25
lines changed

10 files changed

+71
-25
lines changed

integrations/acquisition/covid_hosp/facility/test_scenarios.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def setUp(self):
3838
with Database.connect() as db:
3939
with db.new_cursor() as cur:
4040
cur.execute('truncate table covid_hosp_facility')
41+
cur.execute('truncate table covid_hosp_facility_key')
4142
cur.execute('truncate table covid_hosp_meta')
4243

4344
@freeze_time("2021-03-16")

src/acquisition/covid_hosp/common/database.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,21 +186,37 @@ def nan_safe_dtype(dtype, value):
186186
f'VALUES ({value_placeholders})'
187187
id_and_publication_date = (0, publication_date)
188188
if logger:
189-
logger.info("updating values")
189+
logger.info('updating values', count=len(dataframe.index))
190+
n = 0
191+
many_values = []
190192
with self.new_cursor() as cursor:
191-
for _, row in dataframe.iterrows():
193+
for index, row in dataframe.iterrows():
192194
values = []
193195
for c in dataframe_columns_and_types:
194196
values.append(nan_safe_dtype(c.dtype, row[c.csv_name]))
195-
cursor.execute(sql,
196-
id_and_publication_date +
197-
tuple(values) +
198-
tuple(i.csv_name for i in self.additional_fields))
197+
many_values.append(id_and_publication_date +
198+
tuple(values) +
199+
tuple(i.csv_name for i in self.additional_fields))
200+
n += 1
201+
# insert in batches because one at a time is slow and all at once makes
202+
# the connection drop :(
203+
if n % 5_000 == 0:
204+
try:
205+
cursor.executemany(sql, many_values)
206+
many_values = []
207+
except Exception as e:
208+
if logger:
209+
logger.info('error on insert', index=index, values=values)
210+
logger.error(e)
211+
raise e
212+
# insert final batch
213+
if many_values:
214+
cursor.executemany(sql, many_values)
199215

200216
# deal with non/seldomly updated columns used like a fk table (if this database needs it)
201217
if hasattr(self, 'AGGREGATE_KEY_COLS'):
202218
if logger:
203-
logger.info("updating keys")
219+
logger.info('updating keys')
204220
ak_cols = self.AGGREGATE_KEY_COLS
205221

206222
# restrict data to just the key columns and remove duplicate rows
@@ -227,6 +243,8 @@ def nan_safe_dtype(dtype, value):
227243
ak_table = self.table_name + '_key'
228244
# assemble full SQL statement
229245
ak_insert_sql = f'INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) AS v ON DUPLICATE KEY UPDATE {ak_updates_str}'
246+
if logger:
247+
logger.info("database query", sql=ak_insert_sql)
230248

231249
# commit the data
232250
with self.new_cursor() as cur:

src/acquisition/covid_hosp/common/network.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ def fetch_metadata_for_dataset(dataset_id, logger=False):
1313
----------
1414
dataset_id : str
1515
healthdata.gov dataset identifier of the dataset.
16+
logger : structlog.Logger [optional; default False]
17+
Logger to receive messages.
1618
1719
Returns
1820
-------
@@ -39,6 +41,8 @@ def fetch_dataset(url, pandas_impl=pandas, logger=False):
3941
----------
4042
url : str
4143
URL to the dataset in CSV format.
44+
logger : structlog.Logger [optional; default False]
45+
Logger to receive messages.
4246
4347
Returns
4448
-------

src/acquisition/covid_hosp/common/utils.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ def limited_string(value):
7979
return value
8080
return limited_string
8181

82+
GEOCODE_LENGTH = 32
83+
GEOCODE_PATTERN = re.compile(r'POINT \(([0-9.-]*) ([0-9.-]*)\)')
84+
def limited_geocode(value):
85+
if len(value) < Utils.GEOCODE_LENGTH:
86+
return value
87+
# otherwise parse and reduce precision to 6 decimal places
88+
m = Utils.GEOCODE_PATTERN.match(value)
89+
if not m:
90+
raise CovidHospException(f"Couldn't parse geocode '{value}'")
91+
return f'POINT ({" ".join(map(lambda x: f"{float(x):.6f}", m.groups()))})'
92+
8293
def issues_to_fetch(metadata, newer_than, older_than, logger=False):
8394
"""
8495
Construct all issue dates and URLs to be ingested based on metadata.
@@ -100,6 +111,7 @@ def issues_to_fetch(metadata, newer_than, older_than, logger=False):
100111
"""
101112
daily_issues = {}
102113
n_beyond = 0
114+
n_selected = 0
103115
for index in sorted(set(metadata.index)):
104116
day = index.date()
105117
if day > newer_than and day < older_than:
@@ -109,11 +121,13 @@ def issues_to_fetch(metadata, newer_than, older_than, logger=False):
109121
daily_issues[day] = urls_list
110122
else:
111123
daily_issues[day] += urls_list
124+
n_selected += len(urls_list)
112125
elif day >= older_than:
113126
n_beyond += 1
114-
if n_beyond > 0:
115-
if logger:
116-
logger.info("issues available", on_or_newer=older_than, count=n_beyond)
127+
if logger:
128+
if n_beyond > 0:
129+
logger.info("issues available beyond selection", on_or_newer=older_than, count=n_beyond)
130+
logger.info("issues selected", newer_than=str(newer_than), older_than=str(older_than), count=n_selected)
117131
return daily_issues
118132

119133
@staticmethod
@@ -173,7 +187,7 @@ def update_dataset(database, network, newer_than=None, older_than=None):
173187
bool
174188
Whether a new dataset was acquired.
175189
"""
176-
logger = get_structured_logger(f"{database.__class__.__module__}.{database.__class__.__name__}.update_dataset")
190+
logger = get_structured_logger(f"{database.__module__}.{database.__name__}.update_dataset")
177191

178192
metadata = network.fetch_metadata(logger=logger)
179193
datasets = []
@@ -189,7 +203,7 @@ def update_dataset(database, network, newer_than=None, older_than=None):
189203
for issue, revisions in daily_issues.items():
190204
issue_int = int(issue.strftime("%Y%m%d"))
191205
# download the dataset and add it to the database
192-
dataset = Utils.merge_by_key_cols([network.fetch_dataset(url) for url, _ in revisions],
206+
dataset = Utils.merge_by_key_cols([network.fetch_dataset(url, logger=logger) for url, _ in revisions],
193207
db.KEY_COLS,
194208
logger=logger)
195209
# add metadata to the database

src/acquisition/covid_hosp/facility/database.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class Database(BaseDatabase):
4040
Columndef('ccn', 'ccn', str),
4141
Columndef('city', 'city', str),
4242
Columndef('fips_code', 'fips_code', str),
43-
Columndef('geocoded_hospital_address', 'geocoded_hospital_address', Utils.limited_string_fn(32)),
43+
Columndef('geocoded_hospital_address', 'geocoded_hospital_address', Utils.limited_geocode),
4444
Columndef('hhs_ids', 'hhs_ids', str),
4545
Columndef('hospital_name', 'hospital_name', str),
4646
Columndef('hospital_subtype', 'hospital_subtype', str),

tests/acquisition/covid_hosp/common/test_database.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,9 @@ def test_insert_dataset(self):
144144
result = database.insert_dataset(sentinel.publication_date, dataset)
145145

146146
self.assertIsNone(result)
147-
self.assertEqual(mock_cursor.execute.call_count, 6)
147+
self.assertEqual(mock_cursor.executemany.call_count, 1)
148148

149-
actual_sql = mock_cursor.execute.call_args[0][0]
149+
actual_sql = mock_cursor.executemany.call_args[0][0]
150150
self.assertIn(
151151
'INSERT INTO `test_table` (`id`, `publication_date`, `sql_str_col`, `sql_int_col`, `sql_float_col`)',
152152
actual_sql)
@@ -162,5 +162,9 @@ def test_insert_dataset(self):
162162

163163
for i, expected in enumerate(expected_values):
164164
with self.subTest(name=f'row {i + 1}'):
165-
actual = mock_cursor.execute.call_args_list[i][0][1]
165+
# [0]: the first call() object
166+
# [0]: get positional args out of the call() object
167+
# [-1]: the last arg of the executemany call
168+
# [i]: the ith row inserted in the executemany
169+
actual = mock_cursor.executemany.call_args_list[0][0][-1][i]
166170
self.assertEqual(actual, (0, sentinel.publication_date) + expected)

tests/acquisition/covid_hosp/common/test_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def test_run_skip_old_dataset(self):
9797
mock_network = MagicMock()
9898
mock_network.fetch_metadata.return_value = \
9999
self.test_utils.load_sample_metadata()
100-
mock_database = MagicMock()
100+
mock_database = MagicMock(**{"__module__":"test_module", "__name__":"test_name"})
101101
with mock_database.connect() as mock_connection:
102102
pass
103103
mock_connection.get_max_issue.return_value = pd.Timestamp("2200/1/1")
@@ -117,7 +117,7 @@ def test_run_acquire_new_dataset(self):
117117
self.test_utils.load_sample_metadata()
118118
fake_dataset = pd.DataFrame({"date": [pd.Timestamp("2020/1/1")], "state": ["ca"]})
119119
mock_network.fetch_dataset.return_value = fake_dataset
120-
mock_database = MagicMock()
120+
mock_database = MagicMock(**{"__module__":"test_module", "__name__":"test_name"})
121121
with mock_database.connect() as mock_connection:
122122
pass
123123
type(mock_connection).KEY_COLS = PropertyMock(return_value=["state", "date"])

tests/acquisition/covid_hosp/facility/test_database.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,14 @@ def test_insert_dataset(self):
3535
result = database.insert_dataset(sentinel.publication_date, dataset)
3636

3737
self.assertIsNone(result)
38-
self.assertEqual(mock_cursor.execute.call_count, 22)
39-
40-
last_query_values = mock_cursor.execute.call_args[0][-1]
38+
# once for the values, once for the keys
39+
self.assertEqual(mock_cursor.executemany.call_count, 2)
40+
41+
# [0]: the first call() object
42+
# [0]: get the positional args out of the call() object
43+
# [-1]: the last arg of the executemany call
44+
# [-1]: the last row inserted in the executemany
45+
last_query_values = mock_cursor.executemany.call_args_list[0][0][-1][-1]
4146
expected_query_values = (
4247
0, sentinel.publication_date, '450822', 20201130,
4348
'6800 N MACARTHUR BLVD', 61.1, 7, 428, 60.9, 7, 426, 61.1, 7, 428,

tests/acquisition/covid_hosp/state_daily/test_database.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ def test_insert_dataset(self):
3838
result = database.insert_dataset(sentinel.issue, dataset)
3939

4040
self.assertIsNone(result)
41-
self.assertEqual(mock_cursor.execute.call_count, 53)
41+
self.assertEqual(mock_cursor.executemany.call_count, 1)
4242

43-
last_query_values = mock_cursor.execute.call_args[0][-1]
43+
last_query_values = mock_cursor.executemany.call_args[0][-1][-1]
4444
expected_query_values = (
4545
0, sentinel.issue, 'WY', 20201209,
4646
0.2519685039370078, 29, 127, 32, 0.4233576642335766, 31, 137, 58, 22, 2,

tests/acquisition/covid_hosp/state_timeseries/test_database.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ def test_insert_dataset(self):
3636
result = database.insert_dataset(sentinel.issue, dataset)
3737

3838
self.assertIsNone(result)
39-
self.assertEqual(mock_cursor.execute.call_count, 22)
39+
self.assertEqual(mock_cursor.executemany.call_count, 1)
4040

41-
last_query_values = mock_cursor.execute.call_args[0][-1]
41+
last_query_values = mock_cursor.executemany.call_args[0][-1][-1]
4242
expected_query_values = (
4343
0, sentinel.issue, 'WY', 20200826, 0.0934579439252336, 26, 107, 10,
4444
0.4298245614035088, 28, 114, 49, 19, 7, 2, None, 4, 2, 0, 1, '2', 0, 26,

0 commit comments

Comments
 (0)