From b3f3278c1b9f911c6bed2154c76c22d47e80d4b5 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Wed, 7 Aug 2024 13:24:34 -0500 Subject: [PATCH 1/5] initial sql table format and python stubs --- src/client/delphi_epidata.py | 9 ++ src/ddl/wastewater.sql | 173 +++++++++++++++++++++++++++++ src/server/endpoints/__init__.py | 2 + src/server/endpoints/wastewater.py | 104 +++++++++++++++++ 4 files changed, 288 insertions(+) create mode 100644 src/ddl/wastewater.sql create mode 100644 src/server/endpoints/wastewater.py diff --git a/src/client/delphi_epidata.py b/src/client/delphi_epidata.py index 998c85281..0ee0cc2ee 100644 --- a/src/client/delphi_epidata.py +++ b/src/client/delphi_epidata.py @@ -35,6 +35,7 @@ class EpidataBadRequestException(EpidataException): ISSUES_LAG_EXCLUSIVE = "`issues` and `lag` are mutually exclusive" LOCATIONS_EPIWEEKS_REQUIRED = "`locations` and `epiweeks` are both required" + # Because the API is stateless, the Epidata class only contains static methods class Epidata: """An interface to DELPHI's Epidata API.""" @@ -342,6 +343,14 @@ def twitter(auth, locations, dates=None, epiweeks=None): # Make the API call return Epidata._request("twitter", params) + # Fetch wastewater data + @staticmethod + def wastewater(locations, location_type, epiweeks, issues=None, lag=None): + locations + 3 + location_type + sites + return 3 + # Fetch Wikipedia access data @staticmethod def wiki(articles, dates=None, epiweeks=None, hours=None, language="en"): diff --git a/src/ddl/wastewater.sql b/src/ddl/wastewater.sql new file mode 100644 index 000000000..5d4f3e975 --- /dev/null +++ b/src/ddl/wastewater.sql @@ -0,0 +1,173 @@ +USE epidata; + +CREATE TABLE agg_geo_dim ( + `geo_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `geo_type` VARCHAR(12) NOT NULL, + `geo_value` VARCHAR(12) NOT NULL, + + UNIQUE INDEX `agg_geo_dim_index` (`geo_type`, `geo_value`) +) ENGINE=InnoDB; + +CREATE TABLE sample_site_dim ( + `site_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `plant_id` INT(10) UNSIGNED NOT NULL, + `sample_loc_specify` VARCHAR(12), + + UNIQUE INDEX `sample_site_dim_index` (`plant_id`, `sample_loc_specify`) +) ENGINE=InnoDB; + +CREATE TABLE plant_dim ( + `plant_id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `wwtp_jurisdiction` VARCHAR(12) UNSIGNED NOT NULL, -- may only need CHAR(2), it's a state id + `wwtp_id` INT(10) UNSIGNED NOT NULL + + UNIQUE INDEX `plant_index` (`wwtp_jurisdiction`, `wwtp_id`) +) ENGINE=InnoDB; + +CREATE TABLE site_county_cross ( + `site_county_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `site_key_id` BIGINT(20) UNSIGNED NOT NULL, + `county_fips_id` CHAR(5) UNSIGNED NOT NULL + + UNIQUE INDEX `sit_county_index` (`site_key_id`, `county_fips_id`) +) ENGINE=InnoDB; + + +CREATE TABLE signal_dim ( + `signal_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `source` VARCHAR(32) NOT NULL, + `signal` VARCHAR(64) NOT NULL, + `pathogen` VARCHAR(64) NOT NULL, + `provider` VARCHAR(64), -- null for potential future use + `normalization` VARCHAR(64), -- null for potential future use + + UNIQUE INDEX `signal_dim_index` (`source`, `signal`, `pathogen`, `provider`, `normalization`) +) ENGINE=InnoDB; + + +CREATE TABLE wastewater_granular_full ( + `wastewater_id` BIGINT(20) UNSIGNED NOT NULL PRIMARY KEY, + `signal_key_id` BIGINT(20) UNSIGNED NOT NULL, + `site_key_id` BIGINT(20) UNSIGNED NOT NULL, + `version` INT(11) NOT NULL, + `time_type` VARCHAR(12) NOT NULL, + `reference_date` INT(11) NOT NULL, + `value` 
DOUBLE, + `lag` INT(11) NOT NULL, + `value_updated_timestamp` INT(11) NOT NULL, + `computation_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed + `missing_value` INT(1) DEFAULT '0', + + UNIQUE INDEX `value_key_tig` (`signal_key_id`, `time_type`, `reference_date`, `version`, `site_key_id`), + UNIQUE INDEX `value_key_tgi` (`signal_key_id`, `time_type`, `reference_date`, `site_key_id`, `version`), + UNIQUE INDEX `value_key_itg` (`signal_key_id`, `version`, `time_type`, `reference_date`, `site_key_id`), + UNIQUE INDEX `value_key_igt` (`signal_key_id`, `version`, `site_key_id`, `time_type`, `reference_date`), + UNIQUE INDEX `value_key_git` (`signal_key_id`, `site_key_id`, `version`, `time_type`, `reference_date`), + UNIQUE INDEX `value_key_gti` (`signal_key_id`, `site_key_id`, `time_type`, `reference_date`, `version`) +) ENGINE=InnoDB; + +CREATE TABLE wastewater_granular_latest ( + PRIMARY KEY (`wastewater_id`), + UNIQUE INDEX `value_key_tg` (`signal_key_id`, `time_type`, `reference_date`, `site_key_id`), + UNIQUE INDEX `value_key_gt` (`signal_key_id`, `site_key_id`, `time_type`, `reference_date`) +) ENGINE=InnoDB +SELECT * FROM wastewater_granular_full; + + +-- NOTE: In production or any non-testing system that should maintain consistency, +-- **DO NOT** 'TRUNCATE' this table. +-- Doing so will function as a DROP/CREATE and reset the AUTO_INCREMENT counter for the `wastewater_id` field. +-- This field is used to populate the non-AUTO_INCREMENT fields of the same name in `wastewater_granular_latest` and `wastewater_granular_full`, +-- and resetting it will ultimately cause PK collisions. +-- To restore the counter, a row must be written with a `wastewater_id` value greater than the maximum +-- of its values in the other tables. +CREATE TABLE wastewater_granular_load ( + `wastewater_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `signal_key_id` BIGINT(20) UNSIGNED, + `site_key_id` BIGINT(20) UNSIGNED, + `version` INT(11) NOT NULL, + `source` VARCHAR(32) NOT NULL, + `signal` VARCHAR(64) NOT NULL, + `site_key_id` BIGINT(20) NOT NULL, + `time_type` VARCHAR(12) NOT NULL, + `reference_date` INT(11) NOT NULL, + `reference_dt` DATETIME(0), -- TODO: for future use + `value` DOUBLE, + `lag` INT(11) NOT NULL, + `value_updated_timestamp` INT(11) NOT NULL, + `computation_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed + `is_latest_version` BINARY(1) NOT NULL DEFAULT '0', + `missing_value` INT(1) DEFAULT '0', + + UNIQUE INDEX (`source`, `signal`, `time_type`, `reference_date`, `site_key_id`, `version`) +) ENGINE=InnoDB; + + +CREATE OR REPLACE VIEW wastewater_granular_full_v AS + SELECT + 0 AS `is_latest_version`, -- provides column-compatibility to match `wastewater_granular` table + -- ^ this value is essentially undefined in this view, the notion of a 'latest' version is not encoded here and must be drawn from the 'latest' table or view or otherwise computed... + NULL AS `direction`, -- provides column-compatibility to match `covidcast` table TODO: what is this?? 
+ `t2`.`source` AS `source`, + `t2`.`signal` AS `signal`, + `t2`.`pathogen` AS `pathogen`, + `t2`.`provider` AS `provider`, + `t2`.`normalization` AS `normalization`, + `t4`.`wwtp_id` AS `wwtp_id`, + `t4`.`wwtp_jurisdiction` AS `wwtp_jurisdiction`, + `t3`.`sample_loc_specify` AS `sample_loc_specify`, + `t1`.`wastewater_id` AS `wastewater_id`, + `t1`.`version` AS `version`, + `t1`.`time_type` AS `time_type`, + `t1`.`reference_date` AS `reference_date`, + `t1`.`reference_dt` AS `reference_dt`, -- TODO: for future use + `t1`.`value` AS `value`, + `t1`.`lag` AS `lag`, + `t1`.`value_updated_timestamp` AS `value_updated_timestamp`, + `t1`.`computation_as_of_dt` AS `computation_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed + `t1`.`missing_value` AS `missing_value`, + `t1`.`signal_key_id` AS `signal_key_id`, + `t1`.`site_key_id` AS `site_key_id`, + `t3`.`plant_id` AS `plant_id` + FROM `wastewater_granular_full` `t1` + JOIN `signal_dim` `t2` USING (`signal_key_id`) + JOIN `sample_site_dim` `t3` USING (`site_key_id`) + JOIN `plant_dim` `t4` USING (`plant_id`); -- TODO not sure if this is the right way to do a join of a join + + --JOIN `site_county_cross` `t5` USING (`site_key_id`); +CREATE OR REPLACE VIEW wastewater_granular_latest_v AS + SELECT + 1 AS `is_latest_version`, -- provides column-compatibility to match `covidcast` table + NULL AS `direction`, -- provides column-compatibility to match `covidcast` table + `t2`.`source` AS `source`, + `t2`.`signal` AS `signal`, + `t3`.`geo_type` AS `geo_type`, + `t3`.`geo_value` AS `geo_value`, + `t1`.`wastewater_id` AS `wastewater_id`, + `t1`.`version` AS `version`, + `t1`.`data_as_of_dt` AS `data_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed + `t1`.`time_type` AS `time_type`, + `t1`.`reference_date` AS `reference_date`, + `t1`.`reference_dt` AS `reference_dt`, -- TODO: for future use + `t1`.`value` AS `value`, + `t1`.`stderr` AS `stderr`, + `t1`.`sample_size` AS `sample_size`, + `t1`.`lag` AS `lag`, + `t1`.`value_updated_timestamp` AS `value_updated_timestamp`, + `t1`.`computation_as_of_dt` AS `computation_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed + `t1`.`missing_value` AS `missing_value`, + `t1`.`missing_stderr` AS `missing_stderr`, + `t1`.`missing_sample_size` AS `missing_sample_size`, + `t1`.`signal_key_id` AS `signal_key_id`, + `t1`.`site_key_id` AS `site_key_id` + FROM `wastewater_granular_latest` `t1` + JOIN `signal_dim` `t2` USING (`signal_key_id`) + JOIN `agg_geo_dim` `t3` USING (`site_key_id`); + +CREATE TABLE `wastewater_meta_cache` ( + `timestamp` int(11) NOT NULL, + `epidata` LONGTEXT NOT NULL, + + PRIMARY KEY (`timestamp`) +) ENGINE=InnoDB; +INSERT INTO wastewater_meta_cache VALUES (0, '[]'); diff --git a/src/server/endpoints/__init__.py b/src/server/endpoints/__init__.py index 6dad05bf3..91e5a2060 100644 --- a/src/server/endpoints/__init__.py +++ b/src/server/endpoints/__init__.py @@ -28,6 +28,7 @@ sensors, twitter, wiki, + wastewater, signal_dashboard_status, signal_dashboard_coverage, ) @@ -62,6 +63,7 @@ sensors, twitter, wiki, + wastewater, signal_dashboard_status, signal_dashboard_coverage, ] diff --git a/src/server/endpoints/wastewater.py b/src/server/endpoints/wastewater.py new file mode 100644 index 000000000..130568e57 --- /dev/null +++ b/src/server/endpoints/wastewater.py @@ -0,0 +1,104 @@ +from typing import Any, Dict, List, Tuple + +from flask import Blueprint, request + +from .._params import 
extract_integer, extract_integers, extract_strings +from .._query import execute_queries, filter_integers, filter_strings +from .._security import current_user +from .._validate import require_all + +# first argument is the endpoint name +bp = Blueprint("wastewater", __name__) +alias = None + + +@bp.route("/", methods=("GET", "POST")) +def handle(): + authorized = False if not current_user else current_user.has_role("wastewater") + + require_all(request, "epiweeks", "regions") + + epiweeks = extract_integers("epiweeks") + regions = extract_strings("regions") + issues = extract_integers("issues") + lag = extract_integer("lag") + + # basic query info + table = "`wastewater` ww" + fields = "ww.`release_date`, ww.`issue`, ww.`time_type`, ww.`time_value`, ww.`region`, ww.`lag`, ww.`num_ili`, ww.`num_patients`, ww.`num_providers`, ww.`wili`, ww.`ili`, ww.`num_age_0`, ww.`num_age_1`, ww.`num_age_2`, ww.`num_age_3`, ww.`num_age_4`, ww.`num_age_5`" + + queries: List[Tuple[str, Dict[str, Any]]] = [] + + def get_wastewater_by_table(table: str, fields: str, regions: List[str]): + # a helper function to query `wastewater` and `wastewater_imputed` individually + # parameters + # basic query info + order = "ww.`epiweek` ASC, ww.`region` ASC, ww.`issue` ASC" + params: Dict[str, Any] = dict() + # build the epiweek filter + condition_epiweek = filter_integers("ww.`epiweek`", epiweeks, "epiweek", params) + # build the region filter + condition_region = filter_strings("ww.`region`", regions, "region", params) + if issues: + # build the issue filter + condition_issue = filter_integers("ww.`issue`", issues, "issues", params) + # final query using specific issues + query = f"SELECT {fields} FROM {table} WHERE ({condition_epiweek}) AND ({condition_region}) AND ({condition_issue}) ORDER BY {order}" + elif lag is not None: + # build the lag filter + condition_lag = "(ww.`lag` = :lag)" + params["lag"] = lag + # final query using lagged issues + query = f"SELECT {fields} FROM {table} WHERE ({condition_epiweek}) AND ({condition_region}) AND ({condition_lag}) ORDER BY {order}" + else: + # final query using most recent issues + subquery = f"(SELECT max(`issue`) `max_issue`, `epiweek`, `region` FROM {table} WHERE ({condition_epiweek}) AND ({condition_region}) GROUP BY `epiweek`, `region`) x" + condition = "x.`max_issue` = ww.`issue` AND x.`epiweek` = ww.`epiweek` AND x.`region` = ww.`region`" + query = f"SELECT {fields} FROM {table} JOIN {subquery} ON {condition} ORDER BY {order}" + + return query, params + + queries.append(get_wastewater_by_table(table, fields, regions)) + if not authorized: + # Make a special exception for New York. It is a (weighted) sum of two + # constituent locations -- "ny_minus_jfk" and "jfk" -- both of which are + # publicly available. 
+ if "ny" in {r.lower() for r in regions}: + regions = ["ny"] + authorized = True + + if authorized: + # private data (no release date, no age groups, and wili is equal to ili) + table = "`wastewater_imputed` ww" + fields = "NULL `release_date`, ww.`issue`, ww.`epiweek`, ww.`region`, ww.`lag`, ww.`num_ili`, ww.`num_patients`, ww.`num_providers`, ww.`ili` `wili`, ww.`ili`, NULL `num_age_0`, NULL `num_age_1`, NULL `num_age_2`, NULL `num_age_3`, NULL `num_age_4`, NULL `num_age_5`" + queries.append(get_wastewater_by_table(table, fields, regions)) + + fields_string = [ + "geo_type", + "geo_value", + "source", + "signal", + "normalization", + "provider", + "release_date", + "region", + ] + fields_int = [ + "time_value", + "population", + "issue", + "lag", + "num_ili", + "num_patients", + "num_providers", + "num_age_0", + "num_age_1", + "num_age_2", + "num_age_3", + "num_age_4", + "num_age_5", + ] + fields_float = ["wili", "ili"] + + # send query + return execute_queries(queries, fields_string, fields_int, fields_float) From ed31f8bc33691ac3c65a6edc3ba1111b9e441c20 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Thu, 15 Aug 2024 10:38:00 -0500 Subject: [PATCH 2/5] core ingestion code --- docs/new_endpoint_tutorial.md | 16 + src/acquisition/wastewater/nwss_csv.py | 329 ++++++++++++++++++ src/acquisition/wastewater/wastewater_main.py | 186 ++++++++++ .../wastewater/wastewater_utils.py | 37 ++ src/ddl/wastewater.sql | 5 +- 5 files changed, 571 insertions(+), 2 deletions(-) create mode 100644 src/acquisition/wastewater/nwss_csv.py create mode 100644 src/acquisition/wastewater/wastewater_main.py create mode 100644 src/acquisition/wastewater/wastewater_utils.py diff --git a/docs/new_endpoint_tutorial.md b/docs/new_endpoint_tutorial.md index abf8a5fa1..953e7e0d5 100644 --- a/docs/new_endpoint_tutorial.md +++ b/docs/new_endpoint_tutorial.md @@ -62,6 +62,22 @@ the following: - latest "issue", which is the publication epiweek - total size of the table +## Acquire data +If, unlike `fluview` you need to acquire add new data in addition to a new endpoint, you will need to add an appropriate data ingestion method. +These live in src/acquisition/, and needs one file with a `main` function, typically with a name `_update.py` or `_to_database.py`. +It is recommended to partition the functions based on use, for example +1. a file to download and format the data +2. a file to save backups +3. a file to update the database (this is typically including main) + + +Since we're using the `fluview` table, we're piggybacking off of [src/acquisition/fluview](https://github.com/cmu-delphi/delphi-epidata/tree/dev/src/acquisition/fluview). +To run ingestion, cronicle runs [fluview_update.py](https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/acquisition/fluview/fluview_update.py), while the other scripts provide methods for that. + +### Secrets +If you are pulling from an API or other source which needs authentication, you will need to add your secret into the backend. How to go about this for new endpoints is TODO. +## Adding new packages +If for whatever reason you need to add a new dependency TODO # update the server 1. create a new file in `/src/server/endpoints/` e.g., `fluview_meta.py`, or copy an existing one. 
diff --git a/src/acquisition/wastewater/nwss_csv.py b/src/acquisition/wastewater/nwss_csv.py new file mode 100644 index 000000000..747fc2321 --- /dev/null +++ b/src/acquisition/wastewater/nwss_csv.py @@ -0,0 +1,329 @@ +"""Functions to download the data from NWSS and save into a csv with a shared format.""" + +import numpy as np +import pandas as pd +from sodapy import Socrata +from logging import Logger +from delphi_utils import get_structured_logger + +# first party tools +from .wastewater_utils import sig_digit_round, convert_df_type +from delphi_utils import GeoMapper +import delphi_utils + +# Sample site names +from .nwss_constants import ( + AUX_INFO_COLUMNS, + COUNTY_INFO, + KEY_PLOT_NAMES, + KEY_PLOT_TYPES, + METRIC_SIGNALS, + SIG_DIGITS, + CONCENTRATION_SIGNALS, + TIME_SPANNED, + TYPE_DICT_CONC, + TYPE_DICT_METRIC, +) + + +def key_plot_id_parse(key_plot_ids: pd.Series) -> pd.DataFrame: + """Given a list of key plot id's, extract the various sub-values. + + The Dataframe returned has the following columns + Provider + wwtp_jurisdiction + wwtp_id + sample_location + sample_location_specify + sample_method + """ + # To understand the regex below, I suggest putting it into + # https://regex101.com/ to parse how it's getting the names in detail. + # But here are 2 representative example keys + # NWSS_mi_1040_Before treatment plant_147_raw wastewater + # CDC_VERILY_ak_1491_Treatment plant_raw wastewater + # every variable is separated by `_`, though some may also contain them + processed_names = key_plot_ids.str.extract( + ( + r"(?P.+)_" # provider, anything before the next one "NWSS" + r"(?P[a-z]{2})_" # state, exactly two letters "mi" + r"(?P[0-9]+)_" # an id, at least one digits "1040" + r"(?P[^_]+)_" # either "Before treatment plant" or "Treatment plant" + r"(?P[0-9]*)_*" # a number, may be missing (thus _*) + r"(?P.*)" # one of 4 values: + # 'post grit removal', 'primary effluent', 'primary sludge', 'raw wastewater' + ) + ) + processed_names.sample_location_specify = pd.to_numeric( + processed_names.sample_location_specify, errors="coerce" + ).astype("Int32") + processed_names["key_plot_id"] = key_plot_ids + processed_names = processed_names.set_index("key_plot_id") + processed_names = processed_names.astype(KEY_PLOT_TYPES) + # TODO warnings/errors when things don't parse + return processed_names + + +def validate_metric_key_plot_ids( + df_metric: pd.DataFrame, metric_keys: pd.DataFrame, logger: Logger +) -> None: + """Check that the metric key_plot_ids match the corresponding values in the table. + + One of the weirder edge cases is the `wwtp_jurisdiction`. The `key_plot_id` treats this as state only, whereas the actual `wwtp_jurisdiction` appears to be state + New York City. It is currently setup to warn if another location appears in `wwtp_jurisdiction`, but not error. 
+ """ + geomapper = GeoMapper() + df_metric = geomapper.add_geocode( + df_metric, + from_code="state_name", + new_code="state_id", + from_col="wwtp_jurisdiction", + dropna=False, + ) + # NYC is the weird extra jurisdiction that shows up in wwtp_jur but not the actual key_plot_id + is_not_nyc = ( + df_metric.wwtp_jurisdiction[df_metric.state_id.isna()] != "New York City" + ) + if any(is_not_nyc): + logger.warn( + "There is a wwtp_jurisdiction that is not a state or nyc.", + not_nyc=df_metric.wwtp_jurisdiction[df_metric.state_id.isna()][is_not_nyc], + ) + df_metric.loc[df_metric.state_id.isna(), "state_id"] = "ny" + df_metric["wwtp_jurisdiction"] = df_metric["state_id"] + df_metric = df_metric.drop(columns="state_id") + + checking_metric_key_plot_ids = metric_keys.reset_index().merge( + df_metric.drop(columns=["date", *METRIC_SIGNALS]).drop_duplicates(), + how="outer", + indicator=True, + ) + missing_keys = checking_metric_key_plot_ids[ + checking_metric_key_plot_ids["_merge"] != "both" + ] + if missing_keys.size > 0: + logger.error( + "There are some keys which don't match their values in key_plot_id", + missing_keys=missing_keys, + ) + + +def deduplicate_aux(auxiliary_info: pd.DataFrame, logger: Logger) -> pd.DataFrame: + """Check that first_sample_date, sampling_prior, and counties have unique values across time and space.""" + should_be_site_unique = auxiliary_info.groupby("key_plot_id").agg( + { + "county_names": pd.Series.nunique, + "county_fips": pd.Series.nunique, + "first_sample_date": pd.Series.nunique, + "sampling_prior": pd.Series.nunique, + } + ) + is_site_unique = should_be_site_unique.max() == 1 + if not all(is_site_unique): + logger.error( + f"{is_site_unique.index[~is_site_unique]} are not unique! This means that the sample_site metadata is time dependent." + ) + auxiliary_info = auxiliary_info.drop_duplicates([*AUX_INFO_COLUMNS, *COUNTY_INFO]) + return auxiliary_info.drop(columns="date") + + +def download_raw_data(token: str, logger: Logger) -> tuple[pd.DataFrame, pd.DataFrame]: + """Bare bones download of data from the socrata API.""" + client = Socrata("data.cdc.gov", token) + results_concentration = client.get("g653-rqe2", limit=10**10) + results_metric = client.get("2ew6-ywp6", limit=10**10) + df_metric = pd.DataFrame.from_records(results_metric) + df_concentration = pd.DataFrame.from_records(results_concentration) + # Schema checks/conversions. 
+ df_concentration = convert_df_type(df_concentration, TYPE_DICT_CONC, logger) + df_metric = convert_df_type(df_metric, TYPE_DICT_METRIC, logger) + df_metric = df_metric.rename(columns={"date_end": "date"}) + return df_concentration, df_metric + + +def format_nwss_data( + df_concentration: pd.DataFrame, df_metric: pd.DataFrame +) -> tuple[pd.DataFrame, pd.DataFrame]: + """Helper to pull_nwss_data, mainly separated to ease debugging without pulling.""" + # pull out the auxiliary_info first so we can drop it + # this means county, first sample date and sampling_prior, along with the keys to specify those + auxiliary_info = df_metric.loc[ + :, + ["date", *AUX_INFO_COLUMNS, *COUNTY_INFO], + ] + + # make sure there's not duplicates in time and space + auxiliary_info = deduplicate_aux(auxiliary_info, logger) + # TODO verify the county names match the codes + has_right_interval = df_metric.date - df_metric.date_start == TIME_SPANNED + # making sure that our assumption that date_start is redundant is correct + # default interval is 14 days as of the time of writing, and is also included in the column names + if not all(has_right_interval): + logger.error( + f"The time difference isn't strictly {TIME_SPANNED}. This means dropping `date_start` loses information now", + different_examples=df_metric[~has_right_interval], + ) + # This has been shuttled off to auxiliary_info, so we drop it for efficiency of pivoting + df_metric = df_metric.drop( + columns=[ + "county_names", + "county_fips", + "sampling_prior", + "first_sample_date", + "reporting_jurisdiction", + "date_start", + ] + ) + + # Warn if the normalization scheme is ever seen to be NA + na_columns = df_concentration[df_concentration["normalization"].isna()] + if na_columns.size != 0: + logger.info("There are columns without normalization.", na_columns=na_columns) + conc_keys = key_plot_id_parse(pd.Series(df_concentration.key_plot_id.unique())) + # get the keys+normalizations where there's an unambiguous choice of normalization + key_plot_norms = df_concentration.loc[ + :, ["key_plot_id", "normalization"] + ].drop_duplicates() + + # count the # of normalizations for a given key_plot_id + key_plot_norms["n_norms"] = key_plot_norms.groupby( + "key_plot_id" + ).normalization.transform("count") + key_plot_norms = ( + key_plot_norms[key_plot_norms.n_norms == 1] + .drop(columns="n_norms") + .set_index("key_plot_id") + ) + # add the unambiguous normalizations, others get an NA + conc_keys = conc_keys.join(key_plot_norms, how="left") + metric_keys = key_plot_id_parse(pd.Series(df_metric.key_plot_id.unique())) + validate_metric_key_plot_ids(df_metric, metric_keys, logger) + # form the joint table of keys found in both, along with a column + # identifying which direction there is missingness + joint_keys = conc_keys.reset_index().merge( + metric_keys.reset_index(), indicator=True, how="outer" + ) + joint_keys = joint_keys.astype(KEY_PLOT_TYPES) + only_in_one = joint_keys[joint_keys._merge != "both"] + if only_in_one.size > 0: + only_in_one = only_in_one.replace( + {"left_only": "conc", "right_only": "metric"} + ).rename(columns={"_merge": "key_in"}) + logger.info( + "There are locations present in one key_plot_id", only_in_one=only_in_one + ) + # these can cause 2 problems: + # in metric but not conc means a lack of normalization. + # in conc but not metric is harder, as it lacks population, + # sampling_prior, first_sample_date, county_names, and county_fips. 
Those + # will all have to be NA + + # we're dropping _merge since this will already be conveyed by NA's after + # joining the datasets + joint_keys = joint_keys.set_index("key_plot_id").drop(columns="_merge") + # add the key info to axuiliary info, and make sure any locations in the + # metric are included, even if their values are NA + auxiliary_info = auxiliary_info.merge(joint_keys, on="key_plot_id", how="outer") + # adding the key breakdown to df_concentration + df_conc_long = pd.melt( + df_concentration, + id_vars=["date", "key_plot_id", "normalization"], + value_vars=CONCENTRATION_SIGNALS, + var_name="signal", + ) + df_conc_long = df_conc_long.set_index(["key_plot_id", "date"]) + # concentration needs to keep the normalization info b/c it is accurate to the date + df_conc_long = df_conc_long.join( + joint_keys.drop(columns=["normalization"]), how="left" + ) + # forming the long version of the metric data + # note that melting with the minimal set of columns is significantly faster, even if we're reappending them after during the join + df_metric = df_metric.drop( + columns=[ + "wwtp_jurisdiction", + "wwtp_id", + "sample_location", + "sample_location_specify", + ] + ) + + df_metric_long = pd.melt( + df_metric, + id_vars=["date", "key_plot_id"], + value_vars=METRIC_SIGNALS, + var_name="signal", + ) + df_metric_long = df_metric_long.set_index(["key_plot_id", "date"]) + df_metric_long = df_metric_long.join(joint_keys, how="left") + + # join the two datasets + df = pd.concat([df_conc_long, df_metric_long]) + df = df.astype({"normalization": "category", "signal": "category"}) + # sorting and making sure there aren't duplicates + df = ( + df.reset_index() + .set_index( + [ + "signal", + "provider", + "normalization", + "wwtp_jurisdiction", + "wwtp_id", + "sample_location", + "sample_location_specify", + "sample_method", + "date", + ] + ) + .sort_index() + ) + df.drop(columns="key_plot_id").sort_index() + df = df.reset_index() + deduped = df.drop_duplicates( + subset=[ + "signal", + "provider", + "normalization", + "wwtp_jurisdiction", + "sample_location", + "sample_location_specify", + "sample_method", + "wwtp_id", + "date", + ], + ) + if deduped.shape != df.shape: + logger.error( + "there were duplicate entries", + df_shape=df.shape, + dedupe_shape=deduped.shape, + ) + + return df, auxiliary_info + + +def pull_nwss_data(token: str, logger: Logger) -> tuple[pd.DataFrame, pd.DataFrame]: + """Pull the latest NWSS Wastewater data for both the concentration and metric datasets, and conform them into a dataset and a county mapping. + + There are two output datasets: + The primary data, which has the rows + - (source_name, signal_name, pathogen, provider, normalization), (wwtp_jurisdiction, wwtp_id, sample_loc_specify), reference_date, value + There's a second, seperate table that contains just the county-sample_site mapping, which consists of rows that are: + - date_changed (likely only the first day the sample site is present), wwtp_jurisdictoin, wwtp_id, sample_loc_specify + + Parameters + ---------- + socrata_token: str + My App Token for pulling the NWSS data (could be the same as the nchs data) + logger: the structured logger + + Returns + ------- + pd.DataFrame + Dataframe as described above. 
+ """ + # TODO temp, remove + logger = get_structured_logger("csv_ingestion", filename="writingErrorLog") + # TODO reinstate this, similar for metric + # df_concentration = df_concentration.rename(columns={"date": "reference_date"}) + df_concentration, df_metric = download_raw_data(token, logger) + df, auxiliary_info = format_nwss_data(df_concentration, df_metric) diff --git a/src/acquisition/wastewater/wastewater_main.py b/src/acquisition/wastewater/wastewater_main.py new file mode 100644 index 000000000..c1277e4e7 --- /dev/null +++ b/src/acquisition/wastewater/wastewater_main.py @@ -0,0 +1,186 @@ +"""Creates and imports wastewater CSVs and stores them in the wastewater epidata database.""" + + +def get_argument_parser(): + """Define command line arguments.""" + + parser = argparse.ArgumentParser() + parser.add_argument("--data_dir", help="top-level directory where CSVs are stored") + parser.add_argument( + "--specific_issue_date", + action="store_true", + help="indicates argument is where issuedate-specific subdirectories can be found.", + ) + parser.add_argument( + "--log_file", help="filename for log output (defaults to stdout)" + ) + parser.add_argument( + "--source_name", + nargs="?", + default="*", + help="Name of one indicator directory to run acquisition on", + ) + return parser + + +def collect_files(data_dir: str, specific_issue_date: bool, indicator_name="*"): + """Fetch path and data profile details for each file to upload.""" + logger = get_structured_logger("collect_files") + if specific_issue_date: + results = list(CsvImporter.find_issue_specific_csv_files(data_dir)) + else: + results = list( + CsvImporter.find_csv_files( + os.path.join(data_dir, "receiving"), indicator_dir=indicator_name + ) + ) + logger.info(f"found {len(results)} files") + return results + + +def make_handlers(data_dir: str, specific_issue_date: bool): + if specific_issue_date: + # issue-specific uploads are always one-offs, so we can leave all + # files in place without worrying about cleaning up + def handle_failed(path_src, filename, source, logger): + logger.info(event="leaving failed file alone", dest=source, file=filename) + + def handle_successful(path_src, filename, source, logger): + logger.info(event="archiving as successful", file=filename) + FileArchiver.archive_inplace(path_src, filename) + else: + # normal automation runs require some shuffling to remove files + # from receiving and place them in the archive + archive_successful_dir = os.path.join(data_dir, "archive", "successful") + archive_failed_dir = os.path.join(data_dir, "archive", "failed") + + # helper to archive a failed file without compression + def handle_failed(path_src, filename, source, logger): + logger.info(event="archiving as failed - ", detail=source, file=filename) + path_dst = os.path.join(archive_failed_dir, source) + compress = False + FileArchiver.archive_file(path_src, path_dst, filename, compress) + + # helper to archive a successful file with compression + def handle_successful(path_src, filename, source, logger): + logger.info(event="archiving as successful", file=filename) + path_dst = os.path.join(archive_successful_dir, source) + compress = True + FileArchiver.archive_file(path_src, path_dst, filename, compress) + + return handle_successful, handle_failed + + +def upload_archive( + path_details: Iterable[Tuple[str, Optional[PathDetails]]], + database: Database, + handlers: Tuple[Callable], + logger: Logger, +): + """Upload CSVs to the database and archive them using the specified handlers. 
+ + :path_details: output from CsvImporter.find*_csv_files + + :database: an open connection to the epidata database + + :handlers: functions for archiving (successful, failed) files + + :return: the number of modified rows + """ + archive_as_successful, archive_as_failed = handlers + total_modified_row_count = 0 + # iterate over each file + for path, details in path_details: + logger.info(event="handling", dest=path) + path_src, filename = os.path.split(path) + + # file path or name was invalid, source is unknown + if not details: + archive_as_failed(path_src, filename, "unknown", logger) + continue + + csv_rows = CsvImporter.load_csv(path, details) + rows_list = list(csv_rows) + all_rows_valid = rows_list and all(r is not None for r in rows_list) + if all_rows_valid: + try: + modified_row_count = database.insert_or_update_bulk(rows_list) + logger.info( + f"insert_or_update_bulk {filename} returned {modified_row_count}" + ) + logger.info( + "Inserted database rows", + row_count=modified_row_count, + source=details.source, + signal=details.signal, + geo_type=details.geo_type, + time_value=details.time_value, + issue=details.issue, + lag=details.lag, + ) + if ( + modified_row_count is None or modified_row_count + ): # else would indicate zero rows inserted + total_modified_row_count += ( + modified_row_count if modified_row_count else 0 + ) + database.commit() + except DBLoadStateException as e: + # if the db is in a state that is not fit for loading new data, + # then we should stop processing any more files + raise e + except Exception as e: + all_rows_valid = False + logger.exception("exception while inserting rows", exc_info=e) + database.rollback() + + # archive the current file based on validation results + if all_rows_valid: + archive_as_successful(path_src, filename, details.source, logger) + else: + archive_as_failed(path_src, filename, details.source, logger) + + return total_modified_row_count + + +def main(args): + """Find, parse, and upload covidcast signals.""" + + logger = get_structured_logger("csv_ingestion", filename=args.log_file) + start_time = time.time() + # shortcut escape without hitting db if nothing to do + path_details = collect_files( + args.data_dir, args.specific_issue_date, indicator_name=args.indicator_name + ) + if not path_details: + logger.info("nothing to do; exiting...") + return + + logger.info("Ingesting CSVs", csv_count=len(path_details)) + + database = Database() + database.connect() + + try: + modified_row_count = upload_archive( + path_details, + database, + make_handlers(args.data_dir, args.specific_issue_date), + logger, + ) + logger.info( + "Finished inserting/updating database rows", row_count=modified_row_count + ) + finally: + database.do_analyze() + # unconditionally commit database changes since CSVs have been archived + database.disconnect(True) + + logger.info( + "Ingested CSVs into database", + total_runtime_in_seconds=round(time.time() - start_time, 2), + ) + + +if __name__ == "__main__": + main(get_argument_parser().parse_args()) diff --git a/src/acquisition/wastewater/wastewater_utils.py b/src/acquisition/wastewater/wastewater_utils.py new file mode 100644 index 000000000..6fc8841e4 --- /dev/null +++ b/src/acquisition/wastewater/wastewater_utils.py @@ -0,0 +1,37 @@ +# general utilities (that should maybe migrate elsewhere) +def sig_digit_round(value, n_digits): + """Truncate value to only n_digits. 
+ + Truncate precision of elements in `value` (a numpy array) to the specified number of + significant digits (9.87e5 would be 3 sig digits). + """ + in_value = value + value = np.asarray(value).copy() + zero_mask = (value == 0) | np.isinf(value) | np.isnan(value) + value[zero_mask] = 1.0 + sign_mask = value < 0 + value[sign_mask] *= -1 + exponent = np.ceil(np.log10(value)) + result = 10**exponent * np.round(value * 10 ** (-exponent), n_digits) + result[sign_mask] *= -1 + result[zero_mask] = in_value[zero_mask] + return result + + +def convert_df_type(df, type_dict, logger): + """Convert types and warn if there are unexpected columns.""" + try: + df = df.astype(type_dict) + except KeyError as exc: + raise KeyError( + f""" +Expected column(s) missed, The dataset schema may +have changed. Please investigate and amend the code. + +expected={''.join(sorted(type_dict.keys()))} +received={''.join(sorted(df.columns))} +""" + ) from exc + if new_columns := set(df.columns) - set(type_dict.keys()): + logger.info("New columns found in NWSS dataset.", new_columns=new_columns) + return df diff --git a/src/ddl/wastewater.sql b/src/ddl/wastewater.sql index 5d4f3e975..d686ec38c 100644 --- a/src/ddl/wastewater.sql +++ b/src/ddl/wastewater.sql @@ -11,14 +11,15 @@ CREATE TABLE agg_geo_dim ( CREATE TABLE sample_site_dim ( `site_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, `plant_id` INT(10) UNSIGNED NOT NULL, - `sample_loc_specify` VARCHAR(12), + `sample_loc_specify` INT(10) UNSIGNED, -- definitely can be null + `sampling_method` VARCHAR(20), UNIQUE INDEX `sample_site_dim_index` (`plant_id`, `sample_loc_specify`) ) ENGINE=InnoDB; CREATE TABLE plant_dim ( `plant_id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, - `wwtp_jurisdiction` VARCHAR(12) UNSIGNED NOT NULL, -- may only need CHAR(2), it's a state id + `wwtp_jurisdiction` CHAR(3) UNSIGNED NOT NULL, -- may only need CHAR(3), it's a state id + NYC `wwtp_id` INT(10) UNSIGNED NOT NULL UNIQUE INDEX `plant_index` (`wwtp_jurisdiction`, `wwtp_id`) From 4fa74b61b1ab08613d42851b738d42d27c5c174f Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Thu, 15 Aug 2024 14:54:27 -0500 Subject: [PATCH 3/5] sql schema fixes --- src/ddl/wastewater.sql | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/ddl/wastewater.sql b/src/ddl/wastewater.sql index d686ec38c..3cc49801e 100644 --- a/src/ddl/wastewater.sql +++ b/src/ddl/wastewater.sql @@ -12,15 +12,15 @@ CREATE TABLE sample_site_dim ( `site_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, `plant_id` INT(10) UNSIGNED NOT NULL, `sample_loc_specify` INT(10) UNSIGNED, -- definitely can be null - `sampling_method` VARCHAR(20), + `sample_method` VARCHAR(20), UNIQUE INDEX `sample_site_dim_index` (`plant_id`, `sample_loc_specify`) ) ENGINE=InnoDB; CREATE TABLE plant_dim ( `plant_id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, - `wwtp_jurisdiction` CHAR(3) UNSIGNED NOT NULL, -- may only need CHAR(3), it's a state id + NYC - `wwtp_id` INT(10) UNSIGNED NOT NULL + `wwtp_jurisdiction` CHAR(3) NOT NULL, -- may only need CHAR(2), it's a state id + `wwtp_id` INT(10) UNSIGNED NOT NULL, UNIQUE INDEX `plant_index` (`wwtp_jurisdiction`, `wwtp_id`) ) ENGINE=InnoDB; @@ -28,9 +28,9 @@ CREATE TABLE plant_dim ( CREATE TABLE site_county_cross ( `site_county_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, `site_key_id` BIGINT(20) UNSIGNED NOT NULL, - `county_fips_id` CHAR(5) UNSIGNED NOT NULL + `county_fips_id` CHAR(5) NOT NULL, - 
UNIQUE INDEX `sit_county_index` (`site_key_id`, `county_fips_id`) + UNIQUE INDEX `site_county_index` (`site_key_id`, `county_fips_id`) ) ENGINE=InnoDB; @@ -53,6 +53,7 @@ CREATE TABLE wastewater_granular_full ( `version` INT(11) NOT NULL, `time_type` VARCHAR(12) NOT NULL, `reference_date` INT(11) NOT NULL, + `reference_dt` DATETIME(0), -- TODO: for future use `value` DOUBLE, `lag` INT(11) NOT NULL, `value_updated_timestamp` INT(11) NOT NULL, @@ -89,7 +90,6 @@ CREATE TABLE wastewater_granular_load ( `version` INT(11) NOT NULL, `source` VARCHAR(32) NOT NULL, `signal` VARCHAR(64) NOT NULL, - `site_key_id` BIGINT(20) NOT NULL, `time_type` VARCHAR(12) NOT NULL, `reference_date` INT(11) NOT NULL, `reference_dt` DATETIME(0), -- TODO: for future use @@ -135,35 +135,35 @@ CREATE OR REPLACE VIEW wastewater_granular_full_v AS JOIN `sample_site_dim` `t3` USING (`site_key_id`) JOIN `plant_dim` `t4` USING (`plant_id`); -- TODO not sure if this is the right way to do a join of a join - --JOIN `site_county_cross` `t5` USING (`site_key_id`); CREATE OR REPLACE VIEW wastewater_granular_latest_v AS SELECT - 1 AS `is_latest_version`, -- provides column-compatibility to match `covidcast` table - NULL AS `direction`, -- provides column-compatibility to match `covidcast` table + 1 AS `is_latest_version`, -- provides column-compatibility to match `wastewater_granular` table + NULL AS `direction`, -- provides column-compatibility to match `wastewater_granular` table `t2`.`source` AS `source`, `t2`.`signal` AS `signal`, - `t3`.`geo_type` AS `geo_type`, - `t3`.`geo_value` AS `geo_value`, + `t2`.`pathogen` AS `pathogen`, + `t2`.`provider` AS `provider`, + `t2`.`normalization` AS `normalization`, + `t4`.`wwtp_id` AS `wwtp_id`, + `t4`.`wwtp_jurisdiction` AS `wwtp_jurisdiction`, + `t3`.`sample_loc_specify` AS `sample_loc_specify`, `t1`.`wastewater_id` AS `wastewater_id`, `t1`.`version` AS `version`, - `t1`.`data_as_of_dt` AS `data_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed `t1`.`time_type` AS `time_type`, `t1`.`reference_date` AS `reference_date`, `t1`.`reference_dt` AS `reference_dt`, -- TODO: for future use `t1`.`value` AS `value`, - `t1`.`stderr` AS `stderr`, - `t1`.`sample_size` AS `sample_size`, `t1`.`lag` AS `lag`, `t1`.`value_updated_timestamp` AS `value_updated_timestamp`, `t1`.`computation_as_of_dt` AS `computation_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed `t1`.`missing_value` AS `missing_value`, - `t1`.`missing_stderr` AS `missing_stderr`, - `t1`.`missing_sample_size` AS `missing_sample_size`, `t1`.`signal_key_id` AS `signal_key_id`, - `t1`.`site_key_id` AS `site_key_id` + `t1`.`site_key_id` AS `site_key_id`, + `t3`.`plant_id` AS `plant_id` FROM `wastewater_granular_latest` `t1` JOIN `signal_dim` `t2` USING (`signal_key_id`) - JOIN `agg_geo_dim` `t3` USING (`site_key_id`); + JOIN `sample_site_dim` `t3` USING (`site_key_id`) + JOIN `plant_dim` `t4` USING (`plant_id`); -- TODO not sure if this is the right way to do a join of a join CREATE TABLE `wastewater_meta_cache` ( `timestamp` int(11) NOT NULL, From 511607f1560af0a578ce9fc8b580f4a18ef48916 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Mon, 26 Aug 2024 18:01:11 -0500 Subject: [PATCH 4/5] Functional database insertion, needs tests --- docs/epidata_development.md | 23 +- docs/new_endpoint_tutorial.md | 118 +++- src/acquisition/wastewater/database.py | 563 ++++++++++++++++++ src/acquisition/wastewater/nwss_constants.py | 62 ++ src/acquisition/wastewater/nwss_csv.py | 146 
++++- src/acquisition/wastewater/nwss_database.py | 317 ++++++++++ .../wastewater/wastewater_utils.py | 3 + src/ddl/wastewater.sql | 107 ++-- tests/acquisition/wastewater/database.py | 290 +++++++++ 9 files changed, 1563 insertions(+), 66 deletions(-) create mode 100644 src/acquisition/wastewater/database.py create mode 100644 src/acquisition/wastewater/nwss_constants.py create mode 100644 src/acquisition/wastewater/nwss_database.py create mode 100644 tests/acquisition/wastewater/database.py diff --git a/docs/epidata_development.md b/docs/epidata_development.md index eb38e6ae9..db669b210 100644 --- a/docs/epidata_development.md +++ b/docs/epidata_development.md @@ -87,11 +87,14 @@ You can test your changes manually by: What follows is a worked demonstration based on the `fluview` endpoint. Before starting, make sure that you have the `delphi_database_epidata`, -`delphi_web_epidata`, and `delphi_redis` containers running (with `docker ps`); -if you don't, see the Makefile instructions above. - +`delphi_web_epidata`, and `delphi_redis` containers running: to start them, from the `driver` directory, run +1. `make db` for `delphi_database_epidata` +2. `make web` for `dephi_web_epidata` +3. `make redis` to run `delphi_redis`. +You can check that they are running via `docker ps`. First, let's insert some fake data into the `fluview` table: + ```bash # If you have the mysql client installed locally: echo 'insert into fluview values \ @@ -118,6 +121,20 @@ For the inserts above, absence of command-line output is a sign of success. On the other hand, output after the insertion likely indicates failure (like, for example, attempting to insert a duplicate unique key). +Or if you would prefer to interact with the server via python (for example to +prototype functions and database commands) + +```python +from sqlalchemy import create_engine + +engine = create_engine( + "mysql+pymysql://user:pass@127.0.0.1:13306/epidata", + echo=True, +) +engine.connect() +``` + + Next, you can query the API directly (and parse with Python's JSON tool): ```bash diff --git a/docs/new_endpoint_tutorial.md b/docs/new_endpoint_tutorial.md index 953e7e0d5..551bfab97 100644 --- a/docs/new_endpoint_tutorial.md +++ b/docs/new_endpoint_tutorial.md @@ -11,12 +11,15 @@ nav_order: 5 In this tutorial we'll create a brand new endpoint for the Epidata API: `fluview_meta`. At a high level, we'll do the following steps: -1. understand the data that we want to surface -2. add the new endpoint to the API server -3. add the new endpoint to the various client libraries -4. write an integration test for the new endpoint -5. update API documentation for the new endpoint -6. run all unit and integration tests +0. understand the data that we want to surface +1. add the new endpoint to the API server + - get (transformed) data into a database + - add internal server maintenance code + - add frontend code +2. write an integration test for the new endpoint +3. update API documentation for the new endpoint +4. run all unit and integration tests +5. add the new endpoint to the various client libraries # setup @@ -44,7 +47,93 @@ tree -L 3 .    ├── py3tester    └── undef-analysis ``` - +# Adding data to a database +Before we could possibly serve data in an API, we need to retrieve it, clean it, +and store it locally. This is known as +[ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load), and for any of +the endpoints the code to do this lives in the [acquisition +folder](https://github.com/cmu-delphi/delphi-epidata/tree/dev/src/acquisition). 
+Retrieving is the least structured of these and depends heavily on the source of +the data. Transforming can be anything from simply cleaning to fit the format of +our database/api, aggregating to higher geographic or temporal levels, to +correcting for knowable anomalies in the data. +## SQL table design +The first step is determining the format of the tables, which is written in a +[ddl](https://stackoverflow.com/questions/2578194/what-are-ddl-and-dml) and +stored [here](https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/ddl/), +for example +[epimetrics](https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/ddl/v4_schema.sql)). +Consider prototyping with something like +[dbdiagram](https://dbdiagram.io/d/wastewater_db-6691809a9939893daecc5d57). +Ideally, any redundant values or metadata should be stored in separate related +tables so as to [normalize the +tables](https://en.wikipedia.org/wiki/Database_normalization) and improve +performance. A rule of thumb for these is if you can think of it as a +categorical, it should have a table to record the possible categories. + +In addition to the primary table and it's relational tables, it can be useful to +include a loading table to ease addition of new data to the database, a latest +table to speed up access for getting only the latest data, and several views for +TODO reasons. + +Another design consideration is the addition of indexes based on likely queries. +### Data format +In many endpoints, dates are represented using integers as `yyyymmdd` for actual +dates and `yyyyww` for epiweeks. +### Versioning +If there's a possibility you are inserting versions older than latest, it is best practice to include a boolean column in the load table indicating. This column will also be useful for generating a view of the full table +## ETL +After you know the target format, you should start writing methods to perform +each step of the ETL process. Eventually, they should be called within a +`__main__` function in src/acquisition/ (ideally +\_main.py). You should partition your code into separate files +for each step in ETL, especially if the transform steps are more than +simply data cleaning. + +## Extract +There is not terribly much to be said for extraction; depending on how you get +your data see (TODO list of endpoints based on how they extract data) for an +example, but there is no guarantee that these actually have achieved the optimal +method for that particular method of acquiring data. + +One less obvious aspect of the extraction step is validation. Make sure to add +validation checks to the extraction module, with any violations getting recorded +to a logger object. + +Another less obvious extraction step is to make sure to record approximately raw +data, stored in a compressed format. This makes recovery from validation or +other errors much easier. +## Transform +If you will be doing significant transformations, consider writing an external +package for that. + +One of the more common transformation steps is the removal of redundant +versions. Epimetrics handles this by first exporting the transformed data to +CSVs (for every source, as handled in covidcast-indicators), then comparing with +the previously saved copy of the CSV for differences and only keeping the newer +values. Wastewater handles this entirely within sql by comparing the latest +table with the load table. + +## Load +In general, we use [Core +sqlalchemy](https://docs.sqlalchemy.org/en/20/tutorial/index.html) to manage +database connections. 
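+A minimal sketch of that pattern, reusing the local development credentials
+from `docs/epidata_development.md` and the `wastewater_granular_load` table
+from the DDL (the columns and values below are illustrative only; the real
+DataFrame comes out of the transform step):
+
+```python
+import pandas as pd
+from sqlalchemy import create_engine
+
+# local development credentials, as in docs/epidata_development.md
+engine = create_engine("mysql+pymysql://user:pass@127.0.0.1:13306/epidata")
+
+# stand-in for the transform step's output; only some of the load table's
+# columns are shown here
+df = pd.DataFrame(
+    {
+        "source": ["nwss"],
+        "signal": ["example_signal"],
+        "time_type": ["day"],
+        "reference_date": [20240807],
+        "version": [20240814],
+        "value": [123.4],
+        "lag": [7],
+        "value_updated_timestamp": [1723651200],
+    }
+)
+
+with engine.begin() as conn:  # opens a connection and commits on success
+    df.to_sql("wastewater_granular_load", conn, if_exists="append", index=False, method="multi")
+```
+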
The process for loading should roughly be +### Move the data into the load table +The current +recommendation is to use [pandas' +`to_sql`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html) +with the method set to `multi` for the initial insertion into the `load` table as an initial method, for ease of writing. +If this proves too slow, [see +epimetrics](https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/acquisition/covidcast/database.py) +for an alternative using approximately raw sql, or write a [custom insert method](https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method) that e.g. uses temporary csv's. +### Move categorical data +After inserting into the load table, any new values for the related tables, such as signal or geo\_type, need to be included. +### Insert load data into the full and latest tables +Fairly straightforward. Note that the keys for the related tables need to be added either during or before inserting into either table. + +Note that wastewater removes duplicated values with different versions just after adding the key values from the related tables. +### Remove the inserted data from the load table +Since the id of the load table is used to set the id of the full and latest tables, it is important not to drop or truncate when deleting these rows, since this would reset the index. # the data Here's the requirement: we need to quickly surface the most recent "issue" @@ -64,29 +153,24 @@ the following: ## Acquire data If, unlike `fluview` you need to acquire add new data in addition to a new endpoint, you will need to add an appropriate data ingestion method. -These live in src/acquisition/, and needs one file with a `main` function, typically with a name `_update.py` or `_to_database.py`. -It is recommended to partition the functions based on use, for example -1. a file to download and format the data -2. a file to save backups -3. a file to update the database (this is typically including main) - Since we're using the `fluview` table, we're piggybacking off of [src/acquisition/fluview](https://github.com/cmu-delphi/delphi-epidata/tree/dev/src/acquisition/fluview). To run ingestion, cronicle runs [fluview_update.py](https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/acquisition/fluview/fluview_update.py), while the other scripts provide methods for that. - ### Secrets If you are pulling from an API or other source which needs authentication, you will need to add your secret into the backend. How to go about this for new endpoints is TODO. +## Tests +It is recommended to use a dummy database as a part of unit testing; for an example see TODO ## Adding new packages If for whatever reason you need to add a new dependency TODO -# update the server +# update the server API -1. create a new file in `/src/server/endpoints/` e.g., `fluview_meta.py`, or copy an existing one. +1. create a new file in `/src/server/endpoints/`, e.g. `fluview_meta.py`, or copy an existing one. 2. edit the created file `Blueprint("fluview_meta", __name__)` such that the first argument matches the target endpoint name 3. edit the existing `/src/server/endpoints/__init__.py` to add the newly-created file to the imports (top) and to the list of endpoints (below). # update the client libraries - + There are currently four client libraries. They all need to be updated to make the new `fluview_meta` endpoint available to callers. The pattern is very similar for all endpoints so that copy-paste will get you 90% of the way there. 
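+For the Python client (`src/client/delphi_epidata.py`), a sketch of what the
+body of a new static method on the `Epidata` class might look like, following
+the pattern of the existing methods such as `Epidata.twitter` (the
+`wastewater` method in this patch series is still a stub, so the parameters
+and checks below are illustrative rather than final):
+
+```python
+    # Fetch wastewater data
+    @staticmethod
+    def wastewater(locations, location_type, epiweeks, issues=None, lag=None):
+        """Fetch wastewater data (sketch; the final parameter set is TBD)."""
+        # same messages as the module-level constants near the top of the file
+        if locations is None or epiweeks is None:
+            raise EpidataBadRequestException("`locations` and `epiweeks` are both required")
+        if issues is not None and lag is not None:
+            raise EpidataBadRequestException("`issues` and `lag` are mutually exclusive")
+        # assemble the request parameters
+        params = {
+            "locations": locations,
+            "location_type": location_type,
+            "epiweeks": epiweeks,
+        }
+        if issues is not None:
+            params["issues"] = issues
+        if lag is not None:
+            params["lag"] = lag
+        # make the API call
+        return Epidata._request("wastewater", params)
+```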
diff --git a/src/acquisition/wastewater/database.py b/src/acquisition/wastewater/database.py new file mode 100644 index 000000000..5fd56923b --- /dev/null +++ b/src/acquisition/wastewater/database.py @@ -0,0 +1,563 @@ +"""A utility class that handles database operations related to covidcast. + +See src/ddl/covidcast.sql for an explanation of each field. +""" +import threading +from math import ceil +from multiprocessing import cpu_count +from queue import Queue, Empty +from typing import List + +# third party +import json +import mysql.connector + +# first party +import delphi.operations.secrets as secrets +from delphi_utils import get_structured_logger +from delphi.epidata.common.covidcast_row import CovidcastRow + + +class DBLoadStateException(Exception): + pass + + +class Database: + """A collection of covidcast database operations.""" + + DATABASE_NAME = 'covid' + + load_table = "epimetric_load" + # if you want to deal with foreign key ids: use table + # if you want to deal with source/signal names, geo type/values, etc: use view + latest_table = "epimetric_latest" + latest_view = latest_table + "_v" + history_table = "epimetric_full" + history_view = history_table + "_v" + # TODO: consider using class variables like this for dimension table names too + # TODO: also consider that for composite key tuples, like short_comp_key and long_comp_key as used in delete_batch() + + + def connect(self, connector_impl=mysql.connector): + """Establish a connection to the database.""" + + u, p = secrets.db.epi + self._connector_impl = connector_impl + self._connection = self._connector_impl.connect( + host=secrets.db.host, + user=u, + password=p, + database=Database.DATABASE_NAME) + self._cursor = self._connection.cursor() + + def commit(self): + self._connection.commit() + + def rollback(self): + self._connection.rollback() + + def disconnect(self, commit): + """Close the database connection. + + commit: if true, commit changes, otherwise rollback + """ + + self._cursor.close() + if commit: + self._connection.commit() + self._connection.close() + + + + def count_all_load_rows(self): + self._cursor.execute(f'SELECT count(1) FROM `{self.load_table}`') + for (num,) in self._cursor: + return num + + def _reset_load_table_ai_counter(self): + """Corrects the AUTO_INCREMENT counter in the load table. + + To be used in emergencies only, if the load table was accidentally TRUNCATEd. + This ensures any `epimetric_id`s generated by the load table will not collide with the history or latest tables. + This is also destructive to any data in the load table. + """ + + self._cursor.execute('DELETE FROM epimetric_load') + # NOTE: 'ones' are used as filler here for the (required) NOT NULL columns. 
+ self._cursor.execute(""" + INSERT INTO epimetric_load + (epimetric_id, + source, `signal`, geo_type, geo_value, time_type, time_value, issue, `lag`, value_updated_timestamp) + VALUES + ((SELECT 1+MAX(epimetric_id) FROM epimetric_full), + '1', '1', '1', '1', '1', 1, 1, 1, 1);""") + self._cursor.execute('DELETE FROM epimetric_load') + + def do_analyze(self): + """performs and stores key distribution analyses, used for join order and index selection""" + # TODO: consider expanding this to update columns' histograms + # https://dev.mysql.com/doc/refman/8.0/en/analyze-table.html#analyze-table-histogram-statistics-analysis + self._cursor.execute( + f'''ANALYZE TABLE + signal_dim, geo_dim, + {self.load_table}, {self.history_table}, {self.latest_table}''') + output = [self._cursor.column_names] + self._cursor.fetchall() + get_structured_logger('do_analyze').info("ANALYZE results", results=str(output)) + + def insert_or_update_bulk(self, cc_rows): + return self.insert_or_update_batch(cc_rows) + + def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20, commit_partial=False, suppress_jobs=False): + """ + Insert new rows into the load table and dispatch into dimension and fact tables. + """ + + if 0 != self.count_all_load_rows(): + err_msg = "Non-zero count in the load table!!! This indicates a previous acquisition run may have failed, another acquisition is in progress, or this process does not otherwise have exclusive access to the db!" + get_structured_logger("insert_or_update_batch").fatal(err_msg) + raise DBLoadStateException(err_msg) + + # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and + # `is_latest_issue` is hardcoded to 1 (which is temporary and addressed later in this method) + insert_into_loader_sql = f''' + INSERT INTO `{self.load_table}` + (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, + `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, + `is_latest_issue`, `missing_value`, `missing_stderr`, `missing_sample_size`) + VALUES + (%s, %s, %s, %s, %s, %s, + UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, + 1, %s, %s, %s) + ''' + + # all load table entries are already marked "is_latest_issue". + # if an entry in the load table is NOT in the latest table, it is clearly now the latest value for that key (so we do nothing (thanks to INNER join)). + # if an entry *IS* in both load and latest tables, but latest table issue is newer, unmark is_latest_issue in load. 
+ fix_is_latest_issue_sql = f''' + UPDATE + `{self.load_table}` JOIN `{self.latest_view}` + USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) + SET `{self.load_table}`.`is_latest_issue`=0 + WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` + ''' + + # TODO: consider handling cc_rows as a generator instead of a list + + try: + num_rows = len(cc_rows) + total = 0 + if not batch_size: + batch_size = num_rows + num_batches = ceil(num_rows/batch_size) + for batch_num in range(num_batches): + start = batch_num * batch_size + end = min(num_rows, start + batch_size) + + args = [( + row.source, + row.signal, + row.time_type, + row.geo_type, + row.time_value, + row.geo_value, + row.value, + row.stderr, + row.sample_size, + row.issue, + row.lag, + row.missing_value, + row.missing_stderr, + row.missing_sample_size + ) for row in cc_rows[start:end]] + + + self._cursor.executemany(insert_into_loader_sql, args) + modified_row_count = self._cursor.rowcount + self._cursor.execute(fix_is_latest_issue_sql) + if not suppress_jobs: + self.run_dbjobs() # TODO: incorporate the logic of dbjobs() into this method [once calls to dbjobs() are no longer needed for migrations] + + if modified_row_count is None or modified_row_count == -1: + # the SQL connector does not support returning number of rows affected (see PEP 249) + total = None + else: + total += modified_row_count + if commit_partial: + self._connection.commit() + except Exception as e: + # rollback is handled in csv_to_database; if you're calling this yourself, handle your own rollback + raise e + return total + + def run_dbjobs(self): + + # we do this LEFT JOIN trick because mysql cant do set difference (aka EXCEPT or MINUS) + # (as in " select distinct source, signal from signal_dim minus select distinct source, signal from epimetric_load ") + signal_dim_add_new_load = f''' + INSERT INTO signal_dim (`source`, `signal`) + SELECT DISTINCT sl.source, sl.signal + FROM {self.load_table} AS sl LEFT JOIN signal_dim AS sd + USING (`source`, `signal`) + WHERE sd.source IS NULL + ''' + + # again, same trick to get around lack of EXCEPT/MINUS + geo_dim_add_new_load = f''' + INSERT INTO geo_dim (`geo_type`, `geo_value`) + SELECT DISTINCT sl.geo_type, sl.geo_value + FROM {self.load_table} AS sl LEFT JOIN geo_dim AS gd + USING (`geo_type`, `geo_value`) + WHERE gd.geo_type IS NULL + ''' + + epimetric_full_load = f''' + INSERT INTO {self.history_table} + (epimetric_id, signal_key_id, geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size) + SELECT + epimetric_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size + FROM `{self.load_table}` sl + INNER JOIN signal_dim sd USING (source, `signal`) + INNER JOIN geo_dim gd USING (geo_type, geo_value) + ON DUPLICATE KEY UPDATE + `epimetric_id` = sl.`epimetric_id`, + `value_updated_timestamp` = sl.`value_updated_timestamp`, + `value` = sl.`value`, + `stderr` = sl.`stderr`, + `sample_size` = sl.`sample_size`, + `lag` = sl.`lag`, + `missing_value` = sl.`missing_value`, + `missing_stderr` = sl.`missing_stderr`, + `missing_sample_size` = sl.`missing_sample_size` + ''' + + epimetric_latest_load = f''' + INSERT INTO {self.latest_table} + (epimetric_id, signal_key_id, geo_key_id, issue, 
data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size) + SELECT + epimetric_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size + FROM `{self.load_table}` sl + INNER JOIN signal_dim sd USING (source, `signal`) + INNER JOIN geo_dim gd USING (geo_type, geo_value) + WHERE is_latest_issue = 1 + ON DUPLICATE KEY UPDATE + `epimetric_id` = sl.`epimetric_id`, + `value_updated_timestamp` = sl.`value_updated_timestamp`, + `value` = sl.`value`, + `stderr` = sl.`stderr`, + `sample_size` = sl.`sample_size`, + `issue` = sl.`issue`, + `lag` = sl.`lag`, + `missing_value` = sl.`missing_value`, + `missing_stderr` = sl.`missing_stderr`, + `missing_sample_size` = sl.`missing_sample_size` + ''' + + # NOTE: DO NOT `TRUNCATE` THIS TABLE! doing so will ruin the AUTO_INCREMENT counter that the history and latest tables depend on... + epimetric_load_delete_processed = f''' + DELETE FROM `{self.load_table}` + ''' + + logger = get_structured_logger("run_dbjobs") + import time + time_q = [time.time()] + + try: + self._cursor.execute(signal_dim_add_new_load) + time_q.append(time.time()) + logger.debug('signal_dim_add_new_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + + self._cursor.execute(geo_dim_add_new_load) + time_q.append(time.time()) + logger.debug('geo_dim_add_new_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + + self._cursor.execute(epimetric_full_load) + time_q.append(time.time()) + logger.debug('epimetric_full_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + + self._cursor.execute(epimetric_latest_load) + time_q.append(time.time()) + logger.debug('epimetric_latest_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + + self._cursor.execute(epimetric_load_delete_processed) + time_q.append(time.time()) + logger.debug('epimetric_load_delete_processed', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + except Exception as e: + raise e + + return self + + + def delete_batch(self, cc_deletions): + """ + Remove rows specified by a csv file or list of tuples. 
+ + If cc_deletions is a filename, the file should include a header row and use the following field order: + - geo_id + - value (ignored) + - stderr (ignored) + - sample_size (ignored) + - issue (YYYYMMDD format) + - time_value (YYYYMMDD format) + - geo_type + - signal + - source + + If cc_deletions is a list of tuples, the tuples should use the following field order (=same as above, plus time_type): + - geo_id + - value (ignored) + - stderr (ignored) + - sample_size (ignored) + - issue (YYYYMMDD format) + - time_value (YYYYMMDD format) + - geo_type + - signal + - source + - time_type + """ + + tmp_table_name = "tmp_delete_table" + # composite keys: + short_comp_key = "`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`" + long_comp_key = short_comp_key + ", `issue`" + + create_tmp_table_sql = f''' +CREATE TABLE {tmp_table_name} LIKE {self.load_table}; +''' + + amend_tmp_table_sql = f''' +ALTER TABLE {tmp_table_name} ADD COLUMN delete_history_id BIGINT UNSIGNED, + ADD COLUMN delete_latest_id BIGINT UNSIGNED, + ADD COLUMN update_latest BINARY(1) DEFAULT 0; +''' + + load_tmp_table_infile_sql = f''' +LOAD DATA INFILE "{cc_deletions}" +INTO TABLE {tmp_table_name} +FIELDS TERMINATED BY "," +IGNORE 1 LINES +(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`) +SET time_type="day"; +''' + + load_tmp_table_insert_sql = f''' +INSERT INTO {tmp_table_name} +(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`, `time_type`, +`value_updated_timestamp`, `lag`, `is_latest_issue`) +VALUES +(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, +0, 0, 0) +''' + + add_history_id_sql = f''' +UPDATE {tmp_table_name} d INNER JOIN {self.history_view} h USING ({long_comp_key}) +SET d.delete_history_id=h.epimetric_id; +''' + + # if a row we are deleting also appears in the 'latest' table (with a matching 'issue')... + mark_for_update_latest_sql = f''' +UPDATE {tmp_table_name} d INNER JOIN {self.latest_view} ell USING ({long_comp_key}) +SET d.update_latest=1, d.delete_latest_id=ell.epimetric_id; +''' + + delete_history_sql = f''' +DELETE h FROM {tmp_table_name} d INNER JOIN {self.history_table} h ON d.delete_history_id=h.epimetric_id; +''' + + # ...remove it from 'latest'... + delete_latest_sql = f''' +DELETE ell FROM {tmp_table_name} d INNER JOIN {self.latest_table} ell ON d.delete_latest_id=ell.epimetric_id; +''' + + # ...and re-write that record with its next-latest issue (from 'history') instead. + # NOTE: this must be executed *AFTER* `delete_history_sql` to ensure we get the correct `issue` + # AND also after `delete_latest_sql` so that we dont get a key collision on insert. 
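+    # the inner subquery finds, for each short key marked update_latest=1, the newest `issue` still
+    # present in the history table; the outer join then pulls that full history row so it can be
+    # re-inserted as the new 'latest' record.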
+ update_latest_sql = f''' +INSERT INTO {self.latest_table} + (epimetric_id, + signal_key_id, geo_key_id, time_type, time_value, issue, + value, stderr, sample_size, `lag`, value_updated_timestamp, + missing_value, missing_stderr, missing_sample_size) +SELECT + h.epimetric_id, + h.signal_key_id, h.geo_key_id, h.time_type, h.time_value, h.issue, + h.value, h.stderr, h.sample_size, h.`lag`, h.value_updated_timestamp, + h.missing_value, h.missing_stderr, h.missing_sample_size +FROM {self.history_view} h JOIN ( + SELECT {short_comp_key}, MAX(hh.issue) AS issue + FROM {self.history_view} hh JOIN {tmp_table_name} dd USING ({short_comp_key}) + WHERE dd.update_latest=1 GROUP BY {short_comp_key} + ) d USING ({long_comp_key}); +''' + + drop_tmp_table_sql = f'DROP TABLE IF EXISTS {tmp_table_name}' + + total = None + try: + self._cursor.execute(drop_tmp_table_sql) + self._cursor.execute(create_tmp_table_sql) + self._cursor.execute(amend_tmp_table_sql) + if isinstance(cc_deletions, str): + self._cursor.execute(load_tmp_table_infile_sql) + elif isinstance(cc_deletions, list): + def split_list(lst, n): + for i in range(0, len(lst), n): + yield lst[i:(i+n)] + for deletions_batch in split_list(cc_deletions, 100000): + self._cursor.executemany(load_tmp_table_insert_sql, deletions_batch) + print(f"load_tmp_table_insert_sql:{self._cursor.rowcount}") + else: + raise Exception(f"Bad deletions argument: need a filename or a list of tuples; got a {type(cc_deletions)}") + self._cursor.execute(add_history_id_sql) + print(f"add_history_id_sql:{self._cursor.rowcount}") + self._cursor.execute(mark_for_update_latest_sql) + print(f"mark_for_update_latest_sql:{self._cursor.rowcount}") + self._cursor.execute(delete_history_sql) + print(f"delete_history_sql:{self._cursor.rowcount}") + total = self._cursor.rowcount + # TODO: consider reporting rows removed and/or replaced in latest table as well + self._cursor.execute(delete_latest_sql) + print(f"delete_latest_sql:{self._cursor.rowcount}") + self._cursor.execute(update_latest_sql) + print(f"update_latest_sql:{self._cursor.rowcount}") + self._connection.commit() + + if total == -1: + # the SQL connector does not support returning number of rows affected (see PEP 249) + total = None + except Exception as e: + raise e + finally: + self._cursor.execute(drop_tmp_table_sql) + return total + + + def compute_covidcast_meta(self, table_name=None, n_threads=None): + """Compute and return metadata on all COVIDcast signals.""" + logger = get_structured_logger("compute_covidcast_meta") + + if table_name is None: + table_name = self.latest_view + + if n_threads is None: + logger.info("n_threads unspecified, automatically choosing based on number of detected cores...") + n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server + # NOTE: this may present a small problem if this job runs on different hardware than the db, + # which is why this value can be overriden by optional argument. + logger.info(f"using {n_threads} workers") + + srcsigs = Queue() # multi-consumer threadsafe! 
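+    # enumerate every distinct (source, signal) pair present in the table; each pair becomes one
+    # work item for the worker threads below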
+ sql = f'SELECT `source`, `signal` FROM `{table_name}` GROUP BY `source`, `signal` ORDER BY `source` ASC, `signal` ASC;' + self._cursor.execute(sql) + for source, signal in self._cursor: + srcsigs.put((source, signal)) + + inner_sql = f''' + SELECT + `source` AS `data_source`, + `signal`, + `time_type`, + `geo_type`, + MIN(`time_value`) AS `min_time`, + MAX(`time_value`) AS `max_time`, + COUNT(DISTINCT `geo_value`) AS `num_locations`, + MIN(`value`) AS `min_value`, + MAX(`value`) AS `max_value`, + ROUND(AVG(`value`),7) AS `mean_value`, + ROUND(STD(`value`),7) AS `stdev_value`, + MAX(`value_updated_timestamp`) AS `last_update`, + MAX(`issue`) as `max_issue`, + MIN(`lag`) as `min_lag`, + MAX(`lag`) as `max_lag` + FROM + `{table_name}` + WHERE + `source` = %s AND + `signal` = %s + GROUP BY + `time_type`, + `geo_type` + ORDER BY + `time_type` ASC, + `geo_type` ASC + ''' + + meta = [] + meta_lock = threading.Lock() + + def worker(): + name = threading.current_thread().name + logger.info("starting thread", thread=name) + # set up new db connection for thread + worker_dbc = Database() + worker_dbc.connect(connector_impl=self._connector_impl) + w_cursor = worker_dbc._cursor + try: + while True: + (source, signal) = srcsigs.get_nowait() # this will throw the Empty caught below + logger.info("starting pair", thread=name, pair=f"({source}, {signal})") + w_cursor.execute(inner_sql, (source, signal)) + with meta_lock: + meta.extend(list( + dict(zip(w_cursor.column_names, x)) for x in w_cursor + )) + srcsigs.task_done() + except Empty: + logger.info("no jobs left, thread terminating", thread=name) + finally: + worker_dbc.disconnect(False) # cleanup + + threads = [] + for n in range(n_threads): + t = threading.Thread(target=worker, name='MetacacheThread-'+str(n)) + t.start() + threads.append(t) + + srcsigs.join() + logger.info("jobs complete") + for t in threads: + t.join() + logger.info("all threads terminated") + + meta = sorted(meta, key=lambda x: (x['data_source'], x['signal'], x['time_type'], x['geo_type'])) + + return meta + + + def update_covidcast_meta_cache(self, metadata): + """Updates the `covidcast_meta_cache` table.""" + + sql = ''' + UPDATE + `covidcast_meta_cache` + SET + `timestamp` = UNIX_TIMESTAMP(NOW()), + `epidata` = %s + ''' + epidata_json = json.dumps(metadata) + + self._cursor.execute(sql, (epidata_json,)) + + def retrieve_covidcast_meta_cache(self): + """Useful for viewing cache entries (was used in debugging)""" + + sql = ''' + SELECT `epidata` + FROM `covidcast_meta_cache` + ORDER BY `timestamp` DESC + LIMIT 1; + ''' + self._cursor.execute(sql) + cache_json = self._cursor.fetchone()[0] + cache = json.loads(cache_json) + cache_hash = {} + for entry in cache: + cache_hash[(entry['data_source'], entry['signal'], entry['time_type'], entry['geo_type'])] = entry + return cache_hash diff --git a/src/acquisition/wastewater/nwss_constants.py b/src/acquisition/wastewater/nwss_constants.py new file mode 100644 index 000000000..2eb228bc7 --- /dev/null +++ b/src/acquisition/wastewater/nwss_constants.py @@ -0,0 +1,62 @@ +import pandas as pd + +CONCENTRATION_SIGNALS = ["pcr_conc_lin"] +METRIC_SIGNALS = ["detect_prop_15d", "percentile", "ptc_15d", "population_served"] + +TYPE_DICT_CONC = {key: float for key in CONCENTRATION_SIGNALS} +TYPE_DICT_CONC.update( + {"date": "datetime64[ns]", "key_plot_id": str, "normalization": str} +) +TYPE_DICT_METRIC = {key: float for key in METRIC_SIGNALS} + +SIG_DIGITS = 4 + +TYPE_DICT_METRIC.update( + { + "key_plot_id": str, + "date_start": "datetime64[ns]", + 
"date_end": "datetime64[ns]", + "first_sample_date": "datetime64[ns]", + "wwtp_jurisdiction": "category", + "wwtp_id": "Int32", + "reporting_jurisdiction": "category", + "sample_location": "category", + "county_names": "category", + "county_fips": "category", + "population_served": float, + "sampling_prior": bool, + "sample_location_specify": "Int32", + } +) +KEY_PLOT_NAMES = [ + "provider", + "wwtp_jurisdiction", + "wwtp_id", + "sample_location", + "sample_location_specify", + "sample_method", +] +KEY_PLOT_TYPES = { + "provider": "category", + "wwtp_jurisdiction": "category", + "wwtp_id": "Int32", + "sample_location": "category", + "sample_location_specify": "Int32", + "sample_method": "category", +} +AUX_INFO_COLUMNS = [ + "key_plot_id", + "wwtp_jurisdiction", + "wwtp_id", + "reporting_jurisdiction", + "sample_location", + "sample_location_specify", + "sampling_prior", + "first_sample_date", +] +COUNTY_INFO = [ + "county_names", + "county_fips", +] + +TIME_SPANNED = pd.Timedelta(14, unit="days") diff --git a/src/acquisition/wastewater/nwss_csv.py b/src/acquisition/wastewater/nwss_csv.py index 747fc2321..f9c299d93 100644 --- a/src/acquisition/wastewater/nwss_csv.py +++ b/src/acquisition/wastewater/nwss_csv.py @@ -8,6 +8,7 @@ # first party tools from .wastewater_utils import sig_digit_round, convert_df_type +from .nwss_database import Database from delphi_utils import GeoMapper import delphi_utils @@ -57,6 +58,10 @@ def key_plot_id_parse(key_plot_ids: pd.Series) -> pd.DataFrame: processed_names.sample_location_specify = pd.to_numeric( processed_names.sample_location_specify, errors="coerce" ).astype("Int32") + # representing NULL as -1 so that equality matching actually works well + processed_names.loc[ + processed_names.sample_location_specify.isna(), "sample_location_specify" + ] = -1 processed_names["key_plot_id"] = key_plot_ids processed_names = processed_names.set_index("key_plot_id") processed_names = processed_names.astype(KEY_PLOT_TYPES) @@ -178,6 +183,7 @@ def format_nwss_data( na_columns = df_concentration[df_concentration["normalization"].isna()] if na_columns.size != 0: logger.info("There are columns without normalization.", na_columns=na_columns) + conc_keys = key_plot_id_parse(pd.Series(df_concentration.key_plot_id.unique())) # get the keys+normalizations where there's an unambiguous choice of normalization key_plot_norms = df_concentration.loc[ @@ -193,8 +199,12 @@ def format_nwss_data( .drop(columns="n_norms") .set_index("key_plot_id") ) - # add the unambiguous normalizations, others get an NA - conc_keys = conc_keys.join(key_plot_norms, how="left") + # add the unambiguous normalizations + conc_keys = conc_keys.join( + key_plot_norms, + how="left", + ) + conc_keys.loc[conc_keys.normalization.isna(), "normalization"] = "unknown" metric_keys = key_plot_id_parse(pd.Series(df_metric.key_plot_id.unique())) validate_metric_key_plot_ids(df_metric, metric_keys, logger) # form the joint table of keys found in both, along with a column @@ -202,6 +212,8 @@ def format_nwss_data( joint_keys = conc_keys.reset_index().merge( metric_keys.reset_index(), indicator=True, how="outer" ) + # set any NA normalizations to "unknown" for reasons of db comparison + joint_keys.loc[joint_keys.normalization.isna(), "normalization"] = "unknown" joint_keys = joint_keys.astype(KEY_PLOT_TYPES) only_in_one = joint_keys[joint_keys._merge != "both"] if only_in_one.size > 0: @@ -322,8 +334,134 @@ def pull_nwss_data(token: str, logger: Logger) -> tuple[pd.DataFrame, pd.DataFra Dataframe as described above. 
""" # TODO temp, remove + import os + + token = os.environ.get("SODAPY_APPTOKEN") logger = get_structured_logger("csv_ingestion", filename="writingErrorLog") - # TODO reinstate this, similar for metric - # df_concentration = df_concentration.rename(columns={"date": "reference_date"}) + # end temp df_concentration, df_metric = download_raw_data(token, logger) df, auxiliary_info = format_nwss_data(df_concentration, df_metric) + df = df.rename( + columns={ + "date": "reference_date", + "sample_location_specify": "sample_loc_specify", + } + ) + # sample location is a boolean indictating whether sample_loc_specify is NA + df = df.drop(columns=["key_plot_id", "sample_location"]) + df["lag"] = pd.Timestamp.today().normalize() - df.reference_date + df["lag"] = df["lag"].dt.days + df["source"] = "NWSS" + # TODO missing value code + df["missing_value"] = 0 + df["version"] = pd.Timestamp("today").normalize() + df["pathogen"] = "COVID" + df["value_updated_timestamp"] = pd.Timestamp("now") + return df, auxiliary_info + + +from sqlalchemy import create_engine +from sqlalchemy import text + + +for table_name in inspector.get_table_names(): + for column in inspector.get_columns(table_name): + print("Column: %s" % column["name"]) + +with database.engine.connect() as conn: + res = conn.execute(text("show tables;")) + print(pd.DataFrame(res.all())) + +# accident +with database.engine.begin() as conn: + conn.execute(text("DROP TABLE wastewater_granular")) + +with database.engine.connect() as conn: + result = conn.execute(text("SELECT * FROM wastewater_granular_load")) + for row in result: + print(row) + +with database.engine.connect() as conn: + result = conn.execute(text("SELECT * FROM `signal_dim`")) + print(result.keys()) + for row in result: + print(row) + +with database.engine.connect() as conn: + result = conn.execute(text("SELECT * FROM `plant_dim`")) + print(result.keys()) + print(pd.DataFrame(result)) +stmt = """ +SELECT ld.source, ld.signal, ld.pathogen, ld.provider, ld.normalization, td.source, td.signal, td.pathogen, td.provider, td.normalization +FROM `wastewater_granular_load` AS ld +LEFT JOIN `signal_dim` AS td + USING (`source`, `signal`, `pathogen`, `provider`, `normalization`) +""" +# WHERE td.`source` IS NULL +print(stmt) +with database.engine.connect() as conn: + result = conn.execute(text(stmt)) + print(result.keys()) + for row in result: + print(row) + +with database.engine.connect() as conn: + result = conn.execute(text("SELECT * FROM `sample_site_dim`")) + print(result.keys()) + for row in result: + print(row) +toClaa = """ +INSERT INTO plant_dim (`wwtp_jurisdiction`, `wwtp_id`) + SELECT DISTINCT ld.wwtp_jurisdiction, ld.wwtp_id + FROM `wastewater_granular_load` AS ld + LEFT JOIN `plant_dim` AS td + USING (`wwtp_jurisdiction`, `wwtp_id`) + WHERE td.`wwtp_id` IS NULL +""" +with database.engine.connect() as conn: + result = conn.execute(text(toClaa)) +with database.engine.connect() as conn: + result = conn.execute(text("SELECT * FROM `plant_dim`")) + print(result.keys()) + for row in result: + print(row) + +toCall = f""" + SELECT DISTINCT ld.sample_loc_specify, ld.sample_method, ft.plant_id + FROM `wastewater_granular_load` AS ld + LEFT JOIN `sample_site_dim` AS td + USING (`sample_loc_specify`, `sample_method`) + LEFT JOIN `plant_dim` AS ft + USING (`wwtp_jurisdiction`, `wwtp_id`) + WHERE td.`sample_method` IS NULL +""" +print(toCall) +with database.engine.connect() as conn: + result = conn.execute(text(toCall)) + print(result.cursor.description) + print(result.keys()) + for row in 
result:
+        print(row)
+
+df_tiny = df.sample(n=20, random_state=1)
+df_tiny2 = df.sample(n=20, random_state=2)
+df_tiny
+# NOTE: pandas to_sql writes to a single table, so plant_dim rows need their own insert
+df_tiny.to_sql(
+    name="wastewater_granular_full",
+    con=database.engine,
+    if_exists="append",
+    method="multi",
+    index=False,
+)
+
+
+# roughly what needs to happen (unsure which language/where):
+# 1. SELECT * FROM latest
+# 2. compare and drop if it's redundant
+# 3. assign the various id's (I think this may be handled by the load table)
+# 4. split into separate tables
+# 5. INSERT the new rows
+# 6? aggregate to state level (maybe from load)
+# 7. delete load
+# Ok but actually, I will also be writing a separate aux table, which shouldn't need a separate load table and will happen separately
+# Also, I am going to need to dedupe, which I guess can happen during the join
diff --git a/src/acquisition/wastewater/nwss_database.py b/src/acquisition/wastewater/nwss_database.py
new file mode 100644
index 000000000..18dee1bd8
--- /dev/null
+++ b/src/acquisition/wastewater/nwss_database.py
@@ -0,0 +1,317 @@
+import pandas as pd
+
+from sqlalchemy import Date, create_engine
+from sqlalchemy import insert
+from sqlalchemy import update
+from sqlalchemy import text
+from sqlalchemy import MetaData
+from logging import Logger
+import sqlalchemy
+from .nwss_constants import SIG_DIGITS
+
+
+# TODO add a bunch of try/except blocks and logger calls
+# TODO make sure this catches redundants when value is NULL
+def add_backticks(string_array: list[str]):
+    """Enforce that every entry starts and ends with a backtick."""
+    string_array = [x + "`" if not x.endswith("`") else x for x in string_array]
+    string_array = ["`" + x if not x.startswith("`") else x for x in string_array]
+    return string_array
+
+
+def subtract_backticks(string_array: list[str]):
+    """Enforce that no entry starts or ends with a backtick."""
+    string_array = [x[:-1] if x.endswith("`") else x for x in string_array]
+    string_array = [x[1:] if x.startswith("`") else x for x in string_array]
+    return string_array
+
+
+def add_backticks_single(string):
+    if not string.endswith("`"):
+        string = string + "`"
+    if not string.startswith("`"):
+        string = "`" + string
+    return string
+
+
+def sql_rel_equal(x, y, tol):
+    """Are x and y within tol of one another?
+
+    `x` and `y` are expected to already have backticks.
+ """ + within_tolerance = ( + f"(ABS({x} - {y}) / GREATEST(ABS({x}), ABS({y})) < {10**(-tol)}) " + ) + + return f"({within_tolerance})" + + +class Database: + """A collection of wastewater database operations""" + + DATABASE_NAME = "covid" + load_table = "`wastewater_granular_load`" + full_table = "`wastewater_granular_full`" + latest_table = "`wastewater_granular_latest`" + latest_view = "`wastewater_granular_latest_v`" + sample_site_values = add_backticks(["sample_loc_specify", "sample_method"]) + plant_values = add_backticks(["wwtp_jurisdiction", "wwtp_id"]) + signal_values = add_backticks( + ["source", "signal", "pathogen", "provider", "normalization"] + ) + full_table_core = add_backticks(["reference_date", "value", "lag"]) + full_table_meta = add_backticks( + ["version", "value_updated_timestamp", "computation_as_of_dt", "missing_value"] + ) + key_ids = add_backticks(["signal_key_id", "site_key_id"]) + main_id = "`wastewater_id`" + + def __init__( + self, + logger: Logger, + engine: sqlalchemy.engine.base.Engine = create_engine( + "mysql+pymysql://user:pass@127.0.0.1:13306/epidata" + ), + meta: MetaData | None = None, + execute=True, + ): + if meta is None: + meta = MetaData() + meta.reflect(bind=engine) + self.engine = engine + self.meta = meta + self.logger = logger + self.execute = execute + + def load_DataFrame(self, df: pd.DataFrame, chunk_size=2**20): + """Import a dataframe into the load table.""" + df.to_sql( + name="wastewater_granular_load", + con=self.engine, + if_exists="append", + index=False, + method="multi", + dtype={"version": Date}, + chunksize=chunk_size, + ) + + def standard_run(self, df: pd.DataFrame): + """Given a Dataframe, perform the entire insertion operation. + + This means: + 1. loading into the load table, + 2. adding any new keys to `signal_dim`, `plant_dim`, and `sample_site_dim`, + 3. removing any values that are indistiguishable from the corresponding value in latest, + 4. inserting into both full and latest, and + 5. deleting the contents of the load table. + """ + self.load_DataFrame(df) + self.add_new_from_load("signal_dim", self.signal_values, key="signal_key_id") + self.add_new_from_load("plant_dim", self.plant_values, key="plant_id") + self.add_new_from_load( + "sample_site_dim", + self.sample_site_values, + key="site_key_id", + foreign_table="plant_dim", + foreign_keys=["plant_id"], + shared_values=self.plant_values, + execute=True, + ) + # have to wait until we've added the keys to efficiently delete + self.delete_redundant() + self.set_latest_flag() + self.add_full_latest(self.full_table) + self.add_full_latest(self.latest_table) + self.delete_load() + + def delete_redundant(self, execute=None): + # filtering out cases where: + # 2. both values are null + # 3. the values are approximately equal + delete_irrelevant_version_sql = f""" + DELETE {self.load_table} + FROM {self.load_table} + JOIN {self.latest_view} USING + ({", ".join(self.key_ids)}) + WHERE + (({self.load_table}.`value` IS NULL) AND ({self.latest_view}.`value` IS NULL)) + OR + {sql_rel_equal(f"{self.load_table}.`value`", f"{self.latest_view}.`value`", SIG_DIGITS)} + """ + if execute is None: + execute = self.execute + if execute: + with self.engine.begin() as conn: + conn.execute(text(delete_irrelevant_version_sql)) + return delete_irrelevant_version_sql + + def set_latest_flag(self, execute=None): + """Drop repeated values,and set the `is_latest_version` bit for insertion into latest.""" + # 1. 
version is older than the latest (unlikely to happen) + # filtering out cases where: + # 2. both values are null + # 3. the values are approximately equal + filter_relevant_version_sql = f""" + UPDATE {self.load_table} JOIN {self.latest_view} USING + ({", ".join(self.signal_values)}, {", ".join(self.sample_site_values)}, + {", ".join(self.plant_values)}, {", ".join(self.full_table_core)}) + SET {self.load_table}.`is_latest_version`=0 + WHERE + ({self.load_table}.`version` < {self.latest_view}.`version`) + """ + if execute is None: + execute = self.execute + if execute: + with self.engine.begin() as conn: + conn.execute(text(filter_relevant_version_sql)) + return filter_relevant_version_sql + + def add_new_from_load( + self, + target_table: str, + values: list[str], + key: str | None, + foreign_table: str | None = None, + shared_values: list[str] | None = None, + foreign_keys: list[str] | None = None, + execute: bool | None = None, + ): + """Add new values from the load table to the target table. + + Values specifies the list of columns as strings; they will be formatted + to contain backticks if they do not already. `key` is the name of the + `key` in the target table, and is used to insert the key value back into + the load table. + + `foreign_table`, `foreign_values`, and `foreign_keys` allow for inserting + foreign keys. `foreign_table` is the table with the foreign key, and should + already be updated. `shared_values` are the values in the load table coming + from `extra_table`. `foreign_keys` are just that. + """ + if execute is None: + execute = self.execute + if foreign_keys is None: + foreign_insert = [""] + foreign_select = "" + else: + foreign_insert = [""] + add_backticks(foreign_keys) + foreign_select = ", foreign_table." + ", foreign_table.".join( + add_backticks(foreign_keys) + ) + values_bt = add_backticks(values) + values_no_bt = subtract_backticks(values) + target_table = add_backticks_single(target_table) + add_new_target = f""" + INSERT INTO {target_table} ({", ".join(values_bt)}{", ".join(foreign_insert)}) + SELECT DISTINCT ld.{", ld.".join(values_no_bt)}{foreign_select} + FROM {self.load_table} AS ld + LEFT JOIN {target_table} AS td + USING ({", ".join(values_bt)})""" + if foreign_table is not None and shared_values is not None: + foreign_table = add_backticks_single(foreign_table) + shared_values = add_backticks(shared_values) + add_new_target += f""" + LEFT JOIN {foreign_table} AS foreign_table + USING ({", ".join(shared_values)}) + """ + # TODO TODO ADD THE ID'S BACK TO THE ORIGINAL + + # this could actually be any column as long as it's required to not be NULL; here every aux table satisfies that + add_new_target += f" WHERE td.{values_bt[0]} IS NULL" + + # if we're handed a key, add the value from that back to the load table + if key is not None: + key = add_backticks_single(key) + add_load_ids = f""" + UPDATE {self.load_table} load_table + INNER JOIN {target_table} target_table + USING ({", ".join(values_bt)}) + """ + if ( + foreign_table is not None + and shared_values is not None + and foreign_keys is not None + ): + # foreign_table.foo = load_table.foo + shared = " ".join( + [f"foreign_table.{x} = load_table.{x} AND " for x in shared_values] + ) + foreign = " ".join( + [ + f"foreign_table.{x} = target_table.{x}" + for x in add_backticks(foreign_keys) + ] + ) + # to join with a foreign table, we need to use ON so the key from + # the foreign table can match with the key from the target table, + # and not the load table (since that is the null we're trying to + 
# replace) + add_load_ids += f"""INNER JOIN {foreign_table} AS foreign_table + ON ({shared} {foreign}) + """ + add_load_ids += f"SET load_table.{key} = target_table.{key}" + else: + add_load_ids = "" + if execute: + with self.engine.begin() as conn: + conn.execute(text(add_new_target)) + if key is not None: + conn.execute(text(add_load_ids)) + return add_new_target, add_load_ids + + def read_table(self, table_name, max_entries: int | None = None): + table_name = add_backticks_single(table_name) + if max_entries is None: + statement = f"SELECT * FROM {table_name}" + else: + statement = f"SELECT top {max_entries} FROM `{table_name}`" + res = None + with self.engine.connect() as conn: + res = conn.execute(text(statement)) + res = pd.DataFrame(res) + return res + + def add_full_latest(self, target_table, execute: bool | None = None): + if execute is None: + execute = self.execute + target_table = add_backticks_single(target_table) + + add_to_table = f""" + INSERT INTO {target_table} + ({self.main_id}, {", ".join(self.key_ids)}, {", ".join(self.full_table_core)}, {", ".join(self.full_table_meta)}) + SELECT + {self.main_id}, {", ".join(self.key_ids)}, {", ".join(self.full_table_core)}, {", ".join(self.full_table_meta)} + FROM {self.load_table} lt + """ + + if target_table == self.latest_table: + add_to_table += "WHERE `is_latest_version` = 1" + meta_partial = self.full_table_meta + elif target_table == self.full_table: + # we don't update version for the full database + meta_partial = self.full_table_meta.copy() + meta_partial.remove("`version`") + else: + raise AssertionError("can't handle table names other than latest or full") + core_list = " \n".join( + [f"{x} = lt.{x}," for x in self.full_table_core] + ) + meta_list = " \n".join([f"{x} = lt.{x}," for x in meta_partial]) + add_to_table += f""" + ON DUPLICATE KEY UPDATE + {core_list} + {meta_list.rstrip()[:-1]} + """ + if execute: + with self.engine.begin() as conn: + res = conn.execute(text(add_to_table)) + return add_to_table + + def delete_load(self, execute=None): + if execute is None: + execute = self.execute + statement = f"DELETE FROM {self.load_table}" + if execute: + with self.engine.begin() as conn: + res = conn.execute(text(statement)) + return statement diff --git a/src/acquisition/wastewater/wastewater_utils.py b/src/acquisition/wastewater/wastewater_utils.py index 6fc8841e4..f4d238132 100644 --- a/src/acquisition/wastewater/wastewater_utils.py +++ b/src/acquisition/wastewater/wastewater_utils.py @@ -1,3 +1,6 @@ +import numpy as np + + # general utilities (that should maybe migrate elsewhere) def sig_digit_round(value, n_digits): """Truncate value to only n_digits. 
diff --git a/src/ddl/wastewater.sql b/src/ddl/wastewater.sql index 3cc49801e..53a7a912b 100644 --- a/src/ddl/wastewater.sql +++ b/src/ddl/wastewater.sql @@ -8,70 +8,82 @@ CREATE TABLE agg_geo_dim ( UNIQUE INDEX `agg_geo_dim_index` (`geo_type`, `geo_value`) ) ENGINE=InnoDB; -CREATE TABLE sample_site_dim ( - `site_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, - `plant_id` INT(10) UNSIGNED NOT NULL, - `sample_loc_specify` INT(10) UNSIGNED, -- definitely can be null - `sample_method` VARCHAR(20), - - UNIQUE INDEX `sample_site_dim_index` (`plant_id`, `sample_loc_specify`) -) ENGINE=InnoDB; - CREATE TABLE plant_dim ( `plant_id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, - `wwtp_jurisdiction` CHAR(3) NOT NULL, -- may only need CHAR(2), it's a state id + `wwtp_jurisdiction` VARCHAR(3) NOT NULL, -- may only need CHAR(2), it's a state id `wwtp_id` INT(10) UNSIGNED NOT NULL, UNIQUE INDEX `plant_index` (`wwtp_jurisdiction`, `wwtp_id`) ) ENGINE=InnoDB; -CREATE TABLE site_county_cross ( - `site_county_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, - `site_key_id` BIGINT(20) UNSIGNED NOT NULL, - `county_fips_id` CHAR(5) NOT NULL, - - UNIQUE INDEX `site_county_index` (`site_key_id`, `county_fips_id`) -) ENGINE=InnoDB; - - CREATE TABLE signal_dim ( `signal_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, `source` VARCHAR(32) NOT NULL, `signal` VARCHAR(64) NOT NULL, `pathogen` VARCHAR(64) NOT NULL, - `provider` VARCHAR(64), -- null for potential future use - `normalization` VARCHAR(64), -- null for potential future use + `provider` VARCHAR(64) NOT NULL, + `normalization` VARCHAR(64) NOT NULL, UNIQUE INDEX `signal_dim_index` (`source`, `signal`, `pathogen`, `provider`, `normalization`) ) ENGINE=InnoDB; +CREATE TABLE sample_site_dim ( + `site_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `plant_id` INT(10) UNSIGNED NOT NULL, + `sample_method` VARCHAR(20) NOT NULL, + `sample_loc_specify` INT(10) NOT NULL, -- nulls are represented as -1 + + UNIQUE INDEX `sample_site_dim_index` (`plant_id`, `sample_loc_specify`), + FOREIGN KEY (`plant_id`) + REFERENCES `plant_dim`(`plant_id`) + ON DELETE RESTRICT ON UPDATE RESTRICT -- plant_id shouldn't change, it's auto_increment +) ENGINE=InnoDB; + + +CREATE TABLE site_county_cross ( + `site_county_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `site_key_id` BIGINT(20) UNSIGNED NOT NULL, + `county_fips_id` CHAR(5) NOT NULL, + + UNIQUE INDEX `site_county_index` (`site_key_id`, `county_fips_id`), + FOREIGN KEY (`site_key_id`) + REFERENCES `sample_site_dim`(`site_key_id`) + ON DELETE RESTRICT ON UPDATE RESTRICT -- plant_id shouldn't change, it's auto_increment +) ENGINE=InnoDB; + + CREATE TABLE wastewater_granular_full ( `wastewater_id` BIGINT(20) UNSIGNED NOT NULL PRIMARY KEY, `signal_key_id` BIGINT(20) UNSIGNED NOT NULL, `site_key_id` BIGINT(20) UNSIGNED NOT NULL, - `version` INT(11) NOT NULL, - `time_type` VARCHAR(12) NOT NULL, - `reference_date` INT(11) NOT NULL, + `version` DATE NOT NULL, + `reference_date` DATE NOT NULL, `reference_dt` DATETIME(0), -- TODO: for future use `value` DOUBLE, `lag` INT(11) NOT NULL, - `value_updated_timestamp` INT(11) NOT NULL, + `value_updated_timestamp` DATETIME NOT NULL, `computation_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed `missing_value` INT(1) DEFAULT '0', - UNIQUE INDEX `value_key_tig` (`signal_key_id`, `time_type`, `reference_date`, `version`, `site_key_id`), - UNIQUE INDEX 
`value_key_tgi` (`signal_key_id`, `time_type`, `reference_date`, `site_key_id`, `version`), - UNIQUE INDEX `value_key_itg` (`signal_key_id`, `version`, `time_type`, `reference_date`, `site_key_id`), - UNIQUE INDEX `value_key_igt` (`signal_key_id`, `version`, `site_key_id`, `time_type`, `reference_date`), - UNIQUE INDEX `value_key_git` (`signal_key_id`, `site_key_id`, `version`, `time_type`, `reference_date`), - UNIQUE INDEX `value_key_gti` (`signal_key_id`, `site_key_id`, `time_type`, `reference_date`, `version`) + UNIQUE INDEX `value_key_tig` (`signal_key_id`, `reference_date`, `version`, `site_key_id`), + UNIQUE INDEX `value_key_tgi` (`signal_key_id`, `reference_date`, `site_key_id`, `version`), + UNIQUE INDEX `value_key_itg` (`signal_key_id`, `version`, `reference_date`, `site_key_id`), + UNIQUE INDEX `value_key_igt` (`signal_key_id`, `version`, `site_key_id`, `reference_date`), + UNIQUE INDEX `value_key_git` (`signal_key_id`, `site_key_id`, `version`, `reference_date`), + UNIQUE INDEX `value_key_gti` (`signal_key_id`, `site_key_id`, `reference_date`, `version`), + FOREIGN KEY (`site_key_id`) + REFERENCES `sample_site_dim`(`site_key_id`) + ON DELETE RESTRICT ON UPDATE RESTRICT, -- plant_id shouldn't change, it's auto_increment + FOREIGN KEY (`signal_key_id`) + REFERENCES `signal_dim`(`signal_key_id`) + ON DELETE RESTRICT ON UPDATE RESTRICT -- plant_id shouldn't change, it's auto_increment ) ENGINE=InnoDB; CREATE TABLE wastewater_granular_latest ( PRIMARY KEY (`wastewater_id`), - UNIQUE INDEX `value_key_tg` (`signal_key_id`, `time_type`, `reference_date`, `site_key_id`), - UNIQUE INDEX `value_key_gt` (`signal_key_id`, `site_key_id`, `time_type`, `reference_date`) + UNIQUE INDEX `value_key_tg` (`signal_key_id`, `reference_date`, `site_key_id`), + UNIQUE INDEX `value_key_gt` (`signal_key_id`, `site_key_id`, `reference_date`) ) ENGINE=InnoDB SELECT * FROM wastewater_granular_full; @@ -87,23 +99,32 @@ CREATE TABLE wastewater_granular_load ( `wastewater_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, `signal_key_id` BIGINT(20) UNSIGNED, `site_key_id` BIGINT(20) UNSIGNED, - `version` INT(11) NOT NULL, + `version` DATE NOT NULL, + -- signal_dim `source` VARCHAR(32) NOT NULL, `signal` VARCHAR(64) NOT NULL, - `time_type` VARCHAR(12) NOT NULL, - `reference_date` INT(11) NOT NULL, + `pathogen` VARCHAR(64) NOT NULL, + `provider` VARCHAR(64) NOT NULL, -- null for potential future use + `normalization` VARCHAR(64) NOT NULL, -- current data contains nulls in metric nwss source + -- sample_site_dim + `sample_loc_specify` INT(10) NOT NULL, + `sample_method` VARCHAR(20) NOT NULL, + -- plant_dim + `wwtp_jurisdiction` VARCHAR(3) NOT NULL, -- may only need CHAR(2), it's a state id + `wwtp_id` INT(10) UNSIGNED NOT NULL, + -- full + `reference_date` DATE NOT NULL, `reference_dt` DATETIME(0), -- TODO: for future use `value` DOUBLE, `lag` INT(11) NOT NULL, - `value_updated_timestamp` INT(11) NOT NULL, + `value_updated_timestamp` DATETIME NOT NULL, `computation_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed - `is_latest_version` BINARY(1) NOT NULL DEFAULT '0', + `is_latest_version` BINARY(1) NOT NULL DEFAULT '1', -- default to including `missing_value` INT(1) DEFAULT '0', - UNIQUE INDEX (`source`, `signal`, `time_type`, `reference_date`, `site_key_id`, `version`) + UNIQUE INDEX (`source`, `signal`, `reference_date`, `site_key_id`, `version`) ) ENGINE=InnoDB; - CREATE OR REPLACE VIEW wastewater_granular_full_v AS SELECT 0 AS `is_latest_version`, -- 
provides column-compatibility to match `wastewater_granular` table @@ -119,7 +140,6 @@ CREATE OR REPLACE VIEW wastewater_granular_full_v AS `t3`.`sample_loc_specify` AS `sample_loc_specify`, `t1`.`wastewater_id` AS `wastewater_id`, `t1`.`version` AS `version`, - `t1`.`time_type` AS `time_type`, `t1`.`reference_date` AS `reference_date`, `t1`.`reference_dt` AS `reference_dt`, -- TODO: for future use `t1`.`value` AS `value`, @@ -139,17 +159,20 @@ CREATE OR REPLACE VIEW wastewater_granular_latest_v AS SELECT 1 AS `is_latest_version`, -- provides column-compatibility to match `wastewater_granular` table NULL AS `direction`, -- provides column-compatibility to match `wastewater_granular` table + -- signal_dim `t2`.`source` AS `source`, `t2`.`signal` AS `signal`, `t2`.`pathogen` AS `pathogen`, `t2`.`provider` AS `provider`, `t2`.`normalization` AS `normalization`, + -- plant_dim `t4`.`wwtp_id` AS `wwtp_id`, `t4`.`wwtp_jurisdiction` AS `wwtp_jurisdiction`, + -- sample_site_dim `t3`.`sample_loc_specify` AS `sample_loc_specify`, + `t3`.`sample_method` AS `sample_method`, `t1`.`wastewater_id` AS `wastewater_id`, `t1`.`version` AS `version`, - `t1`.`time_type` AS `time_type`, `t1`.`reference_date` AS `reference_date`, `t1`.`reference_dt` AS `reference_dt`, -- TODO: for future use `t1`.`value` AS `value`, diff --git a/tests/acquisition/wastewater/database.py b/tests/acquisition/wastewater/database.py new file mode 100644 index 000000000..aa64aa3bb --- /dev/null +++ b/tests/acquisition/wastewater/database.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python3 +import pandas as pd +from delphi_utils.logger import get_structured_logger +from logging import Logger +from delphi.epidata.acquisition.wastewater.database import Database + +df_tiny = pd.DataFrame() + +# todo remove, temp for development +logger = get_structured_logger("csv_ingestion", filename="writingErrorLog") + +database = Database(logger, execute=True) +df_tiny.dtypes +df_old = df_tiny.copy() +df_old.version = (pd.Timestamp("today") - pd.Timedelta("7 days")).normalize() +with pd.option_context( + "display.max_rows", None, "display.max_columns", 700, "display.width", 181 +): + print(df_old.sort_values("wwtp_id")) + print(df_tiny.sort_values("wwtp_id")) +database.read_table("wastewater_granular_load") +database.read_table("signal_dim") +database.read_table("plant_dim").sort_values("wwtp_id") +database.read_table("sample_site_dim") +database.read_table("wastewater_granular_full") +database.read_table("wastewater_granular_latest") +database.read_table("wastewater_granular_latest_v") +with pd.option_context( + "display.max_rows", None, "display.max_columns", 700, "display.width", 181 +): + print(database.read_table("wastewater_granular_full")) + print(database.read_table("wastewater_granular_latest_v")) + print(database.read_table("wastewater_granular_load")) +database.read_table("wastewater_granular_latest") +database.load_DataFrame(df_tiny) +test_read = """ + SELECT + lt.`wastewater_id`, sd.`signal_key_id`, ssd.`site_key_id`, sd.`source`, sd.`signal`, sd.`pathogen`, sd.`provider`, sd.`normalization`, pd.`wwtp_jurisdiction`, pd.`wwtp_id`, ssd.`sample_loc_specify`, ssd.`sample_method` + FROM `wastewater_granular_load` lt + INNER JOIN signal_dim sd USING (`source`, `signal`, `pathogen`, `provider`, `normalization`) + INNER JOIN sample_site_dim ssd USING (`sample_loc_specify`, `sample_method`) + INNER JOIN plant_dim pd ON pd.`wwtp_jurisdiction`=lt.`wwtp_jurisdiction` AND pd.`wwtp_id`=lt.`wwtp_id` AND pd.`plant_id`=ssd.`plant_id` +""" +test_read = """ 
+SELECT * FROM `wastewater_granular_load` load_table +LEFT JOIN `wastewater_granular_latest_v` USING +(`signal_key_id`, `site_key_id`, `reference_date`) +""" +res = 3 +with database.engine.connect() as conn: + res = conn.execute(text(test_read)) + with pd.option_context( + "display.max_rows", None, "display.max_columns", 700, "display.width", 181 + ): + print(pd.DataFrame(res)) + +database.standard_run(df_old) +with pd.option_context( + "display.max_rows", None, "display.max_columns", 700, "display.width", 181 +): + print(database.read_table("wastewater_granular_load")) +database.read_table("signal_dim") +database.read_table("plant_dim").sort_values("wwtp_id") +database.read_table("sample_site_dim") +database.read_table("wastewater_granular_load").transpose() +database.read_table("wastewater_granular_latest").transpose() +with pd.option_context( + "display.max_rows", None, "display.max_columns", 700, "display.width", 181 +): + print(database.read_table("wastewater_granular_latest")) +with pd.option_context( + "display.max_rows", None, "display.max_columns", 700, "display.width", 181 +): + print(database.read_table("wastewater_granular_latest_v")) +database.load_DataFrame(df_tiny) +database.load_DataFrame(df_old) +database.read_table("signal_dim") +database.read_table("wastewater_granular_full").transpose() +database.read_table("sample_site_dim") +database.read_table("wastewater_granular_load") +database.add_new_from_load("signal_dim", database.signal_values, "signal_key_id") +print(database.add_new_from_load("plant_dim", database.plant_values, None)) +print( + database.add_new_from_load( + "sample_site_dim", + database.sample_site_values, + key="site_key_id", + foreign_table="plant_dim", + foreign_keys=["plant_id"], + shared_values=database.plant_values, + execute=True, + ) +) +print(database.delete_redundant()) +print(database.set_latest_flag()) +print(database.add_full_latest(database.full_table)) +print(database.add_full_latest(database.latest_table)) +database.delete_load() +database.add_full_latest(database.full_table) +database.add_full_latest(database.latest_table) +database.load_DataFrame(df_tiny) +load_table = database.read_table("wastewater_granular_load") +load_table.dtypes +database.delete_redundant() +database +database.read_table("wastewater_granular_load") +database.set_latest_flag() +database.read_table("wastewater_granular_load") +# test that add_new_load is idempotent +# test that it adds the correct features +print(database.add_new_from_load("signal_dim", database.signal_values, "source")) +database.read_table("signal_dim") +print(database.add_new_from_load("plant_dim", database.plant_values, "wwtp_id")) +database.read_table("plant_dim") +print( + database.add_new_from_load( + "sample_site_dim", + database.sample_site_values, + "sample_method", + foreign_table="plant_dim", + foreign_keys=["plant_id"], + shared_values=database.plant_values, + execute=True, + ) +) +database.read_table("sample_site_dim") +database.add_full_latest(database.full_table) +database.read_table(database.full_table) +database.read_table(database.latest_table) +database.delete_load() + + +"\n".join([f"{x} = lt.{x}," for x in database.full_table_meta]) +load_table = "`wastewater_granular_load`" +latest_view = "`wastewater_granular_latest_v`" +sample_site_values = add_backticks(["sample_loc_specify", "sample_method"]) +plant_values = add_backticks(["wwtp_jurisdiction", "wwtp_id"]) +signal_values = add_backticks( + ["source", "signal", "pathogen", "provider", "normalization"] +) +full_table_index = 
add_backticks(["reference_date", "value", "lag"]) +full_table_values = add_backticks(["value", "lag"]) + + +values = signal_values +target_table = "signal_dim" + + +signal_dim_add_new_load = f""" + INSERT INTO signal_dim (`source`, `signal`) + SELECT DISTINCT sl.source, sl.signal + FROM {self.load_table} AS sl LEFT JOIN signal_dim AS sd + USING (`source`, `signal`) + WHERE sd.source IS NULL +""" + +meta.tables["wastewater_granular_load"].c.site_key_id +meta.tables["wastewater_granular_load"].primary_key +for table in meta.tables.values(): + print(table.name) + for column in table.c: + print(" " + column.name) +a = meta.reflect(bind=engine) +a +# The way epimetrics does it: +# 1. put into load +# +insert_into_loader_sql = f""" + INSERT INTO `{self.load_table}` + (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, + `value_updated_timestamp` , `value`, `stderr`, `sample_size`, `issue`, `lag`, + `is_latest_issue` 1, `missing_value`, `missing_stderr`, `missing_sample_size`) + VALUES + (%s, %s, %s, %s, %s, %s, + UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, + 1, %s, %s, %s) +""" +# 2. join latest into load +fix_is_latest_version_sql = f""" + UPDATE + `{self.load_table}` JOIN `{self.latest_view}` + USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) + SET `{self.load_table}`.`is_latest_issue`=0 + WHERE `{self.load_table}`.`version` < `{self.latest_view}`.`version` +""" +# 3. export from load to the other tables +signal_dim_add_new_load = f""" + INSERT INTO signal_dim (`source`, `signal`) + SELECT DISTINCT sl.source, sl.signal + FROM {self.load_table} AS sl LEFT JOIN signal_dim AS sd + USING (`source`, `signal`) + WHERE sd.source IS NULL +""" +geo_dim_add_new_load = f""" + INSERT INTO geo_dim (`geo_type`, `geo_value`) + SELECT DISTINCT sl.geo_type, sl.geo_value + FROM {self.load_table} AS sl LEFT JOIN geo_dim AS gd + USING (`geo_type`, `geo_value`) + WHERE gd.geo_type IS NULL +""" +epimetric_full_load = f""" + INSERT INTO {self.history_table} + (epimetric_id, signal_key_id, geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size) + SELECT + epimetric_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size + FROM `{self.load_table}` sl + INNER JOIN signal_dim sd USING (source, `signal`) + INNER JOIN geo_dim gd USING (geo_type, geo_value) + ON DUPLICATE KEY UPDATE + `epimetric_id` = sl.`epimetric_id`, + `value_updated_timestamp` = sl.`value_updated_timestamp`, + `value` = sl.`value`, + `stderr` = sl.`stderr`, + `sample_size` = sl.`sample_size`, + `lag` = sl.`lag`, + `missing_value` = sl.`missing_value`, + `missing_stderr` = sl.`missing_stderr`, + `missing_sample_size` = sl.`missing_sample_size` +""" + +epimetric_latest_load = f""" + INSERT INTO {self.latest_table} + (epimetric_id, signal_key_id, geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size) + SELECT + epimetric_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size + FROM `{self.load_table}` sl 
+ INNER JOIN signal_dim sd USING (source, `signal`) + INNER JOIN geo_dim gd USING (geo_type, geo_value) + WHERE is_latest_issue = 1 + ON DUPLICATE KEY UPDATE + `epimetric_id` = sl.`epimetric_id`, + `value_updated_timestamp` = sl.`value_updated_timestamp`, + `value` = sl.`value`, + `stderr` = sl.`stderr`, + `sample_size` = sl.`sample_size`, + `issue` = sl.`issue`, + `lag` = sl.`lag`, + `missing_value` = sl.`missing_value`, + `missing_stderr` = sl.`missing_stderr`, + `missing_sample_size` = sl.`missing_sample_size` +""" + +epimetric_load_delete_processed = f""" + DELETE FROM `{self.load_table}` +""" + + +def add_new_load( + self, + target_table: str, + values: list[str], + not_null_value: str, + execute: bool | None = None, +): + """Add new values from the load table to the target table. + + Values specifies the list of columns as strings; they will be formatted + to contain backticks if they do not already. `not_null_value` gives a + column in `target_table` which has a `NOT NULL` restriction; this means + if it's `NULL`, then that row doesn't exist. The particular column + doesn't matter much, just that it's non-NULL. Likewise, it will be + formatted to contain backticks. + + extra_table and extra values give any foreign + """ + if execute is None: + execute = self.execute + values = add_backticks(values) + not_null_value = add_backticks_single(not_null_value) + add_new_target = f""" + INSERT INTO {target_table} + ({", ".join(values)}) + SELECT DISTINCT load.{", load.".join(values)} + FROM {self.load_table} AS load + LEFT JOIN {target_table} AS target + USING ({", ".join(values)}) + WHERE target.{not_null_value} IS NULL + """ + if execute: + with engine.connect() as conn: + conn.execute(text(add_new_target)) + return add_new_target From 62d4857d6c9460ddaa0fdded36c0501461f7bc4b Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Tue, 3 Sep 2024 16:48:05 -0500 Subject: [PATCH 5/5] functional log tests --- docs/new_endpoint_tutorial.md | 121 +++++----- pyproject.toml | 4 + src/acquisition/wastewater/__init__.py | 3 + src/acquisition/wastewater/nwss_constants.py | 4 + src/acquisition/wastewater/nwss_csv.py | 220 +++++++++--------- src/acquisition/wastewater/nwss_database.py | 44 +++- .../acquisition/wastewater/generating_csvs.py | 68 ++++++ .../wastewater/test_nwss_formatting.py | 177 ++++++++++++++ 8 files changed, 464 insertions(+), 177 deletions(-) create mode 100644 src/acquisition/wastewater/__init__.py create mode 100644 tests/acquisition/wastewater/generating_csvs.py create mode 100644 tests/acquisition/wastewater/test_nwss_formatting.py diff --git a/docs/new_endpoint_tutorial.md b/docs/new_endpoint_tutorial.md index 551bfab97..602c7dfe4 100644 --- a/docs/new_endpoint_tutorial.md +++ b/docs/new_endpoint_tutorial.md @@ -134,6 +134,21 @@ Fairly straightforward. Note that the keys for the related tables need to be add Note that wastewater removes duplicated values with different versions just after adding the key values from the related tables. ### Remove the inserted data from the load table Since the id of the load table is used to set the id of the full and latest tables, it is important not to drop or truncate when deleting these rows, since this would reset the index. +## Logging +Throughout our repos we use [structlog](https://www.structlog.org/en/stable/) +for logging, with some standard configuration constructed by +[`get_structured_logger`](https://github.com/cmu-delphi/covidcast-indicators/blob/1451e0742169e6e8909da4059cb37a17d397145f/_delphi_utils_python/delphi_utils/logger.py#L48). 
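+
+As a quick, minimal sketch (the logger name, the filename, and the key/value
+fields below are illustrative, not anything the endpoint requires), acquisition
+code typically builds one logger up front and then attaches structured context
+to each message as keyword arguments:
+
+```python
+from delphi_utils import get_structured_logger
+
+# the name identifies the job; `filename` optionally also writes the log to a file
+logger = get_structured_logger("wastewater_acquisition", filename="acquisition.log")
+
+# keyword arguments become structured fields rather than being interpolated into the message
+logger.info("loaded dataframe", rows=1234, source="NWSS")
+```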
+ +## Unit Testing +We need to test that the functions we're adding cover the expected kinds of data, and error appropriately when the data is invalid. To do this, we'll add some unit tests. + +To test the logs generated by this system in unit tests, create a `logger` +object as normal with `get_structured_logger`, and then wrap any lines whose +logs you want to test in `with capture_logs() as cap_logs:`, as done in the +[first example here](https://www.structlog.org/en/latest/testing.html), or in +[the wastewater +endpoint](https://github.com/cmu-delphi/delphi-epidata/tree/511607f1560af0a578ce9fc8b580f4a18ef48916/tests/acquisition/wastewater/test_nwss_formatting.py). + # the data Here's the requirement: we need to quickly surface the most recent "issue" @@ -169,6 +184,59 @@ If for whatever reason you need to add a new dependency TODO 3. edit the existing `/src/server/endpoints/__init__.py` to add the newly-created file to the imports (top) and to the list of endpoints (below). +# add an integration test + +Now that we've changed several files, we need to make sure that the changes +work as intended _before_ submitting code for review or committing code to the +repository. Given that the code spans multiple components and languages, this +needs to be an integration test. See more about integration testing in Delphi's +[frontend development guide](https://github.com/cmu-delphi/operations/blob/main/docs/frontend_development.md#integration). + +Create an integration test for the new endpoint by creating a new file, +`integrations/server/test_fluview_meta.py`. There's a good amount of +boilerplate, but fortunately, this can be copied _almost_ verbatim from the +[`fluview` endpoint integration test](https://github.com/cmu-delphi/delphi-epidata/blob/main/integrations/server/test_fluview.py). + +Include the following pieces: + +- top-level docstring (update name to `fluview_meta`) +- the imports section (no changes needed) +- the test class (update name and docstring for `fluview_meta`) +- the methods `setUpClass`, `setUp`, and `tearDown` (no changes needed) + +Add the following test method, which creates some dummy data, fetches the new +`fluview_meta` endpoint using the Python client library, and asserts that the +returned value is what we expect. + +```python +def test_round_trip(self): + """Make a simple round-trip with some sample data.""" + + # insert dummy data + self.cur.execute(''' + insert into fluview values + (0, "2020-04-07", 202021, 202020, "nat", 1, 2, 3, 4, 3.14159, 1.41421, + 10, 11, 12, 13, 14, 15), + (0, "2020-04-28", 202022, 202022, "hhs1", 5, 6, 7, 8, 1.11111, 2.22222, + 20, 21, 22, 23, 24, 25) + ''') + self.cnx.commit() + + # make the request + response = Epidata.fluview_meta() + + # assert that the right data came back + self.assertEqual(response, { + 'result': 1, + 'epidata': [{ + 'latest_update': '2020-04-28', + 'latest_issue': 202022, + 'table_rows': 2, + }], + 'message': 'success', + }) +``` + # update the client libraries There are currently four client libraries. They all need to be updated to make @@ -251,59 +319,6 @@ Here's what we add to each client: fluview_meta = fluview_meta, ``` -# add an integration test - -Now that we've changed several files, we need to make sure that the changes -work as intended _before_ submitting code for review or committing code to the -repository. Given that the code spans multiple components and languages, this -needs to be an integration test. 
See more about integration testing in Delphi's -[frontend development guide](https://github.com/cmu-delphi/operations/blob/main/docs/frontend_development.md#integration). - -Create an integration test for the new endpoint by creating a new file, -`integrations/server/test_fluview_meta.py`. There's a good amount of -boilerplate, but fortunately, this can be copied _almost_ verbatim from the -[`fluview` endpoint integration test](https://github.com/cmu-delphi/delphi-epidata/blob/main/integrations/server/test_fluview.py). - -Include the following pieces: - -- top-level docstring (update name to `fluview_meta`) -- the imports section (no changes needed) -- the test class (update name and docstring for `fluview_meta`) -- the methods `setUpClass`, `setUp`, and `tearDown` (no changes needed) - -Add the following test method, which creates some dummy data, fetches the new -`fluview_meta` endpoint using the Python client library, and asserts that the -returned value is what we expect. - -```python -def test_round_trip(self): - """Make a simple round-trip with some sample data.""" - - # insert dummy data - self.cur.execute(''' - insert into fluview values - (0, "2020-04-07", 202021, 202020, "nat", 1, 2, 3, 4, 3.14159, 1.41421, - 10, 11, 12, 13, 14, 15), - (0, "2020-04-28", 202022, 202022, "hhs1", 5, 6, 7, 8, 1.11111, 2.22222, - 20, 21, 22, 23, 24, 25) - ''') - self.cnx.commit() - - # make the request - response = Epidata.fluview_meta() - - # assert that the right data came back - self.assertEqual(response, { - 'result': 1, - 'epidata': [{ - 'latest_update': '2020-04-28', - 'latest_issue': 202022, - 'table_rows': 2, - }], - 'message': 'success', - }) -``` - # write documentation This consists of two steps: add a new document for the `fluview_meta` endpoint, diff --git a/pyproject.toml b/pyproject.toml index a4399ca9b..42e720988 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,3 +24,7 @@ include = 'server,tests/server' [tool.pylint.'DESIGN'] ignored-argument-names = ['(_.*|run_as_module)'] +[tool.pytest.ini_options] +pythonpath = [ + ".", "src" +] diff --git a/src/acquisition/wastewater/__init__.py b/src/acquisition/wastewater/__init__.py new file mode 100644 index 000000000..5891a6bdd --- /dev/null +++ b/src/acquisition/wastewater/__init__.py @@ -0,0 +1,3 @@ +from os import path + +WASTEWATER_DDL = path.join(path.dirname(__file__), "../../ddl/wastewater.sql") diff --git a/src/acquisition/wastewater/nwss_constants.py b/src/acquisition/wastewater/nwss_constants.py index 2eb228bc7..83f146096 100644 --- a/src/acquisition/wastewater/nwss_constants.py +++ b/src/acquisition/wastewater/nwss_constants.py @@ -1,5 +1,9 @@ import pandas as pd +from os import path + +WASTEWATER_DDL = path.join(path.dirname(__file__), "../../ddl/wastewater.sql") + CONCENTRATION_SIGNALS = ["pcr_conc_lin"] METRIC_SIGNALS = ["detect_prop_15d", "percentile", "ptc_15d", "population_served"] diff --git a/src/acquisition/wastewater/nwss_csv.py b/src/acquisition/wastewater/nwss_csv.py index f9c299d93..dd9219f7a 100644 --- a/src/acquisition/wastewater/nwss_csv.py +++ b/src/acquisition/wastewater/nwss_csv.py @@ -27,7 +27,7 @@ ) -def key_plot_id_parse(key_plot_ids: pd.Series) -> pd.DataFrame: +def key_plot_id_parse(key_plot_ids: pd.Series, logger: Logger) -> pd.DataFrame: """Given a list of key plot id's, extract the various sub-values. 
The Dataframe returned has the following columns @@ -46,11 +46,11 @@ def key_plot_id_parse(key_plot_ids: pd.Series) -> pd.DataFrame: # every variable is separated by `_`, though some may also contain them processed_names = key_plot_ids.str.extract( ( - r"(?P.+)_" # provider, anything before the next one "NWSS" + r"(?P.+)_" # provider, glues together everything before the next 4 r"(?P[a-z]{2})_" # state, exactly two letters "mi" - r"(?P[0-9]+)_" # an id, at least one digits "1040" + r"(?P[-,0-9]+)_" # an id, at least one digits "1040" r"(?P[^_]+)_" # either "Before treatment plant" or "Treatment plant" - r"(?P[0-9]*)_*" # a number, may be missing (thus _*) + r"(?P[-,0-9]*)_*" # a number, may be missing (thus _*) r"(?P.*)" # one of 4 values: # 'post grit removal', 'primary effluent', 'primary sludge', 'raw wastewater' ) @@ -66,9 +66,90 @@ def key_plot_id_parse(key_plot_ids: pd.Series) -> pd.DataFrame: processed_names = processed_names.set_index("key_plot_id") processed_names = processed_names.astype(KEY_PLOT_TYPES) # TODO warnings/errors when things don't parse + validate_key_plot_id_assumptions(processed_names, logger) return processed_names +def validate_key_plot_id_assumptions( + processed_names: pd.DataFrame, logger: Logger +) -> None: + """Check that the values in the key_plot_id fit our assuptions.""" + # wwtp_jurisdiction is validated alongside the metric keys + + # wwtp_id + processed_names = processed_names.reset_index() + + if any(processed_names.wwtp_id < 0): + negative_key_plot_ids = processed_names.loc[ + processed_names.wwtp_id < 0, "key_plot_id" + ] + logger.error( + "the wwtp_id's have negative values", key_plot_ids=negative_key_plot_ids + ) + if any(processed_names.wwtp_id.isna()): + wwtp_is_na = processed_names.loc[processed_names.wwtp_id.isna(), "key_plot_id"] + logger.error("the wwtp_id's have NA values", key_plot_ids=wwtp_is_na) + if any(processed_names.wwtp_id > 10_000): + wwtp_is_too_big = processed_names.loc[ + processed_names.wwtp_id > 10_000, "key_plot_id" + ] + logger.warn("wwtp_id values are over 10,000", key_plot_ids=wwtp_is_too_big) + + # sample_location is actually a boolean + sample_categories = processed_names.sample_location.cat.categories + if len(sample_categories) > 2 or not all( + processed_names.sample_location.cat.categories.isin( + np.array(["Before treatment plant", "Treatment plant"]) + ) + ): + logger.error( + "There is a new value for sample location", + sample_locations=sample_categories, + ) + + # sample location not defined for treatment plants + treatment_plants_mismatched = processed_names.query( + "sample_location=='Treatment plant' and sample_location_specify > 0", + engine="python", + ) + if treatment_plants_mismatched.size > 0: + logger.error( + "There are samples at treatment plants which have the sample location specified", + key_plot_ids=treatment_plants_mismatched.key_plot_id, + ) + + # sample location is defined for before treatment plants + before_treatment_plants_unnumbered = processed_names.query( + "sample_location=='Before treatment plant' and sample_location_specify < 0", + engine="python", + ) + if before_treatment_plants_unnumbered.size > 0: + logger.error( + "There are samples before treatment plants which don't have the sample location specified", + key_plot_ids=before_treatment_plants_unnumbered.key_plot_id, + ) + + # sample location is never smaller than -1 + if any(processed_names.sample_location_specify < -1): + too_negative = processed_names.loc[ + processed_names.sample_location_specify < -1, "key_plot_id" + ] + 
logger.error( + "sample_location_specify has an unexpected value", + key_plot_ids=too_negative, + ) + + # sample location is never larger than 10,000 + if any(processed_names.sample_location_specify > 10_000): + too_big = processed_names.loc[ + processed_names.sample_location_specify > 10_000, "key_plot_id" + ] + logger.warn( + "sample_location_specify values are over 10,000", + key_plot_ids=too_big, + ) + + def validate_metric_key_plot_ids( df_metric: pd.DataFrame, metric_keys: pd.DataFrame, logger: Logger ) -> None: @@ -145,8 +226,24 @@ def download_raw_data(token: str, logger: Logger) -> tuple[pd.DataFrame, pd.Data return df_concentration, df_metric +def pull_from_file( + file_path_conc, file_path_metric, logger: Logger +) -> tuple[pd.DataFrame, pd.DataFrame]: + """Pull from local files instead. + + The files are assumed to be a pandas dataframe, saved using the `to_parquet` + method like `df_metric.to_parquet(file_name_metric, compression="gzip")`. + """ + df_concentration = pd.read_parquet(file_path_conc) + df_metric = pd.read_parquet(file_path_metric) + df_concentration = convert_df_type(df_concentration, TYPE_DICT_CONC, logger) + df_metric = convert_df_type(df_metric, TYPE_DICT_METRIC, logger) + df_metric = df_metric.rename(columns={"date_end": "date"}) + return df_concentration, df_metric + + def format_nwss_data( - df_concentration: pd.DataFrame, df_metric: pd.DataFrame + df_concentration: pd.DataFrame, df_metric: pd.DataFrame, logger: Logger ) -> tuple[pd.DataFrame, pd.DataFrame]: """Helper to pull_nwss_data, mainly separated to ease debugging without pulling.""" # pull out the auxiliary_info first so we can drop it @@ -184,7 +281,9 @@ def format_nwss_data( if na_columns.size != 0: logger.info("There are columns without normalization.", na_columns=na_columns) - conc_keys = key_plot_id_parse(pd.Series(df_concentration.key_plot_id.unique())) + conc_keys = key_plot_id_parse( + pd.Series(df_concentration.key_plot_id.unique()), logger + ) # get the keys+normalizations where there's an unambiguous choice of normalization key_plot_norms = df_concentration.loc[ :, ["key_plot_id", "normalization"] @@ -205,7 +304,7 @@ def format_nwss_data( how="left", ) conc_keys.loc[conc_keys.normalization.isna(), "normalization"] = "unknown" - metric_keys = key_plot_id_parse(pd.Series(df_metric.key_plot_id.unique())) + metric_keys = key_plot_id_parse(pd.Series(df_metric.key_plot_id.unique()), logger) validate_metric_key_plot_ids(df_metric, metric_keys, logger) # form the joint table of keys found in both, along with a column # identifying which direction there is missingness @@ -358,110 +457,3 @@ def pull_nwss_data(token: str, logger: Logger) -> tuple[pd.DataFrame, pd.DataFra df["pathogen"] = "COVID" df["value_updated_timestamp"] = pd.Timestamp("now") return df, auxiliary_info - - -from sqlalchemy import create_engine -from sqlalchemy import text - - -for table_name in inspector.get_table_names(): - for column in inspector.get_columns(table_name): - print("Column: %s" % column["name"]) - -with database.engine.connect() as conn: - res = conn.execute(text("show tables;")) - print(pd.DataFrame(res.all())) - -# accident -with database.engine.begin() as conn: - conn.execute(text("DROP TABLE wastewater_granular")) - -with database.engine.connect() as conn: - result = conn.execute(text("SELECT * FROM wastewater_granular_load")) - for row in result: - print(row) - -with database.engine.connect() as conn: - result = conn.execute(text("SELECT * FROM `signal_dim`")) - print(result.keys()) - for row in result: - 
print(row) - -with database.engine.connect() as conn: - result = conn.execute(text("SELECT * FROM `plant_dim`")) - print(result.keys()) - print(pd.DataFrame(result)) -stmt = """ -SELECT ld.source, ld.signal, ld.pathogen, ld.provider, ld.normalization, td.source, td.signal, td.pathogen, td.provider, td.normalization -FROM `wastewater_granular_load` AS ld -LEFT JOIN `signal_dim` AS td - USING (`source`, `signal`, `pathogen`, `provider`, `normalization`) -""" -# WHERE td.`source` IS NULL -print(stmt) -with database.engine.connect() as conn: - result = conn.execute(text(stmt)) - print(result.keys()) - for row in result: - print(row) - -with database.engine.connect() as conn: - result = conn.execute(text("SELECT * FROM `sample_site_dim`")) - print(result.keys()) - for row in result: - print(row) -toClaa = """ -INSERT INTO plant_dim (`wwtp_jurisdiction`, `wwtp_id`) - SELECT DISTINCT ld.wwtp_jurisdiction, ld.wwtp_id - FROM `wastewater_granular_load` AS ld - LEFT JOIN `plant_dim` AS td - USING (`wwtp_jurisdiction`, `wwtp_id`) - WHERE td.`wwtp_id` IS NULL -""" -with database.engine.connect() as conn: - result = conn.execute(text(toClaa)) -with database.engine.connect() as conn: - result = conn.execute(text("SELECT * FROM `plant_dim`")) - print(result.keys()) - for row in result: - print(row) - -toCall = f""" - SELECT DISTINCT ld.sample_loc_specify, ld.sample_method, ft.plant_id - FROM `wastewater_granular_load` AS ld - LEFT JOIN `sample_site_dim` AS td - USING (`sample_loc_specify`, `sample_method`) - LEFT JOIN `plant_dim` AS ft - USING (`wwtp_jurisdiction`, `wwtp_id`) - WHERE td.`sample_method` IS NULL -""" -print(toCall) -with database.engine.connect() as conn: - result = conn.execute(text(toCall)) - print(result.cursor.description) - print(result.keys()) - for row in result: - print(row) - -df_tiny = df.sample(n=20, random_state=1) -df_tiny2 = df.sample(n=20, random_state=2) -df_tiny -df_tiny.to_sql( - name=["wastewater_granular_full", "plant_dim"], - con=engine, - if_exists="append", - method="multi", - index=False, -) - - -# roughly what needs to happen (unsure which language/where): -# 1. SELECT * FROM latest -# 2. compare and drop if it's redundant -# 3. assign the various id's (I think this may be handled by the load datatable) -# 4. split into seprate tables -# 5. INSERT the new rows -# 6? aggregate to state level (maybe from load,) -# 7. 
delete load -# Ok but actually, I will also be writing a separate aux table, that shouldn't need a separate load table and will happen separately -# Also, I am going to need to dedupe, which I guess can happen during the join diff --git a/src/acquisition/wastewater/nwss_database.py b/src/acquisition/wastewater/nwss_database.py index 18dee1bd8..9365af056 100644 --- a/src/acquisition/wastewater/nwss_database.py +++ b/src/acquisition/wastewater/nwss_database.py @@ -1,3 +1,4 @@ +from os import path import pandas as pd from sqlalchemy import Date, create_engine @@ -7,7 +8,10 @@ from sqlalchemy import MetaData from logging import Logger import sqlalchemy -from .nwss_constants import SIG_DIGITS +from .nwss_constants import WASTEWATER_DDL, SIG_DIGITS + +# TODO temp +# WASTEWATER_DDL = "/home/dsweber/allHail/delphi/epidataLocalBuild/driver/repos/delphi/delphi-epidata/src/ddl/wastewater.sql" # TODO add a bunch of try catches and logger stuff @@ -26,6 +30,14 @@ def subtract_backticks(string_array: list[str]): return string_array +def subtract_backticks_single(string): + if string.endswith("`"): + string = string[:-1] + if string.startswith("`"): + string = string[1:] + return string + + def add_backticks_single(string): if not string.endswith("`"): string = string + "`" @@ -37,7 +49,7 @@ def add_backticks_single(string): def sql_rel_equal(x, y, tol): """Are x and y are within tol of one another? - `x` and `y` are expected to already have backticks. + `x` and `y` are expected to already have backticks, and tol is a positive base 10 exponent. """ within_tolerance = ( f"(ABS({x} - {y}) / GREATEST(ABS({x}), ABS({y})) < {10**(-tol)}) " @@ -125,6 +137,7 @@ def standard_run(self, df: pd.DataFrame): self.delete_load() def delete_redundant(self, execute=None): + """Deletes entries which are within tolerance of the corresponding version in latest.""" # filtering out cases where: # 2. both values are null # 3. the values are approximately equal @@ -145,6 +158,11 @@ def delete_redundant(self, execute=None): conn.execute(text(delete_irrelevant_version_sql)) return delete_irrelevant_version_sql + def delete_redundant_backfill(self, execute=None): + """Remove redundant versions, comparing against everything and not just latest.""" + self.logger.error("not yet implemented!") + pass + def set_latest_flag(self, execute=None): """Drop repeated values,and set the `is_latest_version` bit for insertion into latest.""" # 1. 
version is older than the latest (unlikely to happen) @@ -259,17 +277,13 @@ def add_new_from_load( conn.execute(text(add_load_ids)) return add_new_target, add_load_ids - def read_table(self, table_name, max_entries: int | None = None): - table_name = add_backticks_single(table_name) + def read_table(self, table_name, max_entries: int | None = None, **kwargs): if max_entries is None: - statement = f"SELECT * FROM {table_name}" + statement = subtract_backticks_single(table_name) else: + table_name = add_backticks_single(table_name) statement = f"SELECT top {max_entries} FROM `{table_name}`" - res = None - with self.engine.connect() as conn: - res = conn.execute(text(statement)) - res = pd.DataFrame(res) - return res + return pd.read_sql(sql=statement, con=self.engine, **kwargs) def add_full_latest(self, target_table, execute: bool | None = None): if execute is None: @@ -315,3 +329,13 @@ def delete_load(self, execute=None): with self.engine.begin() as conn: res = conn.execute(text(statement)) return statement + + def build_database(self, target_ddl=WASTEWATER_DDL): + """Strictly for testing purposes.""" + + with self.engine.begin() as conn: + with open(target_ddl) as file: + ddl = file.read() + ddl = ddl.split("\n", maxsplit=1)[1] + res = conn.execute(text(ddl)) + return res diff --git a/tests/acquisition/wastewater/generating_csvs.py b/tests/acquisition/wastewater/generating_csvs.py new file mode 100644 index 000000000..ee48d4fd0 --- /dev/null +++ b/tests/acquisition/wastewater/generating_csvs.py @@ -0,0 +1,68 @@ +# these don't need to be run, but are kept as a recording of how these csv's were generated +import pandas as pd +import os + +test_data = "../../../testdata/acquisition/wastewater/" +df_read_first = pd.read_csv( + os.path.join(test_data, "nwss_test_data.csv"), + index_col=0, + parse_dates=["reference_date", "version", "value_updated_timestamp"], +) +# as if written 2 days later, with 5 entries that ~ correspond to the same days +# but varying changes to the values and versions +df_read_second = pd.read_csv( + os.path.join(test_data, "nwss_test_data_second_round.csv"), + index_col=0, + parse_dates=["reference_date", "version", "value_updated_timestamp"], +) + + +signal_columns = ["source", "signal", "pathogen", "provider", "normalization"] +first_signal_values = ( + df_read_first.loc[:, signal_columns] + .drop_duplicates() + .sort_values(by=signal_columns) +) +first_signal_values.to_csv( + os.path.join(test_data, "first_signal_values.csv"), index=False +) +second_signal_values = ( + df_read_second.loc[:, signal_columns] + .drop_duplicates() + .sort_values(by=signal_columns) +) +# this is a hack to get a set difference (removing the values in first_signal_values from second_signal_values) +only_new = pd.concat( + [ + second_signal_values, + first_signal_values, + first_signal_values, + ] +).drop_duplicates(keep=False) +only_new.to_csv(os.path.join(test_data, "added_signal_values.csv"), index=False) + + +# similar idea, but for the rows in site join plant +site_columns = [ + "wwtp_jurisdiction", + "wwtp_id", + "sample_loc_specify", + "sample_method", +] +first_site_values = ( + df_read_first.loc[:, site_columns].drop_duplicates().sort_values(by=site_columns) +) +first_site_values.to_csv(os.path.join(test_data, "first_site_values.csv"), index=False) +second_site_values = ( + df_read_second.loc[:, site_columns].drop_duplicates().sort_values(by=site_columns) +) +# this is a hack to get a set difference (removing the values in first_site_values from second_site_values) +only_new = 
pd.concat( + [ + second_site_values, + first_site_values, + first_site_values, + ] +).drop_duplicates(keep=False) +only_new.to_csv(os.path.join(test_data, "added_site_values.csv"), index=False) +df_read_first.dtypes diff --git a/tests/acquisition/wastewater/test_nwss_formatting.py b/tests/acquisition/wastewater/test_nwss_formatting.py new file mode 100644 index 000000000..fdab1605b --- /dev/null +++ b/tests/acquisition/wastewater/test_nwss_formatting.py @@ -0,0 +1,177 @@ +import pytest +from structlog.testing import capture_logs +from delphi_utils import get_structured_logger + +from pandas.testing import assert_frame_equal +from acquisition.wastewater.nwss_csv import ( + format_nwss_data, + key_plot_id_parse, + pull_from_file, +) +import pandas as pd + + +@pytest.fixture(name="logger") +def fixture_logger(tmp_path): + logger = get_structured_logger(filename=tmp_path / "log_output.txt") + return logger + + +class TestKeyPlotIdFormatting: + def test_typical_key_plot_id_input(self, logger): + """A couple of examples of typical behavior.""" + typical_examples = pd.Series( + [ + "NWSS_mi_889_Before treatment plant_122_raw wastewater", + "NWSS_wi_2528_Before treatment plant_636_raw wastewater", + "WWS_fl_2359_Before treatment plant_553_post grit removal", + "NWSS_il_606_Treatment plant_raw wastewater", # no sample loc specify + "CDC_VERILY_la_1651_Treatment plant_post grit removal", # verily + ] + ) + result = pd.DataFrame( + data={ + "key_plot_id": typical_examples, + "provider": ["NWSS", "NWSS", "WWS", "NWSS", "CDC_VERILY"], + "wwtp_jurisdiction": ["mi", "wi", "fl", "il", "la"], + "wwtp_id": [889, 2528, 2359, 606, 1651], + "sample_location": [ + "Before treatment plant", + "Before treatment plant", + "Before treatment plant", + "Treatment plant", + "Treatment plant", + ], + "sample_location_specify": [122, 636, 553, -1, -1], + "sample_method": [ + "raw wastewater", + "raw wastewater", + "post grit removal", + "raw wastewater", + "post grit removal", + ], + } + ) + result = result.set_index("key_plot_id") + result = result.astype( + dtype={ + "provider": "category", + "wwtp_jurisdiction": "category", + "wwtp_id": "Int32", + "sample_location": "category", + "sample_location_specify": "Int32", + "sample_method": "category", + } + ) + assert_frame_equal(result, key_plot_id_parse(typical_examples, logger)) + + def test_edge_key_plot_id_input(self, logger): + """Examples that might break the regex but shouldn't.""" + examples = pd.Series( + [ + "Some_name_with_lots_of_spaces_wa_88_Before treatment plant_348_raw wastewater", + "provider_is_broken_va_258_Treatment plant_raw wastewater", + ] + ) + expected_output = pd.DataFrame( + data={ + "key_plot_id": examples, + "provider": ["Some_name_with_lots_of_spaces", "provider_is_broken"], + "wwtp_jurisdiction": ["wa", "va"], + "wwtp_id": [88, 258], + "sample_location": ["Before treatment plant", "Treatment plant"], + "sample_location_specify": [348, -1], + "sample_method": ["raw wastewater", "raw wastewater"], + } + ) + expected_output = expected_output.set_index("key_plot_id") + expected_output = expected_output.astype( + dtype={ + "provider": "category", + "wwtp_jurisdiction": "category", + "wwtp_id": "Int32", + "sample_location": "category", + "sample_location_specify": "Int32", + "sample_method": "category", + } + ) + expected_output.dtypes + processed_names = key_plot_id_parse(examples, logger) + assert_frame_equal(expected_output, key_plot_id_parse(examples, logger)) + + ########### breaking cases ############# + def 
test_treatment_plant_numbering(self, logger, tmp_path): + """tests for sample location numbering""" + # shouldn't have a number for a treatment plant + examples = pd.Series( + [ + "NWSS_wa_88_Treatment plant_348_raw wastewater", + "NWSS_wa_88_Maybe treatment plant_348_raw wastewater", + "NWSS_wa_88_Before treatment plant_-1_raw wastewater", + "NWSS_wa_88_Treatment plant_-10_raw wastewater", + ], + name="key_plot_id", + ) + with capture_logs() as cap_logs: + key_plot_id_parse(examples, logger) + + assert ( + cap_logs[0]["sample_locations"] + == pd.Index( + ["Before treatment plant", "Maybe treatment plant", "Treatment plant"], + dtype="object", + ) + ).all() + assert cap_logs[0]["event"] == "There is a new value for sample location" + assert cap_logs[0]["log_level"] == "error" + + # treatment plants with sample location specified when they shouldn't + assert (cap_logs[1]["key_plot_ids"] == examples[0:1]).all() + assert ( + cap_logs[1]["event"] + == "There are samples at treatment plants which have the sample location specified" + ) + assert cap_logs[1]["log_level"] == "error" + + # the opposite case: unspecified sample location when it's before the + # treatment plant + assert (cap_logs[2]["key_plot_ids"] == examples[2:3]).all() + assert ( + cap_logs[2]["event"] + == "There are samples before treatment plants which don't have the sample location specified" + ) + assert cap_logs[2]["log_level"] == "error" + + # too negative + assert (cap_logs[3]["key_plot_ids"] == examples[3:4]).all() + assert cap_logs[3]["event"] == "sample_location_specify has an unexpected value" + assert cap_logs[3]["log_level"] == "error" + + def test_wwtp_id_assumptions(self, logger, tmp_path): + """Negative, missing, or too large wwtp_id's""" + # shouldn't have a number for a treatment plant + examples = pd.Series( + [ + "NWSS_wa_-88_Before treatment plant_348_raw wastewater", + f"NWSS_wa_{10_001}_Before treatment plant_348_raw wastewater", + "NWSS_wa_NA_Before treatment plant_348_raw wastewater", + ], + name="key_plot_id", + ) + with capture_logs() as cap_logs: + key_plot_id_parse(examples, logger) + + # negative values + assert (examples[0:1] == cap_logs[0]["key_plot_ids"]).all() + assert cap_logs[0]["event"] == "the wwtp_id's have negative values" + assert cap_logs[0]["log_level"] == "error" + + # NA value + assert (examples[2:3] == cap_logs[1]["key_plot_ids"]).all() + assert cap_logs[1]["event"] == "the wwtp_id's have NA values" + assert cap_logs[1]["log_level"] == "error" + + # too large (warning only) + assert (examples[1:2] == cap_logs[2]["key_plot_ids"]).all() + assert cap_logs[2]["event"] == "wwtp_id values are over 10,000" + assert cap_logs[2]["log_level"] == "warning"
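
Taken together, the pieces in this patch compose into a short local smoke test: pull saved snapshots, format them, and inspect the result before handing it to the database layer. A minimal sketch, assuming parquet snapshots written as described in `pull_from_file`'s docstring (the file names below are placeholders) and assuming `format_nwss_data` returns the formatted signal table together with the auxiliary info split out earlier:

```python
from delphi_utils import get_structured_logger

from acquisition.wastewater.nwss_csv import format_nwss_data, pull_from_file

logger = get_structured_logger("nwss_smoke_test")

# placeholder paths; these parquet files are assumed to have been written with
# df.to_parquet(..., compression="gzip") as noted in pull_from_file's docstring
df_concentration, df_metric = pull_from_file(
    "nwss_concentration.parquet", "nwss_metric.parquet", logger
)

# assumed return order: the long-format signal table, then the auxiliary info
df, auxiliary_info = format_nwss_data(df_concentration, df_metric, logger)
print(df.head())
```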