covid_hosp improvements to address and investigate long update running times #1083

Merged
merged 7 commits on Mar 3, 2023
9 changes: 9 additions & 0 deletions deploy.json
@@ -47,6 +47,15 @@
"add-header-comment": true
},

"// acquisition - common",
{
"type": "move",
"src": "src/acquisition/common/",
"dst": "[[package]]/acquisition/common/",
"match": "^.*\\.(py)$",
"add-header-comment": true
},

"// acquisition - fluview",
{
"type": "move",
@@ -38,6 +38,7 @@ def setUp(self):
with Database.connect() as db:
with db.new_cursor() as cur:
cur.execute('truncate table covid_hosp_facility')
cur.execute('truncate table covid_hosp_facility_key')
cur.execute('truncate table covid_hosp_meta')

@freeze_time("2021-03-16")
File renamed without changes.
48 changes: 40 additions & 8 deletions src/acquisition/covid_hosp/common/database.py
@@ -11,6 +11,7 @@

# first party
import delphi.operations.secrets as secrets
from delphi.epidata.acquisition.common.logger import get_structured_logger

Columndef = namedtuple("Columndef", "csv_name sql_name dtype")

@@ -53,6 +54,10 @@ def __init__(self,
self.key_columns = key_columns if key_columns is not None else []
self.additional_fields = additional_fields if additional_fields is not None else []

@classmethod
def logger(database_class):
return get_structured_logger(f"{database_class.__module__}")

@classmethod
@contextmanager
def connect(database_class, mysql_connector_impl=mysql.connector):
@@ -124,7 +129,7 @@ def contains_revision(self, revision):
for (result,) in cursor:
return bool(result)

def insert_metadata(self, publication_date, revision, meta_json):
def insert_metadata(self, publication_date, revision, meta_json, logger=False):
"""Add revision metadata to the database.

Parameters
@@ -135,6 +140,8 @@ def insert_metadata(self, publication_date, revision, meta_json):
Unique revision string.
meta_json : str
Metadata serialized as a JSON string.
logger : structlog.Logger [optional; default False]
Logger to receive messages.
"""

with self.new_cursor() as cursor:
@@ -152,7 +159,7 @@
(%s, %s, %s, %s, %s, NOW())
''', (self.table_name, self.hhs_dataset_id, publication_date, revision, meta_json))

def insert_dataset(self, publication_date, dataframe):
def insert_dataset(self, publication_date, dataframe, logger=False):
"""Add a dataset to the database.

Parameters
Expand All @@ -161,6 +168,8 @@ def insert_dataset(self, publication_date, dataframe):
Date when the dataset was published in YYYYMMDD format.
dataframe : pandas.DataFrame
The dataset.
logger : structlog.Logger [optional; default False]
Logger to receive messages.
"""
dataframe_columns_and_types = [
x for x in self.columns_and_types.values() if x.csv_name in dataframe.columns
@@ -181,18 +190,37 @@ def nan_safe_dtype(dtype, value):
sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
f'VALUES ({value_placeholders})'
id_and_publication_date = (0, publication_date)
if logger:
logger.info('updating values', count=len(dataframe.index))
n = 0
many_values = []
with self.new_cursor() as cursor:
for _, row in dataframe.iterrows():
for index, row in dataframe.iterrows():
values = []
for c in dataframe_columns_and_types:
values.append(nan_safe_dtype(c.dtype, row[c.csv_name]))
cursor.execute(sql,
id_and_publication_date +
tuple(values) +
tuple(i.csv_name for i in self.additional_fields))
many_values.append(id_and_publication_date +
tuple(values) +
tuple(i.csv_name for i in self.additional_fields))
n += 1
# insert in batches because one at a time is slow and all at once makes
# the connection drop :(
if n % 5_000 == 0:
try:
cursor.executemany(sql, many_values)
many_values = []
except Exception as e:
if logger:
logger.error('error on insert', publ_date=publication_date, in_lines=(n-5_000, n), index=index, values=values, exception=e)
raise e
# insert final batch
if many_values:
cursor.executemany(sql, many_values)

# deal with columns that are never or seldom updated and are used like a foreign-key table (if this database needs it)
if hasattr(self, 'AGGREGATE_KEY_COLS'):
if logger:
logger.info('updating keys')
ak_cols = self.AGGREGATE_KEY_COLS

# restrict data to just the key columns and remove duplicate rows
@@ -219,13 +247,15 @@ def nan_safe_dtype(dtype, value):
ak_table = self.table_name + '_key'
# assemble full SQL statement
ak_insert_sql = f'INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) AS v ON DUPLICATE KEY UPDATE {ak_updates_str}'
if logger:
logger.info("database query", sql=ak_insert_sql)

# commit the data
with self.new_cursor() as cur:
cur.executemany(ak_insert_sql, ak_data)


def get_max_issue(self):
def get_max_issue(self, logger=False):
"""Fetch the most recent issue.

This is used to bookend what updates we pull in from the HHS metadata.
@@ -242,4 +272,6 @@ def get_max_issue(self):
for (result,) in cursor:
if result is not None:
return pd.Timestamp(str(result))
if logger:
logger.warn("get_max_issue", msg="no matching results in meta table; returning 1900/1/1 epoch")
return pd.Timestamp("1900/1/1")
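
The change above is the main performance fix: instead of one cursor.execute() per row, insert_dataset() now buffers rows and flushes them with cursor.executemany() every 5,000 rows, with a final flush for the remainder. Below is a minimal standalone sketch of that batching pattern; it uses sqlite3 only so it runs without a MySQL server, and the table, columns, and row count are illustrative rather than taken from this PR.

# Sketch of the batched-insert pattern adopted in insert_dataset(): buffer
# parameter tuples and flush them with executemany() every batch_size rows,
# then flush whatever remains after the loop. sqlite3 stands in for
# mysql.connector purely so this runs without a database server.
import sqlite3

def batched_insert(cursor, sql, rows, batch_size=5_000):
    batch = []
    for n, row in enumerate(rows, start=1):
        batch.append(row)
        if n % batch_size == 0:
            cursor.executemany(sql, batch)  # one round trip per batch
            batch = []
    if batch:                               # final partial batch
        cursor.executemany(sql, batch)

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE demo (id INTEGER, value TEXT)")
batched_insert(cur, "INSERT INTO demo (id, value) VALUES (?, ?)",
               ((i, f"row-{i}") for i in range(12_345)))
conn.commit()
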
15 changes: 10 additions & 5 deletions src/acquisition/covid_hosp/common/network.py
@@ -6,28 +6,31 @@ class Network:
METADATA_URL_TEMPLATE = \
'https://healthdata.gov/api/views/%s/rows.csv'

def fetch_metadata_for_dataset(dataset_id):
def fetch_metadata_for_dataset(dataset_id, logger=False):
"""Download and return metadata.

Parameters
----------
dataset_id : str
healthdata.gov dataset identifier of the dataset.
logger : structlog.Logger [optional; default False]
Logger to receive messages.

Returns
-------
object
The metadata object.
"""
url = Network.METADATA_URL_TEMPLATE % dataset_id
print(f'fetching metadata at {url}')
if logger:
logger.info('fetching metadata', url=url)
df = Network.fetch_dataset(url)
df["Update Date"] = pandas.to_datetime(df["Update Date"])
df.sort_values("Update Date", inplace=True)
df.set_index("Update Date", inplace=True)
return df

def fetch_dataset(url, pandas_impl=pandas):
def fetch_dataset(url, pandas_impl=pandas, logger=False):
"""Download and return a dataset.

Type inference is disabled in favor of explicit type casting at the
@@ -38,12 +41,14 @@ def fetch_dataset(url, pandas_impl=pandas):
----------
url : str
URL to the dataset in CSV format.
logger : structlog.Logger [optional; default False]
Logger to receive messages.

Returns
-------
pandas.DataFrame
The dataset.
"""

print(f'fetching dataset at {url}')
if logger:
logger.info('fetching dataset', url=url)
return pandas_impl.read_csv(url, dtype=str)
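
A convention that recurs throughout this PR is the optional logger argument: functions accept logger=False and guard every log call with "if logger:", so call sites that do not pass a structured logger behave exactly as before. A small illustrative sketch of that convention follows; the function name and URL are made up and not part of the epidata codebase.

# Sketch of the logger=False convention: log only when a structlog logger is
# supplied, otherwise stay silent. The names below are illustrative.
import structlog

def fetch_something(url, logger=False):
    if logger:
        logger.info("fetching dataset", url=url)
    return url  # stand-in for the real work

fetch_something("https://example.com/data.csv")                                 # silent
fetch_something("https://example.com/data.csv", logger=structlog.get_logger())  # emits a log line
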
62 changes: 49 additions & 13 deletions src/acquisition/covid_hosp/common/utils.py
@@ -6,6 +6,7 @@

import pandas as pd


class CovidHospException(Exception):
"""Exception raised exclusively by `covid_hosp` utilities."""

@@ -69,7 +70,26 @@ def parse_bool(value):
return False
raise CovidHospException(f'cannot convert "{value}" to bool')

def issues_to_fetch(metadata, newer_than, older_than):
def limited_string_fn(length):
def limited_string(value):
value = str(value)
if len(value) > length:
raise CovidHospException(f"Value '{value}':{len(value)} longer than max {length}")
return value
return limited_string

GEOCODE_LENGTH = 32
GEOCODE_PATTERN = re.compile(r'POINT \((-?[0-9.]+) (-?[0-9.]+)\)')
def limited_geocode(value):
if len(value) < Utils.GEOCODE_LENGTH:
return value
# otherwise parse and set precision to 6 decimal places
m = Utils.GEOCODE_PATTERN.match(value)
if not m:
raise CovidHospException(f"Couldn't parse geocode '{value}'")
return f'POINT ({" ".join(f"{float(x):.6f}" for x in m.groups())})'

def issues_to_fetch(metadata, newer_than, older_than, logger=False):
"""
Construct all issue dates and URLs to be ingested based on metadata.

@@ -81,13 +101,16 @@ def issues_to_fetch(metadata, newer_than, older_than):
Lower bound (exclusive) of days to get issues for.
older_than Date
Upper bound (exclusive) of days to get issues for
logger : structlog.Logger [optional; default False]
Logger to receive messages.
Returns
-------
Dictionary of {issue day: list of (download urls, index)}
for issues after newer_than and before older_than
"""
daily_issues = {}
n_beyond = 0
n_selected = 0
for index in sorted(set(metadata.index)):
day = index.date()
if day > newer_than and day < older_than:
@@ -97,14 +120,17 @@
daily_issues[day] = urls_list
else:
daily_issues[day] += urls_list
n_selected += len(urls_list)
elif day >= older_than:
n_beyond += 1
if n_beyond > 0:
print(f"{n_beyond} issues available on {older_than} or newer")
if logger:
if n_beyond > 0:
logger.info("issues available beyond selection", on_or_newer=older_than, count=n_beyond)
logger.info("issues selected", newer_than=str(newer_than), older_than=str(older_than), count=n_selected)
return daily_issues

@staticmethod
def merge_by_key_cols(dfs, key_cols):
def merge_by_key_cols(dfs, key_cols, logger=False):
"""Merge a list of data frames as a series of updates.

Parameters:
Expand All @@ -113,13 +139,20 @@ def merge_by_key_cols(dfs, key_cols):
Data frames to merge, ordered from earliest to latest.
key_cols: list(str)
Columns to use as the index.
logger : structlog.Logger [optional; default False]
Logger to receive messages.

Returns a single data frame containing the most recent data for each state+date.
"""

dfs = [df.set_index(key_cols) for df in dfs
if not all(k in df.index.names for k in key_cols)]
result = dfs[0]
if logger and len(dfs) > 7:
logger.warning(
"expensive operation",
msg="concatenating more than 7 files may result in long running times",
count=len(dfs))
for df in dfs[1:]:
# update values for existing keys
result.update(df)
@@ -153,22 +186,25 @@ def update_dataset(database, network, newer_than=None, older_than=None):
bool
Whether a new dataset was acquired.
"""
metadata = network.fetch_metadata()
logger = database.logger()

metadata = network.fetch_metadata(logger=logger)
datasets = []
with database.connect() as db:
max_issue = db.get_max_issue()
max_issue = db.get_max_issue(logger=logger)

older_than = datetime.datetime.today().date() if newer_than is None else older_than
newer_than = max_issue if newer_than is None else newer_than
daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than)
daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than, logger=logger)
if not daily_issues:
print("no new issues, nothing to do")
logger.info("no new issues; nothing to do")
return False
for issue, revisions in daily_issues.items():
issue_int = int(issue.strftime("%Y%m%d"))
# download the dataset and add it to the database
dataset = Utils.merge_by_key_cols([network.fetch_dataset(url) for url, _ in revisions],
db.KEY_COLS)
dataset = Utils.merge_by_key_cols([network.fetch_dataset(url, logger=logger) for url, _ in revisions],
db.KEY_COLS,
logger=logger)
# add metadata to the database
all_metadata = []
for url, index in revisions:
@@ -180,10 +216,10 @@ def update_dataset(database, network, newer_than=None, older_than=None):
))
with database.connect() as db:
for issue_int, dataset, all_metadata in datasets:
db.insert_dataset(issue_int, dataset)
db.insert_dataset(issue_int, dataset, logger=logger)
for url, metadata_json in all_metadata:
db.insert_metadata(issue_int, url, metadata_json)
print(f'successfully acquired {len(dataset)} rows')
db.insert_metadata(issue_int, url, metadata_json, logger=logger)
logger.info("acquired rows", count=len(dataset))

# note that the transaction is committed by exiting the `with` block
return True
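
The new Utils.limited_geocode helper passes short geocode strings through unchanged, re-serializes long "POINT (lon lat)" values at six decimal places to stay within GEOCODE_LENGTH (32 characters), and raises CovidHospException when a long value cannot be parsed; the facility database wires it in as the dtype for geocoded_hospital_address in the next hunk. A quick illustrative check with made-up coordinates, assuming the usual delphi.epidata package path:

# Illustrative behavior of Utils.limited_geocode; the coordinates are made up.
from delphi.epidata.acquisition.covid_hosp.common.utils import CovidHospException, Utils

# short values are returned untouched
assert Utils.limited_geocode("POINT (-122.41 37.77)") == "POINT (-122.41 37.77)"
# long values are reparsed and rounded to six decimal places
assert Utils.limited_geocode(
    "POINT (-122.123456789012345 37.123456789012345)"
) == "POINT (-122.123457 37.123457)"
# long values that do not match the POINT pattern raise CovidHospException
try:
    Utils.limited_geocode("not a geocode but long enough to hit the parser....")
except CovidHospException as exc:
    print(exc)
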
2 changes: 1 addition & 1 deletion src/acquisition/covid_hosp/facility/database.py
@@ -40,7 +40,7 @@ class Database(BaseDatabase):
Columndef('ccn', 'ccn', str),
Columndef('city', 'city', str),
Columndef('fips_code', 'fips_code', str),
Columndef('geocoded_hospital_address', 'geocoded_hospital_address', str),
Columndef('geocoded_hospital_address', 'geocoded_hospital_address', Utils.limited_geocode),
Columndef('hhs_ids', 'hhs_ids', str),
Columndef('hospital_name', 'hospital_name', str),
Columndef('hospital_subtype', 'hospital_subtype', str),
2 changes: 1 addition & 1 deletion src/acquisition/covidcast/covidcast_meta_cache_updater.py
@@ -7,7 +7,7 @@

# first party
from delphi.epidata.acquisition.covidcast.database import Database
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
from delphi.epidata.acquisition.common.logger import get_structured_logger
from delphi.epidata.client.delphi_epidata import Epidata

def get_argument_parser():
2 changes: 1 addition & 1 deletion src/acquisition/covidcast/csv_importer.py
@@ -16,7 +16,7 @@
from delphi_utils import Nans
from delphi.utils.epiweek import delta_epiweeks
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
from delphi.epidata.acquisition.common.logger import get_structured_logger

DataFrameRow = NamedTuple('DFRow', [
('geo_id', str),
2 changes: 1 addition & 1 deletion src/acquisition/covidcast/csv_to_database.py
@@ -11,7 +11,7 @@
from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails
from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException
from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
from delphi.epidata.acquisition.common.logger import get_structured_logger


def get_argument_parser():
2 changes: 1 addition & 1 deletion src/acquisition/covidcast/database.py
@@ -14,7 +14,7 @@

# first party
import delphi.operations.secrets as secrets
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
from delphi.epidata.acquisition.common.logger import get_structured_logger
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow


2 changes: 1 addition & 1 deletion src/acquisition/covidcast/delete_batch.py
@@ -8,7 +8,7 @@

# first party
from delphi.epidata.acquisition.covidcast.database import Database
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
from delphi.epidata.acquisition.common.logger import get_structured_logger


def get_argument_parser():