Skip to content

Flu inpatient #1476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions changehc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ install-ci: venv
pip install .

lint:
. env/bin/activate; pylint $(dir)
. env/bin/activate; pydocstyle $(dir)
. env/bin/activate; pylint $(dir);\
pydocstyle $(dir)

test:
. env/bin/activate ;\
Expand Down
9 changes: 8 additions & 1 deletion changehc/delphi_changehc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,36 @@ class Config:
## data columns
COVID_COL = "COVID"
DENOM_COL = "Denominator"
DENOM_INPATIENT_STATE_COL = "Denominator-Inpatient-State"
FLU_COL = "Flu"
MIXED_COL = "Mixed"
FLU_LIKE_COL = "Flu-like"
COVID_LIKE_COL = "Covid-like"
COUNT_COLS = [COVID_COL,DENOM_COL,FLU_COL,MIXED_COL,FLU_LIKE_COL,COVID_LIKE_COL]
FLU_INPATIENT_COL = 'Flu-Inpatient'
COUNT_COLS = [COVID_COL,DENOM_COL,FLU_COL,MIXED_COL,FLU_LIKE_COL,COVID_LIKE_COL, FLU_INPATIENT_COL]
DATE_COL = "timestamp"
GEO_COL = "fips"
GEO_COL_STATE = 'state_code'
ID_COLS = [DATE_COL] + [GEO_COL]
FILT_COLS = ID_COLS + COUNT_COLS

DENOM_COLS = [DATE_COL, GEO_COL, DENOM_COL]
DENOM_COLS_STATE = [DATE_COL, GEO_COL_STATE, DENOM_INPATIENT_STATE_COL]
COVID_COLS = [DATE_COL, GEO_COL, COVID_COL]
FLU_COLS = [DATE_COL, GEO_COL, FLU_COL]
MIXED_COLS = [DATE_COL, GEO_COL, MIXED_COL]
FLU_LIKE_COLS = [DATE_COL, GEO_COL, FLU_LIKE_COL]
COVID_LIKE_COLS = [DATE_COL, GEO_COL, COVID_LIKE_COL]
FLU_INPATIENT_COLS = [GEO_COL_STATE, DATE_COL, FLU_INPATIENT_COL]

DENOM_DTYPES = {DATE_COL: str, DENOM_COL: str, GEO_COL: str}
DENOM_DTYPES_STATE = {DATE_COL: str, DENOM_INPATIENT_STATE_COL: str, GEO_COL_STATE: str}
COVID_DTYPES = {DATE_COL: str, COVID_COL: str, GEO_COL: str}
FLU_DTYPES = {DATE_COL: str, FLU_COL: str, GEO_COL: str}
MIXED_DTYPES = {DATE_COL: str, MIXED_COL: str, GEO_COL: str}
FLU_LIKE_DTYPES = {DATE_COL: str, FLU_LIKE_COL: str, GEO_COL: str}
COVID_LIKE_DTYPES = {DATE_COL: str, COVID_LIKE_COL: str, GEO_COL: str}
FLU_INPATIENT_DTYPES = {DATE_COL: str, FLU_INPATIENT_COL: str, GEO_COL_STATE: str}

SMOOTHER_BANDWIDTH = 100 # bandwidth for the linear left Gaussian filter
MIN_DEN = 100 # number of total visits needed to produce a sensor
Expand Down
5 changes: 4 additions & 1 deletion changehc/delphi_changehc/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
SMOOTHED_ADJ_CLI = "smoothed_adj_outpatient_cli"
SMOOTHED_FLU = "smoothed_outpatient_flu"
SMOOTHED_ADJ_FLU = "smoothed_adj_outpatient_flu"
SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, SMOOTHED_FLU, SMOOTHED_ADJ_FLU]
SMOOTHED_FLU_INPATIENT = "smoothed_inpatient_flu"
SMOOTHED_ADJ_FLU_INPATIENT = "smoothed_adj_inpatient_flu"

SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_FLU_INPATIENT, SMOOTHED_ADJ_FLU_INPATIENT]
NA = "NA"
HRR = "hrr"
FIPS = "fips"
Expand Down
40 changes: 37 additions & 3 deletions changehc/delphi_changehc/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def load_chng_data(filepath, dropdate, base_geo,
Returns:
cleaned dataframe
"""
assert base_geo == "fips", "base unit must be 'fips'"
assert base_geo == "fips" or (counts_col in {Config.FLU_INPATIENT_COL,Config.DENOM_INPATIENT_STATE_COL} and
base_geo == "state_code"), "base unit must be 'fips', or state_code for Flu-Inpatient"
count_flag = False
date_flag = False
geo_flag = False
Expand All @@ -39,11 +40,11 @@ def load_chng_data(filepath, dropdate, base_geo,
count_flag = True
elif n == Config.DATE_COL:
date_flag = True
elif n == "fips":
elif n == base_geo:
geo_flag = True
assert count_flag, "counts_col must be present in col_names"
assert date_flag, "'%s' must be present in col_names"%(Config.DATE_COL)
assert geo_flag, "'fips' must be present in col_names"
assert geo_flag, "'base_geo (%s) must be present in col_names"%(base_geo)

data = pd.read_csv(
filepath,
Expand Down Expand Up @@ -225,3 +226,36 @@ def load_flu_data(denom_filepath, flu_filepath, base_geo,
issue_date, test_mode=False, check_nd=25)
store_backfill_file(data, issue_date, backfill_dir, numtype, geo, weekday)
return data


def load_flu_inpatient_data(denom_filepath, flu_filepath, dropdate, base_geo):
    """Load denominator and flu inpatient counts and merge them into one frame.

    Args:
        denom_filepath: path to the aggregated denominator data
        flu_filepath: path to the aggregated flu inpatient data
        dropdate: data drop date (datetime object)
        base_geo: base geographic unit before aggregation ('state_code')

    Returns:
        combined multiindexed dataframe, index 0 is geo_base, index 1 is date
    """
    assert base_geo == "state_code", "base unit must be 'state_code'"

    # read the two input streams, each indexed by (geo, date)
    denom_data = load_chng_data(
        denom_filepath, dropdate, base_geo,
        Config.DENOM_COLS_STATE, Config.DENOM_DTYPES_STATE,
        Config.DENOM_INPATIENT_STATE_COL)
    flu_data = load_chng_data(
        flu_filepath, dropdate, base_geo,
        Config.FLU_INPATIENT_COLS, Config.FLU_INPATIENT_DTYPES,
        Config.FLU_INPATIENT_COL)

    # outer-join on the shared index so dates present in only one stream survive
    merged = denom_data.merge(flu_data, how="outer",
                              left_index=True, right_index=True)
    assert merged.isna().all(axis=1).sum() == 0, "entire row is NA after merge"

    # a count missing on one side of the join is treated as zero
    merged = merged.fillna(0)
    merged["num"] = merged[Config.FLU_INPATIENT_COL]
    merged["den"] = merged[Config.DENOM_INPATIENT_STATE_COL]

    return merged[["num", "den"]]
29 changes: 25 additions & 4 deletions changehc/delphi_changehc/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

# first party
from .download_ftp_files import download_counts
from .load_data import (load_combined_data, load_cli_data, load_flu_data)
from .load_data import load_combined_data, load_cli_data, load_flu_data, load_flu_inpatient_data
from .update_sensor import CHCSensorUpdater


Expand All @@ -34,13 +34,17 @@ def retrieve_files(params, filedate, logger):
mixed_file = "%s/%s_Counts_Products_Mixed.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
flu_like_file = "%s/%s_Counts_Products_Flu_Like.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
covid_like_file = "%s/%s_Counts_Products_Covid_Like.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
flu_inpatient_file = "%s/%s_Counts_Products_Flu_Inpatient.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
denom_inpatient_state_file = "%s/%s_Counts_Products_Denom_Inpatient_By_State.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
else:
denom_file = files["denom"]
covid_file = files["covid"]
flu_file = files["flu"]
mixed_file = files["mixed"]
flu_like_file = files["flu_like"]
covid_like_file = files["covid_like"]
flu_inpatient_file = files["flu_inpatient"]
denom_inpatient_state_file = files["denom_inpatient_state"]

file_dict = {"denom": denom_file}
if "covid" in params["indicator"]["types"]:
Expand All @@ -52,6 +56,9 @@ def retrieve_files(params, filedate, logger):
file_dict["covid_like"] = covid_like_file
if "flu" in params["indicator"]["types"]:
file_dict["flu"] = flu_file
if "flu_inpatient" in params["indicator"]["types"]:
file_dict["flu_inpatient"] = flu_inpatient_file
file_dict["denom_inpatient_state"] = denom_inpatient_state_file
return file_dict


Expand All @@ -77,6 +84,9 @@ def make_asserts(params):
if "flu" in params["indicator"]["types"]:
assert (files["denom"] is None) == (files["flu"] is None), \
"exactly one of denom and flu files are provided"
if "flu_inpatient" in params["indicator"]["types"]:
assert (files["denom_inpatient_state"] is None) == (files["flu_inpatient"] is None), \
"exactly one of denom_inpatient_state and flu_inpatient files are provided"


def run_module(params: Dict[str, Dict[str, Any]]):
Expand Down Expand Up @@ -164,6 +174,9 @@ def run_module(params: Dict[str, Dict[str, Any]]):
stats = []
for geo in params["indicator"]["geos"]:
for numtype in params["indicator"]["types"]:
if numtype == "flu_inpatient" and geo not in ("state", "nation", "hhs"):
logger.info("Skipping because flu_inpatient is not available at this geo", geo = geo)
continue
for weekday in params["indicator"]["weekday"]:
if weekday:
logger.info("starting weekday adj", geo = geo, numtype = numtype)
Expand All @@ -182,22 +195,30 @@ def run_module(params: Dict[str, Dict[str, Any]]):
logger
)
if numtype == "covid":
base_geo = "fips"
data = load_combined_data(file_dict["denom"],
file_dict["covid"], "fips",
file_dict["covid"], base_geo,
backfill_dir, geo, weekday, numtype,
generate_backfill_files, backfill_merge_day)
elif numtype == "cli":
base_geo = "fips"
data = load_cli_data(file_dict["denom"],file_dict["flu"],file_dict["mixed"],
file_dict["flu_like"],file_dict["covid_like"], "fips",
file_dict["flu_like"],file_dict["covid_like"], base_geo,
backfill_dir, geo, weekday, numtype,
generate_backfill_files, backfill_merge_day)
elif numtype == "flu":
base_geo = "fips"
data = load_flu_data(file_dict["denom"],file_dict["flu"],
"fips",backfill_dir, geo, weekday,
base_geo,backfill_dir, geo, weekday,
numtype, generate_backfill_files, backfill_merge_day)
elif numtype == "flu_inpatient":
base_geo = "state_code"
data = load_flu_inpatient_data(file_dict["denom_inpatient_state"],file_dict["flu_inpatient"],
dropdate_dt,base_geo)
more_stats = su_inst.update_sensor(
data,
params["common"]["export_dir"],
base_geo
)
stats.extend(more_stats)

Expand Down
19 changes: 12 additions & 7 deletions changehc/delphi_changehc/update_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
# first party
from .config import Config
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\
SMOOTHED_FLU, SMOOTHED_ADJ_FLU, NA
SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_FLU_INPATIENT,\
SMOOTHED_ADJ_FLU_INPATIENT, NA
from .sensor import CHCSensor


Expand Down Expand Up @@ -117,6 +118,8 @@ def __init__(self,
signal_name = SMOOTHED_ADJ_CLI if self.weekday else SMOOTHED_CLI
elif self.numtype == "flu":
signal_name = SMOOTHED_ADJ_FLU if self.weekday else SMOOTHED_FLU
elif self.numtype == 'flu_inpatient':
signal_name = SMOOTHED_ADJ_FLU_INPATIENT if self.weekday else SMOOTHED_FLU_INPATIENT
else:
raise ValueError(f'Unsupported numtype received "{numtype}",'
f' must be one of ["covid", "cli", "flu"]')
Expand All @@ -138,7 +141,7 @@ def shift_dates(self):
self.sensor_dates = drange(self.startdate, self.enddate)
return True

def geo_reindex(self, data):
def geo_reindex(self, data, base_geo):
"""Reindex based on geography, include all date, geo pairs.

Args:
Expand All @@ -154,18 +157,18 @@ def geo_reindex(self, data):
"'state', 'msa', 'hrr', 'hss','nation'".format(geo))
return False
if geo == "county":
assert base_geo == "fips", "can only convert fips to county, not %s"%(base_geo)
data_frame = gmpr.fips_to_megacounty(data,
Config.MIN_DEN,
Config.MAX_BACKFILL_WINDOW,
thr_col="den",
mega_col=geo,
date_col=Config.DATE_COL)
elif geo == "state":
data_frame = gmpr.replace_geocode(data, "fips", "state_id", new_col="state",
data_frame = gmpr.replace_geocode(data, base_geo, "state_id", new_col="state",
date_col=Config.DATE_COL)
else:
data_frame = gmpr.replace_geocode(data, "fips", geo, date_col=Config.DATE_COL)

data_frame = gmpr.replace_geocode(data, base_geo, geo, date_col=Config.DATE_COL)
unique_geo_ids = pd.unique(data_frame[geo])
data_frame.set_index([geo, Config.DATE_COL],inplace=True)
# for each location, fill in all missing dates with 0 values
Expand All @@ -181,20 +184,22 @@ def geo_reindex(self, data):

def update_sensor(self,
data,
output_path):
output_path,
base_geo):
"""Generate sensor values, and write to csv format.

Args:
data: pd.DataFrame with columns num and den
output_path: output path for the csv results
base_geo: base geographic unit of data before aggregation
"""
self.shift_dates()
final_sensor_idxs = (self.burn_in_dates >= self.startdate) &\
(self.burn_in_dates <= self.enddate)

