diff --git a/integrations/acquisition/covid_hosp/state_daily/test_scenarios.py b/integrations/acquisition/covid_hosp/state_daily/test_scenarios.py
index e55bc8ca6..1801de932 100644
--- a/integrations/acquisition/covid_hosp/state_daily/test_scenarios.py
+++ b/integrations/acquisition/covid_hosp/state_daily/test_scenarios.py
@@ -41,6 +41,7 @@ def setUp(self):
     # clear relevant tables
     with Database.connect() as db:
       with db.new_cursor() as cur:
+        cur.execute('truncate table covid_hosp_state_daily')
         cur.execute('truncate table covid_hosp_state_timeseries')
         cur.execute('truncate table covid_hosp_meta')
 
diff --git a/integrations/acquisition/covid_hosp/state_timeseries/test_scenarios.py b/integrations/acquisition/covid_hosp/state_timeseries/test_scenarios.py
index 5d13ccbb0..d62550a32 100644
--- a/integrations/acquisition/covid_hosp/state_timeseries/test_scenarios.py
+++ b/integrations/acquisition/covid_hosp/state_timeseries/test_scenarios.py
@@ -37,6 +37,7 @@ def setUp(self):
     # clear relevant tables
     with Database.connect() as db:
       with db.new_cursor() as cur:
+        cur.execute('truncate table covid_hosp_state_daily')
         cur.execute('truncate table covid_hosp_state_timeseries')
         cur.execute('truncate table covid_hosp_meta')
 
diff --git a/integrations/server/test_covid_hosp.py b/integrations/server/test_covid_hosp.py
index 16538b82d..7f53d6174 100644
--- a/integrations/server/test_covid_hosp.py
+++ b/integrations/server/test_covid_hosp.py
@@ -25,14 +25,21 @@ def setUp(self):
     # clear relevant tables
     with Database.connect() as db:
       with db.new_cursor() as cur:
+        cur.execute('truncate table covid_hosp_state_daily')
         cur.execute('truncate table covid_hosp_state_timeseries')
         cur.execute('truncate table covid_hosp_meta')
 
-  def insert_issue(self, cur, issue, value, record_type):
-    so_many_nulls = ', '.join(['null'] * 57)
+  def insert_timeseries(self, cur, issue, value):
+    so_many_nulls = ', '.join(['null'] * 114)
     cur.execute(f'''insert into covid_hosp_state_timeseries values (
-      0, {issue}, 'PA', 20201118, {value}, {so_many_nulls}, '{record_type}', {so_many_nulls}
+      0, {issue}, 'PA', 20201118, {value}, {so_many_nulls}
+    )''')
+
+  def insert_daily(self, cur, issue, value):
+    so_many_nulls = ', '.join(['null'] * 114)
+    cur.execute(f'''insert into covid_hosp_state_daily values (
+      0, {issue}, 'PA', 20201118, {value}, {so_many_nulls}
     )''')
 
   def test_query_by_issue(self):
@@ -42,10 +49,10 @@ def test_query_by_issue(self):
       with db.new_cursor() as cur:
         # inserting out of order to test server-side order by
         # also inserting two for 20201201 to test tiebreaker.
-        self.insert_issue(cur, 20201201, 123, 'T')
-        self.insert_issue(cur, 20201201, 321, 'D')
-        self.insert_issue(cur, 20201203, 789, 'T')
-        self.insert_issue(cur, 20201202, 456, 'T')
+        self.insert_timeseries(cur, 20201201, 123)
+        self.insert_daily(cur, 20201201, 321)
+        self.insert_timeseries(cur, 20201203, 789)
+        self.insert_timeseries(cur, 20201202, 456)
 
         # request without issue (defaulting to latest issue)
         with self.subTest(name='no issue (latest)'):
@@ -86,11 +93,11 @@ def test_query_by_issue(self):
   def test_query_by_as_of(self):
     with Database.connect() as db:
       with db.new_cursor() as cur:
-        self.insert_issue(cur, 20201101, 0, 'T')
-        self.insert_issue(cur, 20201102, 1, 'D')
-        self.insert_issue(cur, 20201103, 2, 'D')
-        self.insert_issue(cur, 20201103, 3, 'T')
-        self.insert_issue(cur, 20201104, 4, 'T')
+        self.insert_timeseries(cur, 20201101, 0)
+        self.insert_daily(cur, 20201102, 1)
+        self.insert_daily(cur, 20201103, 2)
+        self.insert_timeseries(cur, 20201103, 3)
+        self.insert_timeseries(cur, 20201104, 4)
 
         with self.subTest(name='as_of with multiple issues'):
           response = Epidata.covid_hosp('PA', 20201118, as_of=20201103)
diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py
index 4fd0981a1..173ae4a7a 100644
--- a/src/acquisition/covid_hosp/common/database.py
+++ b/src/acquisition/covid_hosp/common/database.py
@@ -22,8 +22,7 @@ def __init__(self,
                table_name=None,
                hhs_dataset_id=None,
                columns_and_types=None,
-               key_columns=None,
-               additional_fields=None):
+               key_columns=None):
     """Create a new Database object.
 
     Parameters
@@ -37,22 +36,18 @@ def __init__(self,
     columns_and_types : tuple[str, str, Callable]
       List of 3-tuples of (CSV header name, SQL column name, data type) for
       all the columns in the CSV file.
-    additional_fields : tuple[str]
-      List of 2-tuples of (value, SQL column name) for additional fields to include
-      at the end of the row which are not present in the CSV data.
""" self.connection = connection self.table_name = table_name self.hhs_dataset_id = hhs_dataset_id - self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' else \ + self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' or table_name == "covid_hosp_state_daily" else \ 'publication_date' self.columns_and_types = { c.csv_name: c for c in (columns_and_types if columns_and_types is not None else []) } self.key_columns = key_columns if key_columns is not None else [] - self.additional_fields = additional_fields if additional_fields is not None else [] @classmethod def logger(database_class): @@ -184,9 +179,9 @@ def nan_safe_dtype(dtype, value): for csv_name in self.key_columns: dataframe.loc[:, csv_name] = dataframe[csv_name].map(self.columns_and_types[csv_name].dtype) - num_columns = 2 + len(dataframe_columns_and_types) + len(self.additional_fields) + num_columns = 2 + len(dataframe_columns_and_types) value_placeholders = ', '.join(['%s'] * num_columns) - columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields) + columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types) sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \ f'VALUES ({value_placeholders})' id_and_publication_date = (0, publication_date) @@ -200,8 +195,7 @@ def nan_safe_dtype(dtype, value): for c in dataframe_columns_and_types: values.append(nan_safe_dtype(c.dtype, row[c.csv_name])) many_values.append(id_and_publication_date + - tuple(values) + - tuple(i.csv_name for i in self.additional_fields)) + tuple(values)) n += 1 # insert in batches because one at a time is slow and all at once makes # the connection drop :( diff --git a/src/acquisition/covid_hosp/state_daily/database.py b/src/acquisition/covid_hosp/state_daily/database.py index 6a8228994..24e99b878 100644 --- a/src/acquisition/covid_hosp/state_daily/database.py +++ b/src/acquisition/covid_hosp/state_daily/database.py @@ -7,8 +7,7 @@ class Database(BaseDatabase): - # note we share a database with state_timeseries - TABLE_NAME = 'covid_hosp_state_timeseries' + TABLE_NAME = 'covid_hosp_state_daily' KEY_COLS = ['state', 'reporting_cutoff_start'] # These are 3-tuples of (CSV header name, SQL db column name, data type) for # all the columns in the CSV file. 
@@ -226,5 +225,4 @@ def __init__(self, *args, **kwargs):
       table_name=Database.TABLE_NAME,
       hhs_dataset_id=Network.DATASET_ID,
       columns_and_types=Database.ORDERED_CSV_COLUMNS,
-      key_columns=Database.KEY_COLS,
-      additional_fields=[Columndef('D', 'record_type', None)])
+      key_columns=Database.KEY_COLS)
diff --git a/src/acquisition/covid_hosp/state_timeseries/database.py b/src/acquisition/covid_hosp/state_timeseries/database.py
index 348d9fc0b..0b53965cd 100644
--- a/src/acquisition/covid_hosp/state_timeseries/database.py
+++ b/src/acquisition/covid_hosp/state_timeseries/database.py
@@ -225,5 +225,4 @@ def __init__(self, *args, **kwargs):
       table_name=Database.TABLE_NAME,
       hhs_dataset_id=Network.DATASET_ID,
       columns_and_types=Database.ORDERED_CSV_COLUMNS,
-      key_columns=Database.KEY_COLS,
-      additional_fields=[Columndef('T', 'record_type', None)])
+      key_columns=Database.KEY_COLS)
diff --git a/src/ddl/covid_hosp.sql b/src/ddl/covid_hosp.sql
index 2ffe7c71a..0d74a077c 100644
--- a/src/ddl/covid_hosp.sql
+++ b/src/ddl/covid_hosp.sql
@@ -61,9 +61,7 @@ CREATE TABLE `covid_hosp_meta` (
 
 /*
-`covid_hosp_state_timeseries` stores the versioned "state timeseries" dataset,
-which contains data from both the time series data and the daily snapshot files.
-
+`covid_hosp_state_timeseries` stores time series data from the versioned "state timeseries" dataset.
 Data is public under the Open Data Commons Open Database License (ODbL).
 
 +------------------------------------------------------------------+---------+------+-----+---------+----------------+
@@ -131,7 +129,6 @@ Data is public under the Open Data Commons Open Database License (ODbL).
 | adult_icu_bed_utilization_coverage                               | int(11) | YES  |     | NULL    |                |
 | adult_icu_bed_utilization_numerator                              | int(11) | YES  |     | NULL    |                |
 | adult_icu_bed_utilization_denominator                            | int(11) | YES  |     | NULL    |                |
-| record_type                                                      | char(1) | NO   | MUL | NULL    |                |
 +------------------------------------------------------------------+---------+------+-----+---------+----------------+
 
 - `id`
@@ -366,14 +363,6 @@
 For daily snapshot files, there is a `reporting_cutoff_start` value, defined
 as "Look back date start - The latest reports from each hospital is summed
 for this report starting with this date." We place this value into the `date`
 column.
-
-We also add a column `record_type` that specifies if a row came from a
-time series file or a daily snapshot file. "T" = time series and
-"D" = daily snapshot. When both a time series and a daily snapshot row
-have the same issue/date/state but different values, we tiebreak by
-taking the daily snapshot value. This is done with a window function that
-sorts by the record_type field, ascending, and so it is important that "D"
-comes before "T".
 */
 
 CREATE TABLE `covid_hosp_state_timeseries` (
@@ -439,7 +428,6 @@ CREATE TABLE `covid_hosp_state_timeseries` (
   `adult_icu_bed_utilization_coverage` INT,
   `adult_icu_bed_utilization_numerator` INT,
   `adult_icu_bed_utilization_denominator` INT,
-  `record_type` CHAR(1) NOT NULL,
   -- new columns added Oct 10
   `geocoded_state` VARCHAR(32),
   `previous_day_admission_adult_covid_confirmed_18_19` INT,
@@ -500,15 +488,34 @@ CREATE TABLE `covid_hosp_state_timeseries` (
   `total_patients_hospitalized_confirmed_influenza_coverage` INT,
   PRIMARY KEY (`id`),
   -- for uniqueness
-  -- for fast lookup of most recent issue for a given state, date, and record type
-  UNIQUE KEY `issue_by_state_and_date` (`state`, `date`, `issue`, `record_type`),
-  -- for fast lookup of a time-series for a given state, issue, and record type
-  KEY `date_by_issue_and_state` (`issue`, `state`, `date`, `record_type`),
-  -- for fast lookup of all states for a given date, issue, and record_type
-  KEY `state_by_issue_and_date` (`issue`, `date`, `state`, `record_type`)
+  -- for fast lookup of most recent issue for a given state and date
+  UNIQUE KEY `issue_by_state_and_date` (`state`, `date`, `issue`),
+  -- for fast lookup of a time-series for a given state and issue
+  KEY `date_by_issue_and_state` (`issue`, `state`, `date`),
+  -- for fast lookup of all states for a given date and issue
+  KEY `state_by_issue_and_date` (`issue`, `date`, `state`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
+/*
+`covid_hosp_state_daily` stores daily snapshot data from the versioned
+"state timeseries" dataset.
+Schema is equivalent to `covid_hosp_state_timeseries`.
+*/
+CREATE TABLE `covid_hosp_state_daily` (
+  -- for uniqueness
+  PRIMARY KEY (`id`),
+  -- for fast lookup of most recent issue for a given state and date
+  UNIQUE KEY `issue_by_state_and_date` (`state`, `date`, `issue`),
+  -- for fast lookup of a time-series for a given state and issue
+  KEY `date_by_issue_and_state` (`issue`, `state`, `date`),
+  -- for fast lookup of all states for a given date and issue
+  KEY `state_by_issue_and_date` (`issue`, `date`, `state`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
+SELECT * FROM covid_hosp_state_timeseries;
+-- AUTO_INCREMENT is not preserved by `CREATE TABLE ... SELECT`; re-add it
+ALTER TABLE covid_hosp_state_daily MODIFY id INT NOT NULL AUTO_INCREMENT;
+
 /*
 `covid_hosp_facility` stores the versioned "facility" dataset.
 
diff --git a/src/ddl/migrations/covid_hosp_state_split_tables.sql b/src/ddl/migrations/covid_hosp_state_split_tables.sql
new file mode 100644
index 000000000..03e8ac7ce
--- /dev/null
+++ b/src/ddl/migrations/covid_hosp_state_split_tables.sql
@@ -0,0 +1,24 @@
+-- 1. Add new state_daily table mirroring the state_timeseries table
+
+CREATE TABLE `covid_hosp_state_daily` (
+  -- for uniqueness
+  PRIMARY KEY (`id`),
+  -- for fast lookup of most recent issue for a given state and date
+  UNIQUE KEY `issue_by_state_and_date` (`state`, `date`, `issue`),
+  -- for fast lookup of a time-series for a given state and issue
+  KEY `date_by_issue_and_state` (`issue`, `state`, `date`),
+  -- for fast lookup of all states for a given date and issue
+  KEY `state_by_issue_and_date` (`issue`, `date`, `state`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
+SELECT * FROM covid_hosp_state_timeseries WHERE record_type='D';
+-- AUTO_INCREMENT is not preserved by `CREATE TABLE ... SELECT`; re-add it
+ALTER TABLE covid_hosp_state_daily MODIFY id INT NOT NULL AUTO_INCREMENT;
+
+-- 2. Remove daily snapshot data from the timeseries table (its record_type='D' rows were copied to covid_hosp_state_daily above)
+
+DELETE FROM `covid_hosp_state_timeseries` WHERE record_type='D';
+
+-- 3. Remove the record_type column from both tables
+
+ALTER TABLE `covid_hosp_state_daily` DROP COLUMN record_type;
+ALTER TABLE `covid_hosp_state_timeseries` DROP COLUMN record_type;
diff --git a/src/server/endpoints/covid_hosp_state_timeseries.py b/src/server/endpoints/covid_hosp_state_timeseries.py
index 78931ee68..e5bce751d 100644
--- a/src/server/endpoints/covid_hosp_state_timeseries.py
+++ b/src/server/endpoints/covid_hosp_state_timeseries.py
@@ -152,19 +152,66 @@ def handle():
     q.where_integers("date", dates)
     q.where_strings("state", states)
 
+    # These queries prioritize the daily value when both a time series value and a daily value exist for a given issue/date/state.
+    # Further details: https://github.com/cmu-delphi/delphi-epidata/pull/336/files#diff-097d4969fdc9ac1f722809e85f3dc59ad371b66011861a50d15fcc605839c63dR364-R368
     if issues is not None:
+      # Filter for all matching issues
       q.where_integers("issue", issues)
-      # final query using specific issues
-      query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) `row` FROM {q.table} WHERE {q.conditions_clause}) SELECT {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}"
-    elif as_of is not None:
-      sub_condition_asof = "(issue <= :as_of)"
-      q.params["as_of"] = as_of
-      query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state ORDER BY issue DESC, record_type) `row` FROM {q.table} WHERE {q.conditions_clause} AND {sub_condition_asof}) SELECT {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}"
+
+      # Get all issues matching the conditions from daily & timeseries
+      union_subquery = f'''
+      (
+        SELECT *, 'D' AS record_type FROM `covid_hosp_state_daily` AS {q.alias} WHERE {q.conditions_clause}
+        UNION ALL
+        SELECT *, 'T' AS record_type FROM `covid_hosp_state_timeseries` AS {q.alias} WHERE {q.conditions_clause}
+      ) AS {q.alias}'''
+
+      # Prioritize rows with record_type='D' for each issue/date/state group
+      query = f'''
+        SELECT {q.fields_clause} FROM (
+          SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY issue, date, state ORDER BY record_type) AS `row`
+          FROM {union_subquery}
+        ) AS {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}
+      '''
     else:
-      # final query using most recent issues
-      subquery = f"(SELECT max(`issue`) `max_issue`, `date`, `state` FROM {q.table} WHERE {q.conditions_clause} GROUP BY `date`, `state`) x"
-      condition = f"x.`max_issue` = {q.alias}.`issue` AND x.`date` = {q.alias}.`date` AND x.`state` = {q.alias}.`state`"
-      query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) `row` FROM {q.table} JOIN {subquery} ON {condition}) select {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}"
+      # Filter for most recent issues
+      cond_clause = q.conditions_clause
+      if as_of is not None:
+        # ...but only consider issues at or before the given as_of
+        cond_clause += " AND (issue <= :as_of)"
+        q.params["as_of"] = as_of
+
+      join_condition = f"{q.alias}.state = x.state AND {q.alias}.date = x.date AND {q.alias}.issue = x.max_issue"
+
+      # Get the rows from the daily & timeseries tables with the highest issue value within each state/date group
+      join_daily = f'''
+        SELECT {q.fields_clause}, 'D' AS record_type FROM `covid_hosp_state_daily` AS {q.alias}
+        JOIN (
+          SELECT {q.alias}.state, {q.alias}.date, MAX({q.alias}.issue) AS max_issue
+          FROM `covid_hosp_state_daily` AS {q.alias}
+          WHERE {cond_clause}
+          GROUP BY {q.alias}.state, {q.alias}.date
+        ) AS x
+        ON {join_condition}
+      '''
+      join_timeseries = f'''
+        SELECT {q.fields_clause}, 'T' AS record_type FROM `covid_hosp_state_timeseries` AS {q.alias}
+        JOIN (
+          SELECT {q.alias}.state, {q.alias}.date, MAX({q.alias}.issue) AS max_issue
+          FROM `covid_hosp_state_timeseries` AS {q.alias}
+          WHERE {cond_clause}
+          GROUP BY {q.alias}.state, {q.alias}.date
+        ) AS x
+        ON {join_condition}
+      '''
+
+      # Combine the daily & timeseries results, keeping the overall latest issue per state/date (rows with record_type='D' win ties)
+      query = f'''
+        SELECT {q.fields_clause} FROM (
+          SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY state, date ORDER BY issue DESC, record_type) AS `row`
+          FROM ({join_daily} UNION ALL {join_timeseries}) AS {q.alias}
+        ) AS {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}
+      '''
 
     # send query
     return execute_query(query, q.params, fields_string, fields_int, fields_float)
diff --git a/tests/acquisition/covid_hosp/state_daily/test_database.py b/tests/acquisition/covid_hosp/state_daily/test_database.py
index 95401d7cc..ae9acd098 100644
--- a/tests/acquisition/covid_hosp/state_daily/test_database.py
+++ b/tests/acquisition/covid_hosp/state_daily/test_database.py
@@ -50,7 +50,7 @@ def test_insert_dataset(self):
         17, 18, 19, 20, 21, 22, 23, 31, 24, 25, 15, 26, 27, 28, 29, 30, 31,
         32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 29, 42, 43, 44, 45, 0, 29,
         0, 29, 46, 47, 48, 49, 50, 51, 52, 58, 31, 32, 29, 32, 31, 196, 29, 189, 31,
-        53, 54, 55, 56, 2, 29, 2, 29, 137, 31, 'D')
+        53, 54, 55, 56, 2, 29, 2, 29, 137, 31)
 
     self.assertEqual(len(last_query_values), len(expected_query_values))
     for actual, expected in zip(last_query_values, expected_query_values):
diff --git a/tests/acquisition/covid_hosp/state_timeseries/test_database.py b/tests/acquisition/covid_hosp/state_timeseries/test_database.py
index 24897d42d..ecea27f59 100644
--- a/tests/acquisition/covid_hosp/state_timeseries/test_database.py
+++ b/tests/acquisition/covid_hosp/state_timeseries/test_database.py
@@ -48,7 +48,7 @@ def test_insert_dataset(self):
         24, 25, 13, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
         40, 41, 26, 42, 43, 44, 45, 0, 21, 0, 22, 46, 47, 48, 49, 50, 51,
         52, 49, 28, 10, 26, 7, 28, 17, 26, 14, 28, 53, 54, 55, 56, 0, 26, 0, 26,
-        114, 28, 'T')
+        114, 28)
 
     self.assertEqual(len(last_query_values), len(expected_query_values))
     for actual, expected in zip(last_query_values, expected_query_values):
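
Review note: below is a minimal sketch (not part of the patch) of the tiebreak the new endpoint queries implement. It assumes MySQL 8.0+ (for window functions) and uses throwaway `demo_*` tables and sample values in place of the real schema:

CREATE TEMPORARY TABLE demo_daily      (`state` CHAR(2), `date` INT, `issue` INT, `value` INT);
CREATE TEMPORARY TABLE demo_timeseries (`state` CHAR(2), `date` INT, `issue` INT, `value` INT);

INSERT INTO demo_timeseries VALUES ('PA', 20201118, 20201201, 123);
INSERT INTO demo_daily      VALUES ('PA', 20201118, 20201201, 321);

-- Latest issue per state/date across both tables; on an issue tie, the daily
-- row wins because 'D' sorts before 'T'.
SELECT `state`, `date`, `issue`, `value`, record_type
FROM (
  SELECT u.*, ROW_NUMBER() OVER (
    PARTITION BY `state`, `date` ORDER BY `issue` DESC, record_type
  ) AS `row`
  FROM (
    SELECT *, 'D' AS record_type FROM demo_daily
    UNION ALL
    SELECT *, 'T' AS record_type FROM demo_timeseries
  ) AS u
) AS ranked
WHERE `row` = 1;
-- Both rows share issue 20201201, so the daily row (value 321) is returned,
-- matching the old record_type tiebreak and the expectations in
-- test_query_by_issue above.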
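The else branch builds on a groupwise-maximum join before the tiebreak is applied; in isolation, against the same demo tables, the pattern looks like this (again illustrative only):

-- Latest issue per state/date from a single table, via MAX + self-join.
SELECT t.*
FROM demo_timeseries AS t
JOIN (
  SELECT `state`, `date`, MAX(`issue`) AS max_issue
  FROM demo_timeseries
  GROUP BY `state`, `date`
) AS x
ON t.`state` = x.`state` AND t.`date` = x.`date` AND t.`issue` = x.max_issue;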
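Finally, a sketch of the `CREATE TABLE ... SELECT` caveat the migration comments call out: the statement copies column definitions and rows, but it drops the AUTO_INCREMENT attribute, and source indexes carry over only if redeclared inline (as the migration does). The `demo_src`/`demo_dst` names are hypothetical:

CREATE TABLE demo_src (
  `id` INT NOT NULL AUTO_INCREMENT,
  `val` INT,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;

-- Copies schema and data; `id` arrives as plain INT NOT NULL.
CREATE TABLE demo_dst (PRIMARY KEY (`id`)) ENGINE=InnoDB
SELECT * FROM demo_src;

-- Restore the attribute explicitly, as covid_hosp_state_split_tables.sql does.
ALTER TABLE demo_dst MODIFY `id` INT NOT NULL AUTO_INCREMENT;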