Commit fdf38cc

Merge pull request #1083 from cmu-delphi/krivard/covid_hosp-facility-running-time
covid_hosp improvements to address and investigate long update running times
2 parents 8a48c4f + 5d8f3fc commit fdf38cc

File tree: 18 files changed (+136, -44 lines)

deploy.json (+9)

@@ -47,6 +47,15 @@
     "add-header-comment": true
   },

+  "// acquisition - common",
+  {
+    "type": "move",
+    "src": "src/acquisition/common/",
+    "dst": "[[package]]/acquisition/common/",
+    "match": "^.*\\.(py)$",
+    "add-header-comment": true
+  },
+
   "// acquisition - fluview",
   {
     "type": "move",

integrations/acquisition/covid_hosp/facility/test_scenarios.py (+1)

@@ -38,6 +38,7 @@ def setUp(self):
     with Database.connect() as db:
       with db.new_cursor() as cur:
         cur.execute('truncate table covid_hosp_facility')
+        cur.execute('truncate table covid_hosp_facility_key')
         cur.execute('truncate table covid_hosp_meta')

   @freeze_time("2021-03-16")

src/acquisition/covid_hosp/common/database.py (+40, -8)

@@ -11,6 +11,7 @@

 # first party
 import delphi.operations.secrets as secrets
+from delphi.epidata.acquisition.common.logger import get_structured_logger

 Columndef = namedtuple("Columndef", "csv_name sql_name dtype")

@@ -53,6 +54,10 @@ def __init__(self,
     self.key_columns = key_columns if key_columns is not None else []
     self.additional_fields = additional_fields if additional_fields is not None else []

+  @classmethod
+  def logger(database_class):
+    return get_structured_logger(f"{database_class.__module__}")
+
   @classmethod
   @contextmanager
   def connect(database_class, mysql_connector_impl=mysql.connector):
@@ -124,7 +129,7 @@ def contains_revision(self, revision):
       for (result,) in cursor:
         return bool(result)

-  def insert_metadata(self, publication_date, revision, meta_json):
+  def insert_metadata(self, publication_date, revision, meta_json, logger=False):
     """Add revision metadata to the database.

     Parameters
@@ -135,6 +140,8 @@ def insert_metadata(self, publication_date, revision, meta_json):
       Unique revision string.
     meta_json : str
       Metadata serialized as a JSON string.
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages
     """

     with self.new_cursor() as cursor:
@@ -152,7 +159,7 @@ def insert_metadata(self, publication_date, revision, meta_json):
           (%s, %s, %s, %s, %s, NOW())
         ''', (self.table_name, self.hhs_dataset_id, publication_date, revision, meta_json))

-  def insert_dataset(self, publication_date, dataframe):
+  def insert_dataset(self, publication_date, dataframe, logger=False):
     """Add a dataset to the database.

     Parameters
@@ -161,6 +168,8 @@ def insert_dataset(self, publication_date, dataframe):
       Date when the dataset was published in YYYYMMDD format.
     dataframe : pandas.DataFrame
       The dataset.
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages.
     """
     dataframe_columns_and_types = [
       x for x in self.columns_and_types.values() if x.csv_name in dataframe.columns
@@ -181,18 +190,37 @@ def nan_safe_dtype(dtype, value):
     sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
       f'VALUES ({value_placeholders})'
     id_and_publication_date = (0, publication_date)
+    if logger:
+      logger.info('updating values', count=len(dataframe.index))
+    n = 0
+    many_values = []
     with self.new_cursor() as cursor:
-      for _, row in dataframe.iterrows():
+      for index, row in dataframe.iterrows():
         values = []
         for c in dataframe_columns_and_types:
           values.append(nan_safe_dtype(c.dtype, row[c.csv_name]))
-        cursor.execute(sql,
-                       id_and_publication_date +
-                       tuple(values) +
-                       tuple(i.csv_name for i in self.additional_fields))
+        many_values.append(id_and_publication_date +
+                           tuple(values) +
+                           tuple(i.csv_name for i in self.additional_fields))
+        n += 1
+        # insert in batches because one at a time is slow and all at once makes
+        # the connection drop :(
+        if n % 5_000 == 0:
+          try:
+            cursor.executemany(sql, many_values)
+            many_values = []
+          except Exception as e:
+            if logger:
+              logger.error('error on insert', publ_date=publication_date, in_lines=(n-5_000, n), index=index, values=values, exception=e)
+            raise e
+      # insert final batch
+      if many_values:
+        cursor.executemany(sql, many_values)

     # deal with non/seldomly updated columns used like a fk table (if this database needs it)
     if hasattr(self, 'AGGREGATE_KEY_COLS'):
+      if logger:
+        logger.info('updating keys')
       ak_cols = self.AGGREGATE_KEY_COLS

       # restrict data to just the key columns and remove duplicate rows
@@ -219,13 +247,15 @@ def nan_safe_dtype(dtype, value):
       ak_table = self.table_name + '_key'
       # assemble full SQL statement
       ak_insert_sql = f'INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) AS v ON DUPLICATE KEY UPDATE {ak_updates_str}'
+      if logger:
+        logger.info("database query", sql=ak_insert_sql)

       # commit the data
       with self.new_cursor() as cur:
         cur.executemany(ak_insert_sql, ak_data)


-  def get_max_issue(self):
+  def get_max_issue(self, logger=False):
     """Fetch the most recent issue.

     This is used to bookend what updates we pull in from the HHS metadata.
@@ -242,4 +272,6 @@ def get_max_issue(self):
       for (result,) in cursor:
         if result is not None:
           return pd.Timestamp(str(result))
+    if logger:
+      logger.warn("get_max_issue", msg="no matching results in meta table; returning 1900/1/1 epoch")
     return pd.Timestamp("1900/1/1")

src/acquisition/covid_hosp/common/network.py (+10, -5)

@@ -6,28 +6,31 @@ class Network:
   METADATA_URL_TEMPLATE = \
     'https://healthdata.gov/api/views/%s/rows.csv'

-  def fetch_metadata_for_dataset(dataset_id):
+  def fetch_metadata_for_dataset(dataset_id, logger=False):
     """Download and return metadata.

     Parameters
     ----------
     dataset_id : str
       healthdata.gov dataset identifier of the dataset.
+    logger : structlog.Logger [optional; default False]
+      Logger to receive messages.

     Returns
     -------
     object
       The metadata object.
     """
     url = Network.METADATA_URL_TEMPLATE % dataset_id
-    print(f'fetching metadata at {url}')
+    if logger:
+      logger.info('fetching metadata', url=url)
     df = Network.fetch_dataset(url)
     df["Update Date"] = pandas.to_datetime(df["Update Date"])
     df.sort_values("Update Date", inplace=True)
     df.set_index("Update Date", inplace=True)
     return df

-  def fetch_dataset(url, pandas_impl=pandas):
+  def fetch_dataset(url, pandas_impl=pandas, logger=False):
     """Download and return a dataset.

     Type inference is disabled in favor of explicit type casting at the
@@ -38,12 +41,14 @@ def fetch_dataset(url, pandas_impl=pandas):
     ----------
     url : str
       URL to the dataset in CSV format.
+    logger : structlog.Logger [optional; default False]
+      Logger to receive messages.

     Returns
     -------
     pandas.DataFrame
       The dataset.
     """
-
-    print(f'fetching dataset at {url}')
+    if logger:
+      logger.info('fetching dataset', url=url)
     return pandas_impl.read_csv(url, dtype=str)
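The logger parameter defaults to False, so existing callers stay silent while the acquisition job opts in by passing a structured logger. A rough usage sketch (the dataset id is a placeholder; the logger name argument follows the Database.logger() classmethod above):

from delphi.epidata.acquisition.common.logger import get_structured_logger
from delphi.epidata.acquisition.covid_hosp.common.network import Network

logger = get_structured_logger(__name__)

# with logger=..., the methods emit 'fetching metadata' / 'fetching dataset' events;
# with the default logger=False they log nothing, as before
metadata = Network.fetch_metadata_for_dataset('xxxx-xxxx', logger=logger)  # placeholder dataset id
df = Network.fetch_dataset('https://healthdata.gov/api/views/xxxx-xxxx/rows.csv', logger=logger)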

src/acquisition/covid_hosp/common/utils.py (+49, -13)

@@ -6,6 +6,7 @@

 import pandas as pd

+
 class CovidHospException(Exception):
   """Exception raised exclusively by `covid_hosp` utilities."""

@@ -69,7 +70,26 @@ def parse_bool(value):
       return False
     raise CovidHospException(f'cannot convert "{value}" to bool')

-  def issues_to_fetch(metadata, newer_than, older_than):
+  def limited_string_fn(length):
+    def limited_string(value):
+      value = str(value)
+      if len(value) > length:
+        raise CovidHospException(f"Value '{value}':{len(value)} longer than max {length}")
+      return value
+    return limited_string
+
+  GEOCODE_LENGTH = 32
+  GEOCODE_PATTERN = re.compile(r'POINT \((-?[0-9.]+) (-?[0-9.]+)\)')
+  def limited_geocode(value):
+    if len(value) < Utils.GEOCODE_LENGTH:
+      return value
+    # otherwise parse and set precision to 6 decimal places
+    m = Utils.GEOCODE_PATTERN.match(value)
+    if not m:
+      raise CovidHospException(f"Couldn't parse geocode '{value}'")
+    return f'POINT ({" ".join(f"{float(x):.6f}" for x in m.groups())})'
+
+  def issues_to_fetch(metadata, newer_than, older_than, logger=False):
     """
     Construct all issue dates and URLs to be ingested based on metadata.

@@ -81,13 +101,16 @@ def issues_to_fetch(metadata, newer_than, older_than):
       Lower bound (exclusive) of days to get issues for.
     older_than Date
       Upper bound (exclusive) of days to get issues for
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages
     Returns
     -------
       Dictionary of {issue day: list of (download urls, index)}
       for issues after newer_than and before older_than
     """
     daily_issues = {}
     n_beyond = 0
+    n_selected = 0
     for index in sorted(set(metadata.index)):
       day = index.date()
       if day > newer_than and day < older_than:
@@ -97,14 +120,17 @@ def issues_to_fetch(metadata, newer_than, older_than):
           daily_issues[day] = urls_list
         else:
           daily_issues[day] += urls_list
+        n_selected += len(urls_list)
       elif day >= older_than:
         n_beyond += 1
-    if n_beyond > 0:
-      print(f"{n_beyond} issues available on {older_than} or newer")
+    if logger:
+      if n_beyond > 0:
+        logger.info("issues available beyond selection", on_or_newer=older_than, count=n_beyond)
+      logger.info("issues selected", newer_than=str(newer_than), older_than=str(older_than), count=n_selected)
     return daily_issues

   @staticmethod
-  def merge_by_key_cols(dfs, key_cols):
+  def merge_by_key_cols(dfs, key_cols, logger=False):
     """Merge a list of data frames as a series of updates.

     Parameters:
@@ -113,13 +139,20 @@ def merge_by_key_cols(dfs, key_cols):
       Data frames to merge, ordered from earliest to latest.
     key_cols: list(str)
       Columns to use as the index.
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages

     Returns a single data frame containing the most recent data for each state+date.
     """

     dfs = [df.set_index(key_cols) for df in dfs
            if not all(k in df.index.names for k in key_cols)]
     result = dfs[0]
+    if logger and len(dfs) > 7:
+      logger.warning(
+        "expensive operation",
+        msg="concatenating more than 7 files may result in long running times",
+        count=len(dfs))
     for df in dfs[1:]:
       # update values for existing keys
       result.update(df)
@@ -153,22 +186,25 @@ def update_dataset(database, network, newer_than=None, older_than=None):
     bool
       Whether a new dataset was acquired.
     """
-    metadata = network.fetch_metadata()
+    logger = database.logger()
+
+    metadata = network.fetch_metadata(logger=logger)
     datasets = []
     with database.connect() as db:
-      max_issue = db.get_max_issue()
+      max_issue = db.get_max_issue(logger=logger)

     older_than = datetime.datetime.today().date() if newer_than is None else older_than
     newer_than = max_issue if newer_than is None else newer_than
-    daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than)
+    daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than, logger=logger)
     if not daily_issues:
-      print("no new issues, nothing to do")
+      logger.info("no new issues; nothing to do")
       return False
     for issue, revisions in daily_issues.items():
       issue_int = int(issue.strftime("%Y%m%d"))
       # download the dataset and add it to the database
-      dataset = Utils.merge_by_key_cols([network.fetch_dataset(url) for url, _ in revisions],
-                                        db.KEY_COLS)
+      dataset = Utils.merge_by_key_cols([network.fetch_dataset(url, logger=logger) for url, _ in revisions],
+                                        db.KEY_COLS,
+                                        logger=logger)
       # add metadata to the database
       all_metadata = []
       for url, index in revisions:
@@ -180,10 +216,10 @@ def update_dataset(database, network, newer_than=None, older_than=None):
       ))
     with database.connect() as db:
       for issue_int, dataset, all_metadata in datasets:
-        db.insert_dataset(issue_int, dataset)
+        db.insert_dataset(issue_int, dataset, logger=logger)
         for url, metadata_json in all_metadata:
-          db.insert_metadata(issue_int, url, metadata_json)
-        print(f'successfully acquired {len(dataset)} rows')
+          db.insert_metadata(issue_int, url, metadata_json, logger=logger)
+        logger.info("acquired rows", count=len(dataset))

     # note that the transaction is committed by exiting the `with` block
     return True
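For context, the new limited_geocode converter keeps geocodes shorter than 32 characters untouched and re-serializes longer WKT points at 6 decimal places; a long value that is not a POINT raises CovidHospException. A quick illustration (the coordinates are made up):

from delphi.epidata.acquisition.covid_hosp.common.utils import Utils

# under 32 characters: returned unchanged
Utils.limited_geocode('POINT (-91.1 30.5)')
# -> 'POINT (-91.1 30.5)'

# 32 characters or more: parsed and rounded to 6 decimal places
Utils.limited_geocode('POINT (-91.18751280011 30.52007921350011)')
# -> 'POINT (-91.187513 30.520079)'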

src/acquisition/covid_hosp/facility/database.py (+1, -1)

@@ -40,7 +40,7 @@ class Database(BaseDatabase):
     Columndef('ccn', 'ccn', str),
     Columndef('city', 'city', str),
     Columndef('fips_code', 'fips_code', str),
-    Columndef('geocoded_hospital_address', 'geocoded_hospital_address', str),
+    Columndef('geocoded_hospital_address', 'geocoded_hospital_address', Utils.limited_geocode),
     Columndef('hhs_ids', 'hhs_ids', str),
     Columndef('hospital_name', 'hospital_name', str),
     Columndef('hospital_subtype', 'hospital_subtype', str),

src/acquisition/covidcast/covidcast_meta_cache_updater.py (+1, -1)

@@ -7,7 +7,7 @@

 # first party
 from delphi.epidata.acquisition.covidcast.database import Database
-from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
+from delphi.epidata.acquisition.common.logger import get_structured_logger
 from delphi.epidata.client.delphi_epidata import Epidata

 def get_argument_parser():

src/acquisition/covidcast/csv_importer.py (+1, -1)

@@ -16,7 +16,7 @@
 from delphi_utils import Nans
 from delphi.utils.epiweek import delta_epiweeks
 from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
-from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
+from delphi.epidata.acquisition.common.logger import get_structured_logger

 DataFrameRow = NamedTuple('DFRow', [
   ('geo_id', str),

src/acquisition/covidcast/csv_to_database.py (+1, -1)

@@ -11,7 +11,7 @@
 from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails
 from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException
 from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver
-from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
+from delphi.epidata.acquisition.common.logger import get_structured_logger


 def get_argument_parser():

src/acquisition/covidcast/database.py (+1, -1)

@@ -14,7 +14,7 @@

 # first party
 import delphi.operations.secrets as secrets
-from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
+from delphi.epidata.acquisition.common.logger import get_structured_logger
 from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow

src/acquisition/covidcast/delete_batch.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
# first party
1010
from delphi.epidata.acquisition.covidcast.database import Database
11-
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
11+
from delphi.epidata.acquisition.common.logger import get_structured_logger
1212

1313

1414
def get_argument_parser():
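The five covidcast files above only change where get_structured_logger is imported from: the covidcast-specific logger module appears to have been promoted to the shared acquisition/common package (matching the deploy.json rule at the top). Call sites are unchanged; a minimal sketch of the structlog-style usage seen throughout the covid_hosp diffs (event names and field values here are illustrative):

from delphi.epidata.acquisition.common.logger import get_structured_logger

logger = get_structured_logger(__name__)

# a short event name plus keyword context fields, as in the covid_hosp changes above
logger.info("fetching dataset", url="https://example.org/data.csv")
logger.warning("expensive operation", msg="concatenating more than 7 files may result in long running times", count=9)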