# load data
data.reset_index(inplace=True)
data_frame = self.geo_reindex(data)
data_frame = self.geo_reindex(data, base_geo)
# handle if we need to adjust by weekday
wd_params = Weekday.get_params(
data_frame,
Expand Down
Binary file not shown.
Binary file not shown.
23 changes: 23 additions & 0 deletions changehc/tests/test_load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
"input_denom_file": "test_data/20200601_Counts_Products_Denom.dat.gz",
"input_covid_file": "test_data/20200601_Counts_Products_Covid.dat.gz",
"input_flu_file": "test_data/20200601_Counts_Products_Covid.dat.gz",
"input_denom_inpatient_state_file": "test_data/20200601_Counts_Products_Denom_Inpatient_By_State.dat.gz",
"input_flu_inpatient_file": "test_data/20200601_Counts_Products_Flu_Inpatient.dat.gz",
"backfill_dir": "./backfill",
"drop_date": "2020-06-01"
}
}
COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"]
FLU_FILEPATH = PARAMS["indicator"]["input_flu_file"]
FLU_INPATIENT_FILEPATH = PARAMS["indicator"]["input_flu_inpatient_file"]

DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
DENOM_INPATIENT_STATE_FILEPATH = PARAMS["indicator"]["input_denom_inpatient_state_file"]

DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
backfill_dir = PARAMS["indicator"]["backfill_dir"]

Expand All @@ -42,6 +48,8 @@ class TestLoadData:
True, backfill_merge_day)
flu_data = load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, "fips",
backfill_dir, geo, weekday, "flu", True, backfill_merge_day)
flu_inpatient_data = load_flu_inpatient_data(DENOM_INPATIENT_STATE_FILEPATH,
FLU_INPATIENT_FILEPATH, DROP_DATE, "state_code")
gmpr = GeoMapper()

def test_base_unit(self):
Expand All @@ -60,6 +68,10 @@ def test_base_unit(self):
with pytest.raises(AssertionError):
load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, "foo",
backfill_dir, geo, weekday, "covid", True, backfill_merge_day)

with pytest.raises(AssertionError):
load_flu_inpatient_data(DENOM_INPATIENT_STATE_FILEPATH, FLU_INPATIENT_FILEPATH,
DROP_DATE, "foo")

def test_denom_columns(self):
assert "fips" in self.denom_data.index.names
Expand Down Expand Up @@ -98,6 +110,17 @@ def test_flu_columns(self):
assert col in self.flu_data.columns
assert len(
set(self.flu_data.columns) - set(expected_flu_columns)) == 0

def test_flu_inpatient_columns(self):
    """Check the index names and exact column set of the flu inpatient frame."""
    # frame is multiindexed on (state_code, timestamp)
    assert "state_code" in self.flu_inpatient_data.index.names
    assert "timestamp" in self.flu_inpatient_data.index.names

    expected_flu_columns = ["num", "den"]
    # every expected column is present ...
    assert all(col in self.flu_inpatient_data.columns
               for col in expected_flu_columns)
    # ... and no unexpected columns remain
    extra_cols = set(self.flu_inpatient_data.columns) - set(expected_flu_columns)
    assert len(extra_cols) == 0


def test_edge_values(self):
for data in [self.denom_data,
Expand Down
6 changes: 3 additions & 3 deletions changehc/tests/test_update_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_geo_reindex(self):
"fips": ['01001'] * 7 + ['04007'] * 6,
"den": [1000] * 7 + [2000] * 6,
"timestamp": [pd.Timestamp(f'03-{i}-2020') for i in range(1, 14)]})
data_frame = su_inst.geo_reindex(test_data)
data_frame = su_inst.geo_reindex(test_data, "fips")
assert data_frame.shape[0] == multiple*len(su_inst.fit_dates)
assert (data_frame.sum(numeric_only=True) == (4200,19000)).all()

Expand Down Expand Up @@ -120,7 +120,7 @@ def test_update_sensor(self):
"den": [30, 50, 50, 10, 1, 5, 5, 50, 50, 50, 0, 0, 0] * 2,
"timestamp": list(pd.date_range("20200301", "20200313")) * 2
}).set_index(["fips", "timestamp"])
su_inst.update_sensor(small_test_data, td.name)
su_inst.update_sensor(small_test_data, td.name, "fips")
for f in os.listdir(td.name):
outputs[f] = pd.read_csv(os.path.join(td.name, f))

Expand Down Expand Up @@ -157,7 +157,7 @@ def test_update_sensor_output_daterange(self):
"",
TEST_LOGGER
)
su_inst.update_sensor(small_test_data.copy(), td.name)
su_inst.update_sensor(small_test_data.copy(), td.name, "fips")
for f in os.listdir(td.name):
outputs[startdate][f] = pd.read_csv(os.path.join(td.name, f))
assert len(os.listdir(td.name)) == len(su_inst.sensor_dates),\
Expand Down