diff --git a/deploy.json b/deploy.json
index 0e00d1201..425ddef6d 100644
--- a/deploy.json
+++ b/deploy.json
@@ -47,6 +47,15 @@
     "add-header-comment": true
   },
 
+  "// acquisition - common",
+  {
+    "type": "move",
+    "src": "src/acquisition/common/",
+    "dst": "[[package]]/acquisition/common/",
+    "match": "^.*\\.(py)$",
+    "add-header-comment": true
+  },
+
   "// acquisition - fluview",
   {
     "type": "move",
diff --git a/integrations/acquisition/covid_hosp/facility/test_scenarios.py b/integrations/acquisition/covid_hosp/facility/test_scenarios.py
index 4c47d689e..aaa3c5e3b 100644
--- a/integrations/acquisition/covid_hosp/facility/test_scenarios.py
+++ b/integrations/acquisition/covid_hosp/facility/test_scenarios.py
@@ -38,6 +38,7 @@ def setUp(self):
     with Database.connect() as db:
       with db.new_cursor() as cur:
         cur.execute('truncate table covid_hosp_facility')
+        cur.execute('truncate table covid_hosp_facility_key')
         cur.execute('truncate table covid_hosp_meta')
 
   @freeze_time("2021-03-16")
diff --git a/src/acquisition/covidcast/logger.py b/src/acquisition/common/logger.py
similarity index 100%
rename from src/acquisition/covidcast/logger.py
rename to src/acquisition/common/logger.py
diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py
index 8875828fa..ed308e7a0 100644
--- a/src/acquisition/covid_hosp/common/database.py
+++ b/src/acquisition/covid_hosp/common/database.py
@@ -11,6 +11,7 @@
 
 # first party
 import delphi.operations.secrets as secrets
+from delphi.epidata.acquisition.common.logger import get_structured_logger
 
 Columndef = namedtuple("Columndef", "csv_name sql_name dtype")
 
@@ -53,6 +54,10 @@ def __init__(self,
     self.key_columns = key_columns if key_columns is not None else []
     self.additional_fields = additional_fields if additional_fields is not None else []
 
+  @classmethod
+  def logger(database_class):
+    return get_structured_logger(f"{database_class.__module__}")
+
   @classmethod
   @contextmanager
   def connect(database_class, mysql_connector_impl=mysql.connector):
@@ -124,7 +129,7 @@ def contains_revision(self, revision):
       for (result,) in cursor:
         return bool(result)
 
-  def insert_metadata(self, publication_date, revision, meta_json):
+  def insert_metadata(self, publication_date, revision, meta_json, logger=False):
     """Add revision metadata to the database.
 
     Parameters
@@ -135,6 +140,8 @@
       Unique revision string.
     meta_json : str
       Metadata serialized as a JSON string.
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages
     """
 
     with self.new_cursor() as cursor:
@@ -152,7 +159,7 @@
         (%s, %s, %s, %s, %s, NOW())
       ''', (self.table_name, self.hhs_dataset_id, publication_date, revision, meta_json))
 
-  def insert_dataset(self, publication_date, dataframe):
+  def insert_dataset(self, publication_date, dataframe, logger=False):
     """Add a dataset to the database.
 
     Parameters
@@ -161,6 +168,8 @@
       Date when the dataset was published in YYYYMMDD format.
     dataframe : pandas.DataFrame
       The dataset.
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages.
""" dataframe_columns_and_types = [ x for x in self.columns_and_types.values() if x.csv_name in dataframe.columns @@ -181,18 +190,37 @@ def nan_safe_dtype(dtype, value): sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \ f'VALUES ({value_placeholders})' id_and_publication_date = (0, publication_date) + if logger: + logger.info('updating values', count=len(dataframe.index)) + n = 0 + many_values = [] with self.new_cursor() as cursor: - for _, row in dataframe.iterrows(): + for index, row in dataframe.iterrows(): values = [] for c in dataframe_columns_and_types: values.append(nan_safe_dtype(c.dtype, row[c.csv_name])) - cursor.execute(sql, - id_and_publication_date + - tuple(values) + - tuple(i.csv_name for i in self.additional_fields)) + many_values.append(id_and_publication_date + + tuple(values) + + tuple(i.csv_name for i in self.additional_fields)) + n += 1 + # insert in batches because one at a time is slow and all at once makes + # the connection drop :( + if n % 5_000 == 0: + try: + cursor.executemany(sql, many_values) + many_values = [] + except Exception as e: + if logger: + logger.error('error on insert', publ_date=publication_date, in_lines=(n-5_000, n), index=index, values=values, exception=e) + raise e + # insert final batch + if many_values: + cursor.executemany(sql, many_values) # deal with non/seldomly updated columns used like a fk table (if this database needs it) if hasattr(self, 'AGGREGATE_KEY_COLS'): + if logger: + logger.info('updating keys') ak_cols = self.AGGREGATE_KEY_COLS # restrict data to just the key columns and remove duplicate rows @@ -219,13 +247,15 @@ def nan_safe_dtype(dtype, value): ak_table = self.table_name + '_key' # assemble full SQL statement ak_insert_sql = f'INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) AS v ON DUPLICATE KEY UPDATE {ak_updates_str}' + if logger: + logger.info("database query", sql=ak_insert_sql) # commit the data with self.new_cursor() as cur: cur.executemany(ak_insert_sql, ak_data) - def get_max_issue(self): + def get_max_issue(self, logger=False): """Fetch the most recent issue. This is used to bookend what updates we pull in from the HHS metadata. @@ -242,4 +272,6 @@ def get_max_issue(self): for (result,) in cursor: if result is not None: return pd.Timestamp(str(result)) + if logger: + logger.warn("get_max_issue", msg="no matching results in meta table; returning 1900/1/1 epoch") return pd.Timestamp("1900/1/1") diff --git a/src/acquisition/covid_hosp/common/network.py b/src/acquisition/covid_hosp/common/network.py index ba0cca281..7b6228f16 100644 --- a/src/acquisition/covid_hosp/common/network.py +++ b/src/acquisition/covid_hosp/common/network.py @@ -6,13 +6,15 @@ class Network: METADATA_URL_TEMPLATE = \ 'https://healthdata.gov/api/views/%s/rows.csv' - def fetch_metadata_for_dataset(dataset_id): + def fetch_metadata_for_dataset(dataset_id, logger=False): """Download and return metadata. Parameters ---------- dataset_id : str healthdata.gov dataset identifier of the dataset. + logger : structlog.Logger [optional; default False] + Logger to receive messages. Returns ------- @@ -20,14 +22,15 @@ def fetch_metadata_for_dataset(dataset_id): The metadata object. 
""" url = Network.METADATA_URL_TEMPLATE % dataset_id - print(f'fetching metadata at {url}') + if logger: + logger.info('fetching metadata', url=url) df = Network.fetch_dataset(url) df["Update Date"] = pandas.to_datetime(df["Update Date"]) df.sort_values("Update Date", inplace=True) df.set_index("Update Date", inplace=True) return df - def fetch_dataset(url, pandas_impl=pandas): + def fetch_dataset(url, pandas_impl=pandas, logger=False): """Download and return a dataset. Type inference is disabled in favor of explicit type casting at the @@ -38,12 +41,14 @@ def fetch_dataset(url, pandas_impl=pandas): ---------- url : str URL to the dataset in CSV format. + logger : structlog.Logger [optional; default False] + Logger to receive messages. Returns ------- pandas.DataFrame The dataset. """ - - print(f'fetching dataset at {url}') + if logger: + logger.info('fetching dataset', url=url) return pandas_impl.read_csv(url, dtype=str) diff --git a/src/acquisition/covid_hosp/common/utils.py b/src/acquisition/covid_hosp/common/utils.py index 99a6b4f33..fcf956f66 100644 --- a/src/acquisition/covid_hosp/common/utils.py +++ b/src/acquisition/covid_hosp/common/utils.py @@ -6,6 +6,7 @@ import pandas as pd + class CovidHospException(Exception): """Exception raised exclusively by `covid_hosp` utilities.""" @@ -69,7 +70,26 @@ def parse_bool(value): return False raise CovidHospException(f'cannot convert "{value}" to bool') - def issues_to_fetch(metadata, newer_than, older_than): + def limited_string_fn(length): + def limited_string(value): + value = str(value) + if len(value) > length: + raise CovidHospException(f"Value '{value}':{len(value)} longer than max {length}") + return value + return limited_string + + GEOCODE_LENGTH = 32 + GEOCODE_PATTERN = re.compile(r'POINT \((-?[0-9.]+) (-?[0-9.]+)\)') + def limited_geocode(value): + if len(value) < Utils.GEOCODE_LENGTH: + return value + # otherwise parse and set precision to 6 decimal places + m = Utils.GEOCODE_PATTERN.match(value) + if not m: + raise CovidHospException(f"Couldn't parse geocode '{value}'") + return f'POINT ({" ".join(f"{float(x):.6f}" for x in m.groups())})' + + def issues_to_fetch(metadata, newer_than, older_than, logger=False): """ Construct all issue dates and URLs to be ingested based on metadata. @@ -81,6 +101,8 @@ def issues_to_fetch(metadata, newer_than, older_than): Lower bound (exclusive) of days to get issues for. 
     older_than Date
       Upper bound (exclusive) of days to get issues for
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages
     Returns
     -------
       Dictionary of {issue day: list of (download urls, index)}
@@ -88,6 +110,7 @@
     """
     daily_issues = {}
     n_beyond = 0
+    n_selected = 0
     for index in sorted(set(metadata.index)):
       day = index.date()
       if day > newer_than and day < older_than:
@@ -97,14 +120,17 @@
           daily_issues[day] = urls_list
         else:
           daily_issues[day] += urls_list
+        n_selected += len(urls_list)
       elif day >= older_than:
         n_beyond += 1
-    if n_beyond > 0:
-      print(f"{n_beyond} issues available on {older_than} or newer")
+    if logger:
+      if n_beyond > 0:
+        logger.info("issues available beyond selection", on_or_newer=older_than, count=n_beyond)
+      logger.info("issues selected", newer_than=str(newer_than), older_than=str(older_than), count=n_selected)
     return daily_issues
 
   @staticmethod
-  def merge_by_key_cols(dfs, key_cols):
+  def merge_by_key_cols(dfs, key_cols, logger=False):
     """Merge a list of data frames as a series of updates.
 
     Parameters:
@@ -113,6 +139,8 @@
       Data frames to merge, ordered from earliest to latest.
     key_cols: list(str)
       Columns to use as the index.
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages
 
     Returns a single data frame containing the most recent data for each state+date.
     """
@@ -120,6 +148,11 @@
     dfs = [df.set_index(key_cols) for df in dfs
            if not all(k in df.index.names for k in key_cols)]
     result = dfs[0]
+    if logger and len(dfs) > 7:
+      logger.warning(
+        "expensive operation",
+        msg="concatenating more than 7 files may result in long running times",
+        count=len(dfs))
     for df in dfs[1:]:
       # update values for existing keys
       result.update(df)
@@ -153,22 +186,25 @@ def update_dataset(database, network, newer_than=None, older_than=None):
     bool
       Whether a new dataset was acquired.
""" - metadata = network.fetch_metadata() + logger = database.logger() + + metadata = network.fetch_metadata(logger=logger) datasets = [] with database.connect() as db: - max_issue = db.get_max_issue() + max_issue = db.get_max_issue(logger=logger) older_than = datetime.datetime.today().date() if newer_than is None else older_than newer_than = max_issue if newer_than is None else newer_than - daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than) + daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than, logger=logger) if not daily_issues: - print("no new issues, nothing to do") + logger.info("no new issues; nothing to do") return False for issue, revisions in daily_issues.items(): issue_int = int(issue.strftime("%Y%m%d")) # download the dataset and add it to the database - dataset = Utils.merge_by_key_cols([network.fetch_dataset(url) for url, _ in revisions], - db.KEY_COLS) + dataset = Utils.merge_by_key_cols([network.fetch_dataset(url, logger=logger) for url, _ in revisions], + db.KEY_COLS, + logger=logger) # add metadata to the database all_metadata = [] for url, index in revisions: @@ -180,10 +216,10 @@ def update_dataset(database, network, newer_than=None, older_than=None): )) with database.connect() as db: for issue_int, dataset, all_metadata in datasets: - db.insert_dataset(issue_int, dataset) + db.insert_dataset(issue_int, dataset, logger=logger) for url, metadata_json in all_metadata: - db.insert_metadata(issue_int, url, metadata_json) - print(f'successfully acquired {len(dataset)} rows') + db.insert_metadata(issue_int, url, metadata_json, logger=logger) + logger.info("acquired rows", count=len(dataset)) # note that the transaction is committed by exiting the `with` block return True diff --git a/src/acquisition/covid_hosp/facility/database.py b/src/acquisition/covid_hosp/facility/database.py index 665256a4f..172f32dc4 100644 --- a/src/acquisition/covid_hosp/facility/database.py +++ b/src/acquisition/covid_hosp/facility/database.py @@ -40,7 +40,7 @@ class Database(BaseDatabase): Columndef('ccn', 'ccn', str), Columndef('city', 'city', str), Columndef('fips_code', 'fips_code', str), - Columndef('geocoded_hospital_address', 'geocoded_hospital_address', str), + Columndef('geocoded_hospital_address', 'geocoded_hospital_address', Utils.limited_geocode), Columndef('hhs_ids', 'hhs_ids', str), Columndef('hospital_name', 'hospital_name', str), Columndef('hospital_subtype', 'hospital_subtype', str), diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py index a46345b62..b4eff0d08 100644 --- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py +++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py @@ -7,7 +7,7 @@ # first party from delphi.epidata.acquisition.covidcast.database import Database -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger from delphi.epidata.client.delphi_epidata import Epidata def get_argument_parser(): diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index 0fa936802..3eaec7d2a 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -16,7 +16,7 @@ from delphi_utils import Nans from delphi.utils.epiweek import delta_epiweeks from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow -from delphi.epidata.acquisition.covidcast.logger 
import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger DataFrameRow = NamedTuple('DFRow', [ ('geo_id', str), diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index 842e820c9..90270cb27 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -11,7 +11,7 @@ from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger def get_argument_parser(): diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 3beedac82..347c85841 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -14,7 +14,7 @@ # first party import delphi.operations.secrets as secrets -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow diff --git a/src/acquisition/covidcast/delete_batch.py b/src/acquisition/covidcast/delete_batch.py index fe40897fd..ae6ddc487 100644 --- a/src/acquisition/covidcast/delete_batch.py +++ b/src/acquisition/covidcast/delete_batch.py @@ -8,7 +8,7 @@ # first party from delphi.epidata.acquisition.covidcast.database import Database -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger def get_argument_parser(): diff --git a/src/acquisition/covidcast/file_archiver.py b/src/acquisition/covidcast/file_archiver.py index 92686f3cf..368677133 100644 --- a/src/acquisition/covidcast/file_archiver.py +++ b/src/acquisition/covidcast/file_archiver.py @@ -6,7 +6,7 @@ import shutil # first party -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger class FileArchiver: """Archives files by moving and compressing.""" diff --git a/src/acquisition/covidcast/signal_dash_data_generator.py b/src/acquisition/covidcast/signal_dash_data_generator.py index 2e7467487..431dae9fd 100644 --- a/src/acquisition/covidcast/signal_dash_data_generator.py +++ b/src/acquisition/covidcast/signal_dash_data_generator.py @@ -15,7 +15,7 @@ # first party import covidcast import delphi.operations.secrets as secrets -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.epidata.acquisition.common.logger import get_structured_logger LOOKBACK_DAYS_FOR_COVERAGE = 56 diff --git a/tests/acquisition/covid_hosp/common/test_database.py b/tests/acquisition/covid_hosp/common/test_database.py index 09244dd2f..c070a00ae 100644 --- a/tests/acquisition/covid_hosp/common/test_database.py +++ b/tests/acquisition/covid_hosp/common/test_database.py @@ -144,9 +144,9 @@ def test_insert_dataset(self): result = database.insert_dataset(sentinel.publication_date, dataset) self.assertIsNone(result) - self.assertEqual(mock_cursor.execute.call_count, 6) + self.assertEqual(mock_cursor.executemany.call_count, 1) - actual_sql = mock_cursor.execute.call_args[0][0] + actual_sql = 
mock_cursor.executemany.call_args[0][0] self.assertIn( 'INSERT INTO `test_table` (`id`, `publication_date`, `sql_str_col`, `sql_int_col`, `sql_float_col`)', actual_sql) @@ -162,5 +162,9 @@ def test_insert_dataset(self): for i, expected in enumerate(expected_values): with self.subTest(name=f'row {i + 1}'): - actual = mock_cursor.execute.call_args_list[i][0][1] + # [0]: the first call() object + # [0]: get positional args out of the call() object + # [-1]: the last arg of the executemany call + # [i]: the ith row inserted in the executemany + actual = mock_cursor.executemany.call_args_list[0][0][-1][i] self.assertEqual(actual, (0, sentinel.publication_date) + expected) diff --git a/tests/acquisition/covid_hosp/facility/test_database.py b/tests/acquisition/covid_hosp/facility/test_database.py index 28872a6ac..2e1ee29fe 100644 --- a/tests/acquisition/covid_hosp/facility/test_database.py +++ b/tests/acquisition/covid_hosp/facility/test_database.py @@ -35,9 +35,14 @@ def test_insert_dataset(self): result = database.insert_dataset(sentinel.publication_date, dataset) self.assertIsNone(result) - self.assertEqual(mock_cursor.execute.call_count, 22) - - last_query_values = mock_cursor.execute.call_args[0][-1] + # once for the values, once for the keys + self.assertEqual(mock_cursor.executemany.call_count, 2) + + # [0]: the first call() object + # [0]: get the positional args out of the call() object + # [-1]: the last arg of the executemany call + # [-1]: the last row inserted in the executemany + last_query_values = mock_cursor.executemany.call_args_list[0][0][-1][-1] expected_query_values = ( 0, sentinel.publication_date, '450822', 20201130, '6800 N MACARTHUR BLVD', 61.1, 7, 428, 60.9, 7, 426, 61.1, 7, 428, diff --git a/tests/acquisition/covid_hosp/state_daily/test_database.py b/tests/acquisition/covid_hosp/state_daily/test_database.py index efa439669..95401d7cc 100644 --- a/tests/acquisition/covid_hosp/state_daily/test_database.py +++ b/tests/acquisition/covid_hosp/state_daily/test_database.py @@ -38,9 +38,9 @@ def test_insert_dataset(self): result = database.insert_dataset(sentinel.issue, dataset) self.assertIsNone(result) - self.assertEqual(mock_cursor.execute.call_count, 53) + self.assertEqual(mock_cursor.executemany.call_count, 1) - last_query_values = mock_cursor.execute.call_args[0][-1] + last_query_values = mock_cursor.executemany.call_args[0][-1][-1] expected_query_values = ( 0, sentinel.issue, 'WY', 20201209, 0.2519685039370078, 29, 127, 32, 0.4233576642335766, 31, 137, 58, 22, 2, diff --git a/tests/acquisition/covid_hosp/state_timeseries/test_database.py b/tests/acquisition/covid_hosp/state_timeseries/test_database.py index 2649f7b5f..24897d42d 100644 --- a/tests/acquisition/covid_hosp/state_timeseries/test_database.py +++ b/tests/acquisition/covid_hosp/state_timeseries/test_database.py @@ -36,9 +36,9 @@ def test_insert_dataset(self): result = database.insert_dataset(sentinel.issue, dataset) self.assertIsNone(result) - self.assertEqual(mock_cursor.execute.call_count, 22) + self.assertEqual(mock_cursor.executemany.call_count, 1) - last_query_values = mock_cursor.execute.call_args[0][-1] + last_query_values = mock_cursor.executemany.call_args[0][-1][-1] expected_query_values = ( 0, sentinel.issue, 'WY', 20200826, 0.0934579439252336, 26, 107, 10, 0.4298245614035088, 28, 114, 49, 19, 7, 2, None, 4, 2, 0, 1, '2', 0, 26,