diff --git a/changehc/Makefile b/changehc/Makefile
index bc88f1fec..31d406fc4 100644
--- a/changehc/Makefile
+++ b/changehc/Makefile
@@ -17,8 +17,8 @@ install-ci: venv
 	pip install .
 
 lint:
-	. env/bin/activate; pylint $(dir)
-	. env/bin/activate; pydocstyle $(dir)
+	. env/bin/activate; pylint $(dir);\
+	pydocstyle $(dir)
 
 test:
 	. env/bin/activate ;\
diff --git a/changehc/delphi_changehc/config.py b/changehc/delphi_changehc/config.py
index a7232ddf7..81fed44dc 100644
--- a/changehc/delphi_changehc/config.py
+++ b/changehc/delphi_changehc/config.py
@@ -24,29 +24,36 @@ class Config:
     ## data columns
     COVID_COL = "COVID"
     DENOM_COL = "Denominator"
+    DENOM_INPATIENT_STATE_COL = "Denominator-Inpatient-State"
     FLU_COL = "Flu"
     MIXED_COL = "Mixed"
     FLU_LIKE_COL = "Flu-like"
     COVID_LIKE_COL = "Covid-like"
-    COUNT_COLS = [COVID_COL,DENOM_COL,FLU_COL,MIXED_COL,FLU_LIKE_COL,COVID_LIKE_COL]
+    FLU_INPATIENT_COL = "Flu-Inpatient"
+    COUNT_COLS = [COVID_COL,DENOM_COL,FLU_COL,MIXED_COL,FLU_LIKE_COL,COVID_LIKE_COL,FLU_INPATIENT_COL]
     DATE_COL = "timestamp"
     GEO_COL = "fips"
+    GEO_COL_STATE = "state_code"
     ID_COLS = [DATE_COL] + [GEO_COL]
     FILT_COLS = ID_COLS + COUNT_COLS
 
     DENOM_COLS = [DATE_COL, GEO_COL, DENOM_COL]
+    DENOM_COLS_STATE = [DATE_COL, GEO_COL_STATE, DENOM_INPATIENT_STATE_COL]
     COVID_COLS = [DATE_COL, GEO_COL, COVID_COL]
     FLU_COLS = [DATE_COL, GEO_COL, FLU_COL]
     MIXED_COLS = [DATE_COL, GEO_COL, MIXED_COL]
     FLU_LIKE_COLS = [DATE_COL, GEO_COL, FLU_LIKE_COL]
     COVID_LIKE_COLS = [DATE_COL, GEO_COL, COVID_LIKE_COL]
+    FLU_INPATIENT_COLS = [GEO_COL_STATE, DATE_COL, FLU_INPATIENT_COL]
 
     DENOM_DTYPES = {DATE_COL: str, DENOM_COL: str, GEO_COL: str}
+    DENOM_DTYPES_STATE = {DATE_COL: str, DENOM_INPATIENT_STATE_COL: str, GEO_COL_STATE: str}
     COVID_DTYPES = {DATE_COL: str, COVID_COL: str, GEO_COL: str}
     FLU_DTYPES = {DATE_COL: str, FLU_COL: str, GEO_COL: str}
     MIXED_DTYPES = {DATE_COL: str, MIXED_COL: str, GEO_COL: str}
     FLU_LIKE_DTYPES = {DATE_COL: str, FLU_LIKE_COL: str, GEO_COL: str}
     COVID_LIKE_DTYPES = {DATE_COL: str, COVID_LIKE_COL: str, GEO_COL: str}
+    FLU_INPATIENT_DTYPES = {DATE_COL: str, FLU_INPATIENT_COL: str, GEO_COL_STATE: str}
 
     SMOOTHER_BANDWIDTH = 100  # bandwidth for the linear left Gaussian filter
     MIN_DEN = 100  # number of total visits needed to produce a sensor
diff --git a/changehc/delphi_changehc/constants.py b/changehc/delphi_changehc/constants.py
index a458f8819..6654124cc 100644
--- a/changehc/delphi_changehc/constants.py
+++ b/changehc/delphi_changehc/constants.py
@@ -5,7 +5,10 @@
 SMOOTHED_ADJ_CLI = "smoothed_adj_outpatient_cli"
 SMOOTHED_FLU = "smoothed_outpatient_flu"
 SMOOTHED_ADJ_FLU = "smoothed_adj_outpatient_flu"
-SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, SMOOTHED_FLU, SMOOTHED_ADJ_FLU]
+SMOOTHED_FLU_INPATIENT = "smoothed_inpatient_flu"
+SMOOTHED_ADJ_FLU_INPATIENT = "smoothed_adj_inpatient_flu"
+
+SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_FLU_INPATIENT, SMOOTHED_ADJ_FLU_INPATIENT]
 NA = "NA"
 HRR = "hrr"
 FIPS = "fips"
diff --git a/changehc/delphi_changehc/load_data.py b/changehc/delphi_changehc/load_data.py
index c4a5f1e9c..f55f1f2f1 100644
--- a/changehc/delphi_changehc/load_data.py
+++ b/changehc/delphi_changehc/load_data.py
@@ -30,7 +30,8 @@ def load_chng_data(filepath, dropdate, base_geo,
     Returns:
         cleaned dataframe
     """
-    assert base_geo == "fips", "base unit must be 'fips'"
+    assert base_geo == "fips" or (counts_col in {Config.FLU_INPATIENT_COL,Config.DENOM_INPATIENT_STATE_COL} and
+                                  base_geo == "state_code"), "base unit must be 'fips', or 'state_code' for Flu-Inpatient"
     count_flag = False
     date_flag = False
     geo_flag = False
@@ -39,11 +40,11 @@ def load_chng_data(filepath, dropdate, base_geo,
             count_flag = True
         elif n == Config.DATE_COL:
             date_flag = True
-        elif n == "fips":
+        elif n == base_geo:
             geo_flag = True
     assert count_flag, "counts_col must be present in col_names"
     assert date_flag, "'%s' must be present in col_names"%(Config.DATE_COL)
-    assert geo_flag, "'fips' must be present in col_names"
+    assert geo_flag, "base_geo (%s) must be present in col_names"%(base_geo)
 
     data = pd.read_csv(
         filepath,
@@ -225,3 +226,36 @@ def load_flu_data(denom_filepath, flu_filepath, base_geo,
                          test_mode=False, check_nd=25)
         store_backfill_file(data, issue_date, backfill_dir, numtype, geo, weekday)
     return data
+
+
+def load_flu_inpatient_data(denom_filepath, flu_filepath, dropdate, base_geo):
+    """Load in denominator and flu inpatient data, and combine them.
+
+    Args:
+        denom_filepath: path to the aggregated denominator data
+        flu_filepath: path to the aggregated flu inpatient data
+        dropdate: data drop date (datetime object)
+        base_geo: base geographic unit before aggregation ('state_code')
+
+    Returns:
+        combined multiindexed dataframe, index 0 is geo_base, index 1 is date
+    """
+    assert base_geo == "state_code", "base unit must be 'state_code'"
+
+    # load each data stream
+    denom_data = load_chng_data(denom_filepath, dropdate, base_geo,
+                                Config.DENOM_COLS_STATE, Config.DENOM_DTYPES_STATE, Config.DENOM_INPATIENT_STATE_COL)
+    flu_data = load_chng_data(flu_filepath, dropdate, base_geo,
+                              Config.FLU_INPATIENT_COLS, Config.FLU_INPATIENT_DTYPES, Config.FLU_INPATIENT_COL)
+
+    # merge data
+    data = denom_data.merge(flu_data, how="outer", left_index=True, right_index=True)
+    assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge"
+
+    # calculate combined numerator and denominator
+    data.fillna(0, inplace=True)
+    data["num"] = data[Config.FLU_INPATIENT_COL]
+    data["den"] = data[Config.DENOM_INPATIENT_STATE_COL]
+    data = data[["num", "den"]]
+
+    return data
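Reviewer aside: below is a minimal, self-contained sketch of the merge semantics `load_flu_inpatient_data` relies on: an outer join on the `(state_code, timestamp)` index, a sanity check that no row is entirely NA, then `fillna(0)` before the `num`/`den` columns are built. The toy frames and their values are invented for illustration; the real inputs come from `load_chng_data`.

```python
import pandas as pd

# Synthetic stand-ins for the two streams that load_chng_data would return.
idx = ["state_code", "timestamp"]
denom = pd.DataFrame({
    "state_code": ["01", "01", "02"],
    "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-01"]),
    "Denominator-Inpatient-State": [100.0, 120.0, 80.0],
}).set_index(idx)
flu = pd.DataFrame({
    "state_code": ["01", "02", "02"],
    "timestamp": pd.to_datetime(["2020-05-01", "2020-05-01", "2020-05-02"]),
    "Flu-Inpatient": [5.0, 2.0, 3.0],
}).set_index(idx)

# Outer join keeps every (state, day) seen in either stream...
data = denom.merge(flu, how="outer", left_index=True, right_index=True)
# ...so a fully-NA row can only mean the merge itself misbehaved.
assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge"

# Days present in only one stream get a 0 for the missing count.
data = data.fillna(0)
data["num"] = data["Flu-Inpatient"]
data["den"] = data["Denominator-Inpatient-State"]
print(data[["num", "den"]])  # ("01", 05-02) gets num=0; ("02", 05-02) gets den=0
```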
diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py
index 8d4d25261..4b1688189 100644
--- a/changehc/delphi_changehc/run.py
+++ b/changehc/delphi_changehc/run.py
@@ -15,7 +15,7 @@
 
 # first party
 from .download_ftp_files import download_counts
-from .load_data import (load_combined_data, load_cli_data, load_flu_data)
+from .load_data import load_combined_data, load_cli_data, load_flu_data, load_flu_inpatient_data
 from .update_sensor import CHCSensorUpdater
 
 
@@ -34,6 +34,8 @@ def retrieve_files(params, filedate, logger):
         mixed_file = "%s/%s_Counts_Products_Mixed.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
         flu_like_file = "%s/%s_Counts_Products_Flu_Like.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
         covid_like_file = "%s/%s_Counts_Products_Covid_Like.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
+        flu_inpatient_file = "%s/%s_Counts_Products_Flu_Inpatient.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
+        denom_inpatient_state_file = "%s/%s_Counts_Products_Denom_Inpatient_By_State.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
     else:
         denom_file = files["denom"]
         covid_file = files["covid"]
@@ -41,6 +43,8 @@ def retrieve_files(params, filedate, logger):
         flu_file = files["flu"]
         mixed_file = files["mixed"]
         flu_like_file = files["flu_like"]
         covid_like_file = files["covid_like"]
+        flu_inpatient_file = files["flu_inpatient"]
+        denom_inpatient_state_file = files["denom_inpatient_state"]
 
     file_dict = {"denom": denom_file}
     if "covid" in params["indicator"]["types"]:
@@ -52,6 +56,9 @@ def retrieve_files(params, filedate, logger):
         file_dict["covid_like"] = covid_like_file
     if "flu" in params["indicator"]["types"]:
         file_dict["flu"] = flu_file
+    if "flu_inpatient" in params["indicator"]["types"]:
+        file_dict["flu_inpatient"] = flu_inpatient_file
+        file_dict["denom_inpatient_state"] = denom_inpatient_state_file
 
     return file_dict
 
@@ -77,6 +84,9 @@ def make_asserts(params):
     if "flu" in params["indicator"]["types"]:
         assert (files["denom"] is None) == (files["flu"] is None), \
             "exactly one of denom and flu files are provided"
+    if "flu_inpatient" in params["indicator"]["types"]:
+        assert (files["denom_inpatient_state"] is None) == (files["flu_inpatient"] is None), \
+            "exactly one of denom_inpatient_state and flu_inpatient files are provided"
 
 
 def run_module(params: Dict[str, Dict[str, Any]]):
@@ -164,6 +174,9 @@ def run_module(params: Dict[str, Dict[str, Any]]):
     stats = []
     for geo in params["indicator"]["geos"]:
         for numtype in params["indicator"]["types"]:
+            if numtype == "flu_inpatient" and geo not in ("state", "nation", "hhs"):
+                logger.info("Skipping because flu_inpatient is not available at this geo", geo = geo)
+                continue
             for weekday in params["indicator"]["weekday"]:
                 if weekday:
                     logger.info("starting weekday adj", geo = geo, numtype = numtype)
@@ -182,22 +195,30 @@ def run_module(params: Dict[str, Dict[str, Any]]):
                     logger
                 )
                 if numtype == "covid":
+                    base_geo = "fips"
                     data = load_combined_data(file_dict["denom"],
-                                              file_dict["covid"], "fips",
+                                              file_dict["covid"], base_geo,
                                               backfill_dir, geo, weekday, numtype,
                                               generate_backfill_files, backfill_merge_day)
                 elif numtype == "cli":
+                    base_geo = "fips"
                     data = load_cli_data(file_dict["denom"],file_dict["flu"],file_dict["mixed"],
-                                         file_dict["flu_like"],file_dict["covid_like"], "fips",
+                                         file_dict["flu_like"],file_dict["covid_like"], base_geo,
                                          backfill_dir, geo, weekday, numtype,
                                          generate_backfill_files, backfill_merge_day)
                 elif numtype == "flu":
+                    base_geo = "fips"
                     data = load_flu_data(file_dict["denom"],file_dict["flu"],
-                                         "fips",backfill_dir, geo, weekday,
+                                         base_geo,backfill_dir, geo, weekday,
                                          numtype, generate_backfill_files, backfill_merge_day)
+                elif numtype == "flu_inpatient":
+                    base_geo = "state_code"
+                    data = load_flu_inpatient_data(file_dict["denom_inpatient_state"],file_dict["flu_inpatient"],
+                                                   dropdate_dt,base_geo)
                 more_stats = su_inst.update_sensor(
                     data,
                     params["common"]["export_dir"],
+                    base_geo
                 )
                 stats.extend(more_stats)
 
diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py
index 52a1af47f..8bdd77337 100644
--- a/changehc/delphi_changehc/update_sensor.py
+++ b/changehc/delphi_changehc/update_sensor.py
@@ -16,7 +16,8 @@
 
 # first party
 from .config import Config
 from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\
-    SMOOTHED_FLU, SMOOTHED_ADJ_FLU, NA
+    SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_FLU_INPATIENT,\
+    SMOOTHED_ADJ_FLU_INPATIENT, NA
 from .sensor import CHCSensor
 
@@ -117,6 +118,8 @@ def __init__(self,
             signal_name = SMOOTHED_ADJ_CLI if self.weekday else SMOOTHED_CLI
         elif self.numtype == "flu":
             signal_name = SMOOTHED_ADJ_FLU if self.weekday else SMOOTHED_FLU
+        elif self.numtype == "flu_inpatient":
+            signal_name = SMOOTHED_ADJ_FLU_INPATIENT if self.weekday else SMOOTHED_FLU_INPATIENT
         else:
-            raise ValueError(f'Unsupported numtype received "{numtype}",'
-                             f' must be one of ["covid", "cli", "flu"]')
+            raise ValueError(f'Unsupported numtype received "{numtype}",'
+                             f' must be one of ["covid", "cli", "flu", "flu_inpatient"]')
@@ -138,7 +141,7 @@ def shift_dates(self):
         self.sensor_dates = drange(self.startdate, self.enddate)
         return True
 
-    def geo_reindex(self, data):
+    def geo_reindex(self, data, base_geo):
         """Reindex based on geography, include all date, geo pairs.
 
         Args:
@@ -154,6 +157,7 @@ def geo_reindex(self, data):
                 "'state', 'msa', 'hrr', 'hss','nation'".format(geo))
             return False
         if geo == "county":
+            assert base_geo == "fips", "can only convert fips to county, not %s"%(base_geo)
             data_frame = gmpr.fips_to_megacounty(data,
                                                  Config.MIN_DEN,
                                                  Config.MAX_BACKFILL_WINDOW,
@@ -161,11 +165,10 @@ def geo_reindex(self, data):
                                                  mega_col=geo,
                                                  date_col=Config.DATE_COL)
         elif geo == "state":
-            data_frame = gmpr.replace_geocode(data, "fips", "state_id", new_col="state",
+            data_frame = gmpr.replace_geocode(data, base_geo, "state_id", new_col="state",
                                               date_col=Config.DATE_COL)
         else:
-            data_frame = gmpr.replace_geocode(data, "fips", geo, date_col=Config.DATE_COL)
-
+            data_frame = gmpr.replace_geocode(data, base_geo, geo, date_col=Config.DATE_COL)
         unique_geo_ids = pd.unique(data_frame[geo])
         data_frame.set_index([geo, Config.DATE_COL],inplace=True)
         # for each location, fill in all missing dates with 0 values
@@ -181,12 +184,14 @@ def geo_reindex(self, data):
 
     def update_sensor(self,
                       data,
-                      output_path):
+                      output_path,
+                      base_geo):
         """Generate sensor values, and write to csv format.
 
         Args:
             data: pd.DataFrame with columns num and den
             output_path: output path for the csv results
+            base_geo: base geographic unit of data before aggregation
         """
         self.shift_dates()
         final_sensor_idxs = (self.burn_in_dates >= self.startdate) &\
@@ -194,7 +199,7 @@ def update_sensor(self,
         # load data
         data.reset_index(inplace=True)
-        data_frame = self.geo_reindex(data)
+        data_frame = self.geo_reindex(data, base_geo)
 
         # handle if we need to adjust by weekday
         wd_params = Weekday.get_params(
             data_frame,
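Reviewer aside: the state branch of `geo_reindex` now forwards `base_geo` to `replace_geocode`, so one code path serves both `fips` and `state_code` inputs. A quick sketch of the new call, assuming `delphi_utils`' `GeoMapper` accepts `state_code` as a source geography (which is exactly what this change depends on); the two-row frame is made up:

```python
import pandas as pd
from delphi_utils import GeoMapper

gmpr = GeoMapper()
df = pd.DataFrame({
    "state_code": ["06", "36"],  # numeric state FIPS codes for CA and NY
    "timestamp": pd.to_datetime(["2020-06-01", "2020-06-01"]),
    "num": [5, 7],
    "den": [100, 140],
})
# The call geo_reindex makes for geo == "state" once base_geo == "state_code":
out = gmpr.replace_geocode(df, "state_code", "state_id", new_col="state",
                           date_col="timestamp")
print(out)  # num/den now keyed by two-letter state id ("ca", "ny")
```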
diff --git a/changehc/tests/test_data/20200601_Counts_Products_Denom_Inpatient_By_State.dat.gz b/changehc/tests/test_data/20200601_Counts_Products_Denom_Inpatient_By_State.dat.gz
new file mode 100644
index 000000000..7370de22d
Binary files /dev/null and b/changehc/tests/test_data/20200601_Counts_Products_Denom_Inpatient_By_State.dat.gz differ
diff --git a/changehc/tests/test_data/20200601_Counts_Products_Flu_Inpatient.dat.gz b/changehc/tests/test_data/20200601_Counts_Products_Flu_Inpatient.dat.gz
new file mode 100644
index 000000000..020d25e14
Binary files /dev/null and b/changehc/tests/test_data/20200601_Counts_Products_Flu_Inpatient.dat.gz differ
diff --git a/changehc/tests/test_load_data.py b/changehc/tests/test_load_data.py
index 9ce6f94f8..fc01f95d8 100644
--- a/changehc/tests/test_load_data.py
+++ b/changehc/tests/test_load_data.py
@@ -18,13 +18,19 @@
         "input_denom_file": "test_data/20200601_Counts_Products_Denom.dat.gz",
         "input_covid_file": "test_data/20200601_Counts_Products_Covid.dat.gz",
         "input_flu_file": "test_data/20200601_Counts_Products_Covid.dat.gz",
+        "input_denom_inpatient_state_file": "test_data/20200601_Counts_Products_Denom_Inpatient_By_State.dat.gz",
+        "input_flu_inpatient_file": "test_data/20200601_Counts_Products_Flu_Inpatient.dat.gz",
         "backfill_dir": "./backfill",
         "drop_date": "2020-06-01"
     }
 }
 COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"]
 FLU_FILEPATH = PARAMS["indicator"]["input_flu_file"]
+FLU_INPATIENT_FILEPATH = PARAMS["indicator"]["input_flu_inpatient_file"]
+
 DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
+DENOM_INPATIENT_STATE_FILEPATH = PARAMS["indicator"]["input_denom_inpatient_state_file"]
+
 DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
 
 backfill_dir = PARAMS["indicator"]["backfill_dir"]
@@ -42,6 +48,8 @@ class TestLoadData:
                                    True, backfill_merge_day)
     flu_data = load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, "fips", backfill_dir,
                              geo, weekday, "flu", True, backfill_merge_day)
+    flu_inpatient_data = load_flu_inpatient_data(DENOM_INPATIENT_STATE_FILEPATH,
+                                                 FLU_INPATIENT_FILEPATH, DROP_DATE, "state_code")
     gmpr = GeoMapper()
 
     def test_base_unit(self):
@@ -60,6 +68,10 @@ class TestLoadData:
         with pytest.raises(AssertionError):
             load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, "foo", backfill_dir,
                           geo, weekday, "covid", True, backfill_merge_day)
+
+        with pytest.raises(AssertionError):
+            load_flu_inpatient_data(DENOM_INPATIENT_STATE_FILEPATH, FLU_INPATIENT_FILEPATH,
+                                    DROP_DATE, "foo")
 
     def test_denom_columns(self):
         assert "fips" in self.denom_data.index.names
@@ -98,6 +110,17 @@ class TestLoadData:
             assert col in self.flu_data.columns
         assert len(
             set(self.flu_data.columns) - set(expected_flu_columns)) == 0
+
+    def test_flu_inpatient_columns(self):
+        assert "state_code" in self.flu_inpatient_data.index.names
+        assert "timestamp" in self.flu_inpatient_data.index.names
+
+        expected_flu_columns = ["num", "den"]
+        for col in expected_flu_columns:
+            assert col in self.flu_inpatient_data.columns
+        assert len(
+            set(self.flu_inpatient_data.columns) - set(expected_flu_columns)) == 0
+
     def test_edge_values(self):
         for data in [self.denom_data,
diff --git a/changehc/tests/test_update_sensor.py b/changehc/tests/test_update_sensor.py
index 999fed7e8..0bb3d9817 100644
--- a/changehc/tests/test_update_sensor.py
+++ b/changehc/tests/test_update_sensor.py
@@ -89,7 +89,7 @@ def test_geo_reindex(self):
             "fips": ['01001'] * 7 + ['04007'] * 6,
             "den": [1000] * 7 + [2000] * 6,
             "timestamp": [pd.Timestamp(f'03-{i}-2020') for i in range(1, 14)]})
-        data_frame = su_inst.geo_reindex(test_data)
+        data_frame = su_inst.geo_reindex(test_data, "fips")
 
         assert data_frame.shape[0] == multiple*len(su_inst.fit_dates)
         assert (data_frame.sum(numeric_only=True) == (4200,19000)).all()
@@ -120,7 +120,7 @@ def test_update_sensor(self):
             "den": [30, 50, 50, 10, 1, 5, 5, 50, 50, 50, 0, 0, 0] * 2,
             "timestamp": list(pd.date_range("20200301", "20200313")) * 2
         }).set_index(["fips", "timestamp"])
-        su_inst.update_sensor(small_test_data, td.name)
+        su_inst.update_sensor(small_test_data, td.name, "fips")
 
         for f in os.listdir(td.name):
             outputs[f] = pd.read_csv(os.path.join(td.name, f))
@@ -157,7 +157,7 @@ def test_update_sensor_output_daterange(self):
                     "",
                     TEST_LOGGER
                 )
-                su_inst.update_sensor(small_test_data.copy(), td.name)
+                su_inst.update_sensor(small_test_data.copy(), td.name, "fips")
                 for f in os.listdir(td.name):
                     outputs[startdate][f] = pd.read_csv(os.path.join(td.name, f))
                 assert len(os.listdir(td.name)) == len(su_inst.sensor_dates),\
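Usage note: enabling the new stream end to end is only a params change. The fragment below is hypothetical and limited to keys this diff actually reads (a real params.json carries more, such as the input file overrides checked by `make_asserts`); `run_module` logs and skips `flu_inpatient` for any geo other than `state`, `nation`, or `hhs`.

```python
# Hypothetical params fragment; only keys referenced in this diff are shown.
params = {
    "common": {"export_dir": "./receiving"},
    "indicator": {
        "input_cache_dir": "./cache",
        "types": ["covid", "cli", "flu", "flu_inpatient"],
        "geos": ["state", "nation", "hhs"],  # flu_inpatient skipped elsewhere
        "weekday": [True, False],
    },
}
```

With filedate `20200601`, `retrieve_files` would then look for `./cache/20200601_Counts_Products_Flu_Inpatient.dat.gz` and `./cache/20200601_Counts_Products_Denom_Inpatient_By_State.dat.gz`, which is what the two test fixtures added above mirror.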