From c5c1989a127fb57f332c49e5a72f4fe2c4ae35ef Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 7 Oct 2022 13:57:52 -0700 Subject: [PATCH 01/17] Server: add CovidcastRow helper class for testing * no default values * helper functions for creating rows --- integrations/server/test_covidcast.py | 20 +- src/acquisition/covidcast/covidcast_row.py | 241 ++++++++++++++++++ src/acquisition/covidcast/database.py | 12 +- .../covidcast/test_covidcast_row.py | 103 ++++++++ 4 files changed, 357 insertions(+), 19 deletions(-) create mode 100644 src/acquisition/covidcast/covidcast_row.py create mode 100644 tests/acquisition/covidcast/test_covidcast_row.py diff --git a/integrations/server/test_covidcast.py b/integrations/server/test_covidcast.py index bcca3b199..b69a68b06 100644 --- a/integrations/server/test_covidcast.py +++ b/integrations/server/test_covidcast.py @@ -77,11 +77,11 @@ def _insert_placeholder_set_four(self): def _insert_placeholder_set_five(self): rows = [ - self._make_placeholder_row(time_value=2000_01_01, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i)[0] + CovidcastRow(time_value=2000_01_01, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i) for i in [1, 2, 3] ] + [ # different time_values, same issues - self._make_placeholder_row(time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i-3)[0] + CovidcastRow(time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i-3) for i in [4, 5, 6] ] self._insert_rows(rows) @@ -254,18 +254,16 @@ def test_time_values_wildcard(self): # insert placeholder data rows = self._insert_placeholder_set_three() - expected_time_values = [ - self.expected_from_row(r) for r in rows[:3] - ] + expected = [row.as_dict(ignore_fields=IGNORE_FIELDS) for row in rows[:3]] # make the request - response, _ = self.request_based_on_row(rows[0], time_values="*") + response = self.request_based_on_row(rows[0], time_values="*") self.maxDiff = None # assert that the right data came back self.assertEqual(response, { 'result': 1, - 'epidata': expected_time_values, + 'epidata': expected, 'message': 'success', }) @@ -274,18 +272,16 @@ def test_issues_wildcard(self): # insert placeholder data rows = self._insert_placeholder_set_five() - expected_issues = [ - self.expected_from_row(r) for r in rows[:3] - ] + expected = [row.as_dict(ignore_fields=IGNORE_FIELDS) for row in rows[:3]] # make the request - response, _ = self.request_based_on_row(rows[0], issues="*") + response = self.request_based_on_row(rows[0], issues="*") self.maxDiff = None # assert that the right data came back self.assertEqual(response, { 'result': 1, - 'epidata': expected_issues, + 'epidata': expected, 'message': 'success', }) diff --git a/src/acquisition/covidcast/covidcast_row.py b/src/acquisition/covidcast/covidcast_row.py new file mode 100644 index 000000000..c5b2973f1 --- /dev/null +++ b/src/acquisition/covidcast/covidcast_row.py @@ -0,0 +1,241 @@ +from dataclasses import asdict, dataclass, fields +from datetime import date +from typing import Any, ClassVar, Dict, Iterable, List, Optional + +import pandas as pd +from delphi_utils import Nans + +from delphi.epidata.server.utils.dates import day_to_time_value, time_value_to_day +from delphi.epidata.server.endpoints.covidcast_utils.model import PANDAS_DTYPES + + +@dataclass +class CovidcastRow: + """A container for the values of a single covidcast database row. 
+
+    Used for:
+    - inserting rows into the database
+    - creating test rows with default fields for testing
+    - converting from and to formats (dict, csv, df, kwargs)
+    - creating consistent views, with consistent data types (dict, csv, df)
+
+    The rows are specified in 'v4_schema.sql'. The datatypes are made to match the database schema. When writing to Pandas, the dtypes match the JIT model.py schema.
+    """
+
+    # Arguments.
+    source: str
+    signal: str
+    time_type: str
+    geo_type: str
+    time_value: int
+    geo_value: str
+    value: float
+    stderr: float
+    sample_size: float
+    # The following three fields are Nans enums from delphi_utils.nans.
+    missing_value: int
+    missing_stderr: int
+    missing_sample_size: int
+    issue: Optional[int]
+    lag: Optional[int]
+    # The following four fields exist only in the database; they are not ingested at acquisition and not returned by the API.
+    id: Optional[int]
+    direction: Optional[int]
+    direction_updated_timestamp: int
+    value_updated_timestamp: int
+
+    # Classvars.
+    _db_row_ignore_fields: ClassVar = []
+    _api_row_ignore_fields: ClassVar = ["id", "direction_updated_timestamp", "value_updated_timestamp"]
+    _api_row_compatibility_ignore_fields: ClassVar = ["id", "direction_updated_timestamp", "value_updated_timestamp", "source", "time_type", "geo_type"]
+    _pandas_dtypes: ClassVar = PANDAS_DTYPES
+
+    @staticmethod
+    def make_default_row(**kwargs) -> "CovidcastRow":
+        default_args = {
+            "source": "src",
+            "signal": "sig",
+            "time_type": "day",
+            "geo_type": "county",
+            "time_value": 20200202,
+            "geo_value": "01234",
+            "value": 10.0,
+            "stderr": 10.0,
+            "sample_size": 10.0,
+            "missing_value": Nans.NOT_MISSING.value,
+            "missing_stderr": Nans.NOT_MISSING.value,
+            "missing_sample_size": Nans.NOT_MISSING.value,
+            "issue": 20200202,
+            "lag": 0,
+            "id": None,
+            "direction": None,
+            "direction_updated_timestamp": 0,
+            "value_updated_timestamp": 20200202,
+        }
+        default_args.update(kwargs)
+        return CovidcastRow(**default_args)
+
+    def __post_init__(self):
+        # Convert time values to ints by default.
+        self.time_value = day_to_time_value(self.time_value) if isinstance(self.time_value, date) else self.time_value
+        self.issue = day_to_time_value(self.issue) if isinstance(self.issue, date) else self.issue
+        self.value_updated_timestamp = day_to_time_value(self.value_updated_timestamp) if isinstance(self.value_updated_timestamp, date) else self.value_updated_timestamp
+
+    def _sanity_check_fields(self, extra_checks: bool = True):
+        if self.issue and self.issue < self.time_value:
+            self.issue = self.time_value
+
+        if self.issue:
+            self.lag = (time_value_to_day(self.issue) - time_value_to_day(self.time_value)).days
+        else:
+            self.lag = None
+
+        # This sanity checking is already done in CsvImporter, but it's here so the testing class gets it too.
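+        # A NaN value whose missing_* code still reads NOT_MISSING is inconsistent;
+        # recode it below: NOT_APPLICABLE under the stricter test checks, OTHER otherwise.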
+        if pd.isna(self.value) and self.missing_value == Nans.NOT_MISSING:
+            self.missing_value = Nans.NOT_APPLICABLE.value if extra_checks else Nans.OTHER.value
+
+        if pd.isna(self.stderr) and self.missing_stderr == Nans.NOT_MISSING:
+            self.missing_stderr = Nans.NOT_APPLICABLE.value if extra_checks else Nans.OTHER.value
+
+        if pd.isna(self.sample_size) and self.missing_sample_size == Nans.NOT_MISSING:
+            self.missing_sample_size = Nans.NOT_APPLICABLE.value if extra_checks else Nans.OTHER.value
+
+        return self
+
+    def as_dict(self, ignore_fields: Optional[List[str]] = None) -> dict:
+        d = asdict(self)
+        if ignore_fields:
+            for key in ignore_fields:
+                del d[key]
+        return d
+
+    def as_dataframe(self, ignore_fields: Optional[List[str]] = None) -> pd.DataFrame:
+        df = pd.DataFrame.from_records([self.as_dict(ignore_fields=ignore_fields)])
+        # This is to mirror the types in model.py.
+        df = set_df_dtypes(df, self._pandas_dtypes)
+        return df
+
+    @property
+    def api_row_df(self) -> pd.DataFrame:
+        """Returns a dataframe view into the row with the fields returned by the API server."""
+        return self.as_dataframe(ignore_fields=self._api_row_ignore_fields)
+
+    @property
+    def api_compatibility_row_df(self) -> pd.DataFrame:
+        """Returns a dataframe view into the row with the fields returned by the old API server (the PHP server)."""
+        return self.as_dataframe(ignore_fields=self._api_row_compatibility_ignore_fields)
+
+    @property
+    def db_row_df(self) -> pd.DataFrame:
+        """Returns a dataframe view into the row with the fields returned by an all-field database query."""
+        return self.as_dataframe(ignore_fields=self._db_row_ignore_fields)
+
+    @property
+    def signal_pair(self):
+        return f"{self.source}:{self.signal}"
+
+    @property
+    def geo_pair(self):
+        return f"{self.geo_type}:{self.geo_value}"
+
+    @property
+    def time_pair(self):
+        return f"{self.time_type}:{self.time_value}"
+
+
+def covidcast_rows_from_args(sanity_check: bool = True, test_mode: bool = True, **kwargs: Dict[str, Iterable]) -> List[CovidcastRow]:
+    """A convenience constructor.
+
+    Handy for constructing batches of test cases.
+
+    Example:
+    covidcast_rows_from_args(value=[1, 2, 3], time_value=[1, 2, 3]) will yield
+    [CovidcastRow.make_default_row(value=1, time_value=1), CovidcastRow.make_default_row(value=2, time_value=2), CovidcastRow.make_default_row(value=3, time_value=3)]
+    with all the defaults from CovidcastRow.
+    """
+    # If any iterables were passed instead of lists, convert them to lists.
+    kwargs = {key: list(value) for key, value in kwargs.items()}
+    # All the arg values must be lists of the same length.
+    assert len(set(len(lst) for lst in kwargs.values())) == 1
+
+    if sanity_check:
+        return [CovidcastRow.make_default_row(**_kwargs)._sanity_check_fields(extra_checks=test_mode) for _kwargs in transpose_dict(kwargs)]
+    else:
+        return [CovidcastRow.make_default_row(**_kwargs) for _kwargs in transpose_dict(kwargs)]
+
+
+def covidcast_rows_from_records(records: Iterable[dict], sanity_check: bool = False) -> List[CovidcastRow]:
+    """A convenience constructor.
+
+    The default for `sanity_check` differs from `covidcast_rows_from_args`, because `covidcast_rows_from_records` is usually called on faux API returns in tests, where we don't want any values to be filled in or recoded by the sanity checks.
+
+    You can use csv.DictReader before this to read a CSV file.
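+
+    Example:
+    covidcast_rows_from_records([{"time_value": 20200202}, {"time_value": 20200203}]) will yield
+    two rows that differ only in time_value, with every other field taken from the defaults.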
+ """ + records = list(records) + return [CovidcastRow.make_default_row(**record) if not sanity_check else CovidcastRow.make_default_row(**record)._sanity_check_fields() for record in records] + + +def covidcast_rows_as_dicts(rows: Iterable[CovidcastRow], ignore_fields: Optional[List[str]] = None) -> List[dict]: + return [row.as_dict(ignore_fields=ignore_fields) for row in rows] + + +def covidcast_rows_as_dataframe(rows: Iterable[CovidcastRow], ignore_fields: Optional[List[str]] = None) -> pd.DataFrame: + if ignore_fields is None: + ignore_fields = [] + + columns = [field.name for field in fields(CovidcastRow) if field.name not in ignore_fields] + + if rows: + df = pd.concat([row.as_dataframe(ignore_fields=ignore_fields) for row in rows], ignore_index=True) + return df[columns] + else: + return pd.DataFrame(columns=columns) + + +def covidcast_rows_as_api_row_df(rows: Iterable[CovidcastRow]) -> pd.DataFrame: + return covidcast_rows_as_dataframe(rows, ignore_fields=CovidcastRow._api_row_ignore_fields) + + +def covidcast_rows_as_api_compatibility_row_df(rows: Iterable[CovidcastRow]) -> pd.DataFrame: + return covidcast_rows_as_dataframe(rows, ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) + + +def covidcast_rows_as_db_row_df(rows: Iterable[CovidcastRow]) -> pd.DataFrame: + return covidcast_rows_as_dataframe(rows, ignore_fields=CovidcastRow._db_row_ignore_fields) + + +def transpose_dict(d: Dict[Any, List[Any]]) -> List[Dict[Any, Any]]: + """Given a dictionary whose values are lists of the same length, turn it into a list of dictionaries whose values are the individual list entries. + + Example: + >>> transpose_dict(dict([["a", [2, 4, 6]], ["b", [3, 5, 7]], ["c", [10, 20, 30]]])) + [{"a": 2, "b": 3, "c": 10}, {"a": 4, "b": 5, "c": 20}, {"a": 6, "b": 7, "c": 30}] + """ + return [dict(zip(d.keys(), values)) for values in zip(*d.values())] + + +def check_valid_dtype(dtype): + try: + pd.api.types.pandas_dtype(dtype) + except TypeError: + raise ValueError(f"Invalid dtype {dtype}") + + +def set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame: + """Set the dataframe column datatypes.""" + [check_valid_dtype(d) for d in dtypes.values()] + + df = df.copy() + for k, v in dtypes.items(): + if k in df.columns: + df[k] = df[k].astype(v) + return df + + +def assert_frame_equal_no_order(df1: pd.DataFrame, df2: pd.DataFrame, index: List[str], **kwargs: Any) -> None: + """Assert that two DataFrames are equal, ignoring the order of rows.""" + # Remove any existing index. If it wasn't named, drop it. Set a new index and sort it. + df1 = df1.reset_index().drop(columns="index").set_index(index).sort_index() + df2 = df2.reset_index().drop(columns="index").set_index(index).sort_index() + pd.testing.assert_frame_equal(df1, df2, **kwargs) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index d21a27c35..cdc0dc959 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -2,20 +2,18 @@ See src/ddl/covidcast.sql for an explanation of each field. 
""" +import threading +from math import ceil +from multiprocessing import cpu_count +from queue import Queue, Empty +from typing import List # third party import json import mysql.connector -import numpy as np -from math import ceil - -from queue import Queue, Empty -import threading -from multiprocessing import cpu_count # first party import delphi.operations.secrets as secrets - from delphi.epidata.acquisition.covidcast.logger import get_structured_logger class CovidcastRow(): diff --git a/tests/acquisition/covidcast/test_covidcast_row.py b/tests/acquisition/covidcast/test_covidcast_row.py new file mode 100644 index 000000000..ab6930c5d --- /dev/null +++ b/tests/acquisition/covidcast/test_covidcast_row.py @@ -0,0 +1,103 @@ +import unittest + +from pandas import DataFrame, date_range +from pandas.testing import assert_frame_equal + +from delphi_utils.nancodes import Nans +from delphi.epidata.server.utils.dates import day_to_time_value +from delphi.epidata.acquisition.covidcast.covidcast_row import ( + covidcast_rows_as_api_compatibility_row_df, + covidcast_rows_as_api_row_df, + covidcast_rows_from_args, + set_df_dtypes, + transpose_dict, + CovidcastRow +) + +# py3tester coverage target (equivalent to `import *`) +__test_target__ = 'delphi.epidata.acquisition.covidcast.covidcast_row' + + +class TestCovidcastRows(unittest.TestCase): + def test_transpose_dict(self): + assert transpose_dict(dict([["a", [2, 4, 6]], ["b", [3, 5, 7]], ["c", [10, 20, 30]]])) == [{"a": 2, "b": 3, "c": 10}, {"a": 4, "b": 5, "c": 20}, {"a": 6, "b": 7, "c": 30}] + + + def test_CovidcastRow(self): + df = CovidcastRow.make_default_row(value=5.0).api_row_df + expected_df = set_df_dtypes(DataFrame.from_records([{ + "source": "src", + "signal": "sig", + "time_type": "day", + "geo_type": "county", + "time_value": 20200202, + "geo_value": "01234", + "value": 5.0, + "stderr": 10.0, + "sample_size": 10.0, + "missing_value": Nans.NOT_MISSING, + "missing_stderr": Nans.NOT_MISSING, + "missing_sample_size": Nans.NOT_MISSING, + "issue": 20200202, + "lag": 0, + "direction": None + }]), dtypes = CovidcastRow._pandas_dtypes) + assert_frame_equal(df, expected_df) + + df = CovidcastRow.make_default_row(value=5.0).api_compatibility_row_df + expected_df = set_df_dtypes(DataFrame.from_records([{ + "signal": "sig", + "time_value": 20200202, + "geo_value": "01234", + "value": 5.0, + "stderr": 10.0, + "sample_size": 10.0, + "missing_value": Nans.NOT_MISSING, + "missing_stderr": Nans.NOT_MISSING, + "missing_sample_size": Nans.NOT_MISSING, + "issue": 20200202, + "lag": 0, + "direction": None + }]), dtypes = CovidcastRow._pandas_dtypes) + assert_frame_equal(df, expected_df) + + + def test_covidcast_rows(self): + covidcast_rows = covidcast_rows_from_args(signal=["sig_base"] * 5 + ["sig_other"] * 5, time_value=date_range("2021-05-01", "2021-05-05").to_list() * 2, value=list(range(10))) + df = covidcast_rows_as_api_row_df(covidcast_rows) + expected_df = set_df_dtypes(DataFrame({ + "source": ["src"] * 10, + "signal": ["sig_base"] * 5 + ["sig_other"] * 5, + "time_type": ["day"] * 10, + "geo_type": ["county"] * 10, + "time_value": map(day_to_time_value, date_range("2021-05-01", "2021-05-5").to_list() * 2), + "geo_value": ["01234"] * 10, + "value": range(10), + "stderr": [10.0] * 10, + "sample_size": [10.0] * 10, + "missing_value": [Nans.NOT_MISSING] * 10, + "missing_stderr": [Nans.NOT_MISSING] * 10, + "missing_sample_size": [Nans.NOT_MISSING] * 10, + "issue": map(day_to_time_value, date_range("2021-05-01", "2021-05-5").to_list() * 2), + "lag": [0] * 
10, + "direction": [None] * 10 + }), CovidcastRow._pandas_dtypes) + assert_frame_equal(df, expected_df) + + covidcast_rows = covidcast_rows_from_args(signal=["sig_base"] * 5 + ["sig_other"] * 5, time_value=date_range("2021-05-01", "2021-05-05").to_list() * 2, value=list(range(10))) + df = covidcast_rows_as_api_compatibility_row_df(covidcast_rows) + expected_df = set_df_dtypes(DataFrame({ + "signal": ["sig_base"] * 5 + ["sig_other"] * 5, + "time_value": map(day_to_time_value, date_range("2021-05-01", "2021-05-5").to_list() * 2), + "geo_value": ["01234"] * 10, + "value": range(10), + "stderr": [10.0] * 10, + "sample_size": [10.0] * 10, + "missing_value": [Nans.NOT_MISSING] * 10, + "missing_stderr": [Nans.NOT_MISSING] * 10, + "missing_sample_size": [Nans.NOT_MISSING] * 10, + "issue": map(day_to_time_value, date_range("2021-05-01", "2021-05-5").to_list() * 2), + "lag": [0] * 10, + "direction": [None] * 10 + }), CovidcastRow._pandas_dtypes) + assert_frame_equal(df, expected_df) From 694479a4e2747fbfbe4c7bfaf90a99972f077a59 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 7 Oct 2022 15:31:46 -0700 Subject: [PATCH 02/17] Server: update test_db to use CovidcastRow --- integrations/acquisition/covidcast/test_db.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/integrations/acquisition/covidcast/test_db.py b/integrations/acquisition/covidcast/test_db.py index 3cd7e91a7..63d7b9a43 100644 --- a/integrations/acquisition/covidcast/test_db.py +++ b/integrations/acquisition/covidcast/test_db.py @@ -1,9 +1,9 @@ -import unittest - from delphi_utils import Nans -from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow, DBLoadStateException + +from delphi.epidata.acquisition.covidcast.database import DBLoadStateException +from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase -import delphi.operations.secrets as secrets + # all the Nans we use here are just one value, so this is a shortcut to it: nmv = Nans.NOT_MISSING.value @@ -31,8 +31,8 @@ def _find_matches_for_row(self, row): def test_insert_or_update_with_nonempty_load_table(self): # make rows - a_row = self._make_placeholder_row()[0] - another_row = self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE+1, issue=self.DEFAULT_ISSUE+1)[0] + a_row = CovidcastRow.make_default_row(time_value=20200202) + another_row = CovidcastRow.make_default_row(time_value=20200203, issue=20200203) # insert one self._db.insert_or_update_bulk([a_row]) # put something into the load table @@ -61,7 +61,7 @@ def test_id_sync(self): latest_view = 'epimetric_latest_v' # add a data point - base_row, _ = self._make_placeholder_row() + base_row = CovidcastRow.make_default_row() self._insert_rows([base_row]) # ensure the primary keys match in the latest and history tables matches = self._find_matches_for_row(base_row) @@ -71,7 +71,7 @@ def test_id_sync(self): old_pk_id = matches[latest_view][pk_column] # add a reissue for said data point - next_row, _ = self._make_placeholder_row() + next_row = CovidcastRow.make_default_row() next_row.issue += 1 self._insert_rows([next_row]) # ensure the new keys also match From 30dd70c7047238f77138192cccc2640c4e1f0f39 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 7 Oct 2022 15:32:06 -0700 Subject: [PATCH 03/17] Server: update test_delete_batch to use CovidcastRow --- .../acquisition/covidcast/delete_batch.csv | 6 ++--- .../covidcast/test_delete_batch.py | 24 
+++++++------------
 2 files changed, 11 insertions(+), 19 deletions(-)

diff --git a/integrations/acquisition/covidcast/delete_batch.csv b/integrations/acquisition/covidcast/delete_batch.csv
index e0e1eb82c..5c1602218 100644
--- a/integrations/acquisition/covidcast/delete_batch.csv
+++ b/integrations/acquisition/covidcast/delete_batch.csv
@@ -1,4 +1,4 @@
 geo_id,value,stderr,sample_size,issue,time_value,geo_type,signal,source
-d_nonlatest,0,0,0,1,0,geo,sig,src
-d_latest, 0,0,0,3,0,geo,sig,src
-d_justone, 0,0,0,1,0,geo,sig,src
\ No newline at end of file
+d_nonlatest,0,0,0,1,0,county,sig,src
+d_latest, 0,0,0,3,0,county,sig,src
+d_justone, 0,0,0,1,0,county,sig,src
\ No newline at end of file
diff --git a/integrations/acquisition/covidcast/test_delete_batch.py b/integrations/acquisition/covidcast/test_delete_batch.py
index 915c9341b..e0957b4da 100644
--- a/integrations/acquisition/covidcast/test_delete_batch.py
+++ b/integrations/acquisition/covidcast/test_delete_batch.py
@@ -5,13 +5,10 @@
 import unittest
 from os import path
 
-# third party
-import mysql.connector
-
 # first party
-from delphi_utils import Nans
-from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
 import delphi.operations.secrets as secrets
+from delphi.epidata.acquisition.covidcast.database import Database
+from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow, covidcast_rows_from_args
 
 # py3tester coverage target (equivalent to `import *`)
 __test_target__ = 'delphi.epidata.acquisition.covidcast.database'
@@ -56,17 +53,12 @@ def test_delete_from_tuples(self):
 
   def _test_delete_batch(self, cc_deletions):
     # load sample data
-    rows = []
-    for time_value in [0, 1]:
-      rows += [
-        # varying numeric column here (2nd to last) is `issue`
-        CovidcastRow('src', 'sig', 'day', 'geo', time_value, "d_nonlatest", 0,0,0,0,0,0, 1, 0),
-        CovidcastRow('src', 'sig', 'day', 'geo', time_value, "d_nonlatest", 0,0,0,0,0,0, 2, 0),
-        CovidcastRow('src', 'sig', 'day', 'geo', time_value, "d_latest", 0,0,0,0,0,0, 1, 0),
-        CovidcastRow('src', 'sig', 'day', 'geo', time_value, "d_latest", 0,0,0,0,0,0, 2, 0),
-        CovidcastRow('src', 'sig', 'day', 'geo', time_value, "d_latest", 0,0,0,0,0,0, 3, 0)
-      ]
-    rows.append(CovidcastRow('src', 'sig', 'day', 'geo', 0, "d_justone", 0,0,0,0,0,0, 1, 0))
+    rows = covidcast_rows_from_args(
+      time_value = [0] * 5 + [1] * 5 + [0],
+      geo_value = ["d_nonlatest"] * 2 + ["d_latest"] * 3 + ["d_nonlatest"] * 2 + ["d_latest"] * 3 + ["d_justone"],
+      issue = [1, 2] + [1, 2, 3] + [1, 2] + [1, 2, 3] + [1],
+    )
+
     self._db.insert_or_update_bulk(rows)
 
     # delete entries

From 45e48ba7050976f0d5522ca486ceff6bd11f74eb Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Fri, 7 Oct 2022 15:32:20 -0700
Subject: [PATCH 04/17] Server: update test_delphi_epidata to use CovidcastRow

---
 integrations/client/test_delphi_epidata.py | 112 +++++++++++----------
 1 file changed, 57 insertions(+), 55 deletions(-)

diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py
index 625d2859d..0ebbff0aa 100644
--- a/integrations/client/test_delphi_epidata.py
+++ b/integrations/client/test_delphi_epidata.py
@@ -1,26 +1,27 @@
 """Integration tests for delphi_epidata.py."""
 
 # standard library
-import unittest
 import time
-from unittest.mock import patch, MagicMock
 from json import JSONDecodeError
+from unittest.mock import MagicMock, patch
 
-# third party
-from aiohttp.client_exceptions import ClientResponseError
-import mysql.connector
+# third party
 import pytest
+from aiohttp.client_exceptions import ClientResponseError
 
-# first party
-from delphi_utils import Nans
-from delphi.epidata.client.delphi_epidata import Epidata
-from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
+# first party
+import delphi.operations.secrets as secrets
 from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache
+from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
 from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
-import delphi.operations.secrets as secrets
+from delphi.epidata.client.delphi_epidata import Epidata
+from delphi_utils import Nans
+
 
 # py3tester coverage target
 __test_target__ = 'delphi.epidata.client.delphi_epidata'
 
+# all the Nans we use here are just one value, so this is a shortcut to it:
+nmv = Nans.NOT_MISSING.value
 
 def fake_epidata_endpoint(func):
   """This can be used as a decorator to enable a bogus Epidata endpoint to return 404 responses."""
   def wrapper(*args):
     Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
   return wrapper
 
-# all the Nans we use here are just one value, so this is a shortcut to it:
-nmv = Nans.NOT_MISSING.value
-
 class DelphiEpidataPythonClientTests(CovidcastBase):
   """Tests the Python client."""
@@ -54,12 +52,12 @@ def test_covidcast(self):
 
     # insert placeholder data: three issues of one signal, one issue of another
     rows = [
-      self._make_placeholder_row(issue=self.DEFAULT_ISSUE + i, value=i, lag=i)[0]
+      CovidcastRow.make_default_row(issue=2020_02_02 + i, value=i, lag=i)
       for i in range(3)
     ]
     row_latest_issue = rows[-1]
     rows.append(
-      self._make_placeholder_row(signal="sig2")[0]
+      CovidcastRow.make_default_row(signal="sig2")
     )
     self._insert_rows(rows)
@@ -70,10 +68,11 @@ def test_covidcast(self):
     )
 
     expected = [
-      self.expected_from_row(row_latest_issue),
-      self.expected_from_row(rows[-1])
+      row_latest_issue.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields),
+      rows[-1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)
     ]
 
+    self.assertEqual(response['epidata'], expected)
     # check result
     self.assertEqual(response, {
       'result': 1,
@@ -89,10 +88,10 @@ def test_covidcast(self):
 
     expected = [{
       rows[0].signal: [
-        self.expected_from_row(row_latest_issue, self.DEFAULT_MINUS + ['signal']),
+        row_latest_issue.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields + ['signal']),
       ],
       rows[-1].signal: [
-        self.expected_from_row(rows[-1], self.DEFAULT_MINUS + ['signal']),
+        rows[-1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields + ['signal']),
      ],
     }]
@@ -109,12 +108,12 @@ def test_covidcast(self):
       **self.params_from_row(rows[0])
     )
 
-    expected = self.expected_from_row(row_latest_issue)
+    expected = [row_latest_issue.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)]
 
     # check result
     self.assertEqual(response_1, {
       'result': 1,
-      'epidata': [expected],
+      'epidata': expected,
       'message': 'success',
     })
@@ -124,13 +123,13 @@ def test_covidcast(self):
       **self.params_from_row(rows[0], as_of=rows[1].issue)
     )
 
-    expected = self.expected_from_row(rows[1])
+    expected = [rows[1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)]
 
     # check result
     self.maxDiff=None
     self.assertEqual(response_1a, {
       'result': 1,
-      'epidata': [expected],
+      'epidata': expected,
       'message': 'success',
     })
@@ -141,8 +140,8 @@ def test_covidcast(self):
     )
 
     expected = [
-      self.expected_from_row(rows[0]),
-      
self.expected_from_row(rows[1]) + rows[0].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields), + rows[1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) ] # check result @@ -158,12 +157,12 @@ def test_covidcast(self): **self.params_from_row(rows[0], lag=2) ) - expected = self.expected_from_row(row_latest_issue) + expected = [row_latest_issue.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)] # check result self.assertDictEqual(response_3, { 'result': 1, - 'epidata': [expected], + 'epidata': expected, 'message': 'success', }) with self.subTest(name='long request'): @@ -223,16 +222,16 @@ def test_geo_value(self): # insert placeholder data: three counties, three MSAs N = 3 rows = [ - self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0] + CovidcastRow.make_default_row(geo_type="county", geo_value=str(i)*5, value=i) for i in range(N) ] + [ - self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0] + CovidcastRow.make_default_row(geo_type="msa", geo_value=str(i)*5, value=i*10) for i in range(N) ] self._insert_rows(rows) counties = [ - self.expected_from_row(rows[i]) for i in range(N) + rows[i].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for i in range(N) ] def fetch(geo): @@ -241,41 +240,44 @@ def fetch(geo): ) # test fetch all - r = fetch('*') - self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], counties) + request = fetch('*') + self.assertEqual(request['message'], 'success') + self.assertEqual(request['epidata'], counties) # test fetch a specific region - r = fetch('11111') - self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[1]]) + request = fetch('11111') + self.assertEqual(request['message'], 'success') + self.assertEqual(request['epidata'], [counties[1]]) # test fetch a specific yet not existing region - r = fetch('55555') - self.assertEqual(r['message'], 'no results') + request = fetch('55555') + self.assertEqual(request['message'], 'no results') # test fetch a multiple regions - r = fetch(['11111', '22222']) - self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[1], counties[2]]) + request = fetch(['11111', '22222']) + self.assertEqual(request['message'], 'success') + self.assertEqual(request['epidata'], [counties[1], counties[2]]) # test fetch a multiple regions in another variant - r = fetch(['00000', '22222']) - self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[0], counties[2]]) + request = fetch(['00000', '22222']) + self.assertEqual(request['message'], 'success') + self.assertEqual(request['epidata'], [counties[0], counties[2]]) # test fetch a multiple regions but one is not existing - r = fetch(['11111', '55555']) - self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[1]]) + request = fetch(['11111', '55555']) + self.assertEqual(request['message'], 'success') + self.assertEqual(request['epidata'], [counties[1]]) # test fetch a multiple regions but specify no region - r = fetch([]) - self.assertEqual(r['message'], 'no results') + request = fetch([]) + self.assertEqual(request['message'], 'no results') def test_covidcast_meta(self): """Test that the covidcast_meta endpoint returns expected data.""" + DEFAULT_TIME_VALUE = 2020_02_02 + DEFAULT_ISSUE = 2020_02_02 + # insert placeholder data: three dates, three issues. 
values are: # 1st issue: 0 10 20 # 2nd issue: 1 11 21 # 3rd issue: 2 12 22 rows = [ - self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE + t, issue=self.DEFAULT_ISSUE + i, value=t*10 + i)[0] + CovidcastRow.make_default_row(time_value=DEFAULT_TIME_VALUE + t, issue=DEFAULT_ISSUE + i, value=t*10 + i) for i in range(3) for t in range(3) ] self._insert_rows(rows) @@ -299,14 +301,14 @@ def test_covidcast_meta(self): signal=rows[0].signal, time_type=rows[0].time_type, geo_type=rows[0].geo_type, - min_time=self.DEFAULT_TIME_VALUE, - max_time=self.DEFAULT_TIME_VALUE + 2, + min_time=DEFAULT_TIME_VALUE, + max_time=DEFAULT_TIME_VALUE + 2, num_locations=1, min_value=2., mean_value=12., max_value=22., stdev_value=8.1649658, # population stdev, not sample, which is 10. - max_issue=self.DEFAULT_ISSUE + 2, + max_issue=DEFAULT_ISSUE + 2, min_lag=0, max_lag=0, # we didn't set lag when inputting data ) @@ -322,10 +324,10 @@ def test_async_epidata(self): # insert placeholder data: three counties, three MSAs N = 3 rows = [ - self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0] + CovidcastRow.make_default_row(geo_type="county", geo_value=str(i)*5, value=i) for i in range(N) ] + [ - self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0] + CovidcastRow.make_default_row(geo_type="msa", geo_value=str(i)*5, value=i*10) for i in range(N) ] self._insert_rows(rows) From 60425248801802763c6811d8fc5fc905ba25327d Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 7 Oct 2022 15:32:32 -0700 Subject: [PATCH 05/17] Server: update test_covidcast_endpoints to use CovidcastRow --- .../server/test_covidcast_endpoints.py | 97 ++++++++++++------- 1 file changed, 62 insertions(+), 35 deletions(-) diff --git a/integrations/server/test_covidcast_endpoints.py b/integrations/server/test_covidcast_endpoints.py index 54974a874..233d0ff50 100644 --- a/integrations/server/test_covidcast_endpoints.py +++ b/integrations/server/test_covidcast_endpoints.py @@ -1,7 +1,9 @@ """Integration tests for the custom `covidcast/*` endpoints.""" # standard library -from typing import Iterable, Dict, Any +from copy import copy +from itertools import accumulate, chain +from typing import Iterable, Dict, Any, List, Sequence import unittest from io import StringIO @@ -10,21 +12,22 @@ # third party import mysql.connector +from more_itertools import interleave_longest, windowed import requests import pandas as pd +import numpy as np from delphi_utils import Nans from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_cache - -from delphi.epidata.acquisition.covidcast.database import Database +from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase # use the local instance of the Epidata API BASE_URL = "http://delphi_web_epidata/epidata/covidcast" +BASE_URL_OLD = "http://delphi_web_epidata/epidata/api.php" class CovidcastEndpointTests(CovidcastBase): - """Tests the `covidcast/*` endpoint.""" def localSetUp(self): @@ -32,19 +35,31 @@ def localSetUp(self): # reset the `covidcast_meta_cache` table (it should always have one row) self._db._cursor.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') - def _fetch(self, endpoint="/", **params): + def _fetch(self, endpoint="/", is_compatibility=False, **params): # make the request - response = requests.get( - f"{BASE_URL}{endpoint}", - params=params, - ) + if is_compatibility: + url = BASE_URL_OLD + 
params.setdefault("endpoint", "covidcast") + if params.get("source"): + params.setdefault("data_source", params.get("source")) + else: + url = f"{BASE_URL}{endpoint}" + response = requests.get(url, params=params) response.raise_for_status() return response.json() + def _diff_rows(self, rows: Sequence[float]): + return [float(x - y) if x is not None and y is not None else None for x, y in zip(rows[1:], rows[:-1])] + + def _smooth_rows(self, rows: Sequence[float]): + return [ + sum(e)/len(e) if None not in e else None + for e in windowed(rows, 7) + ] + def test_basic(self): """Request a signal from the / endpoint.""" - - rows = [self._make_placeholder_row(time_value=20200401 + i, value=i)[0] for i in range(10)] + rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i) for i in range(10)] first = rows[0] self._insert_rows(rows) @@ -53,20 +68,35 @@ def test_basic(self): self.assertEqual(out["result"], -1) with self.subTest("simple"): - out = self._fetch("/", signal=first.signal_pair(), geo=first.geo_pair(), time="day:*") + out = self._fetch("/", signal=first.signal_pair, geo=first.geo_pair, time="day:*") + self.assertEqual(len(out["epidata"]), len(rows)) + + def test_compatibility(self): + """Request at the /api.php endpoint.""" + rows = [CovidcastRow.make_default_row(source="src", signal="sig", time_value=20200401 + i, value=i) for i in range(10)] + first = rows[0] + self._insert_rows(rows) + + with self.subTest("validation"): + out = self._fetch("/", is_compatibility=True) + self.assertEqual(out["result"], -1) + + with self.subTest("simple"): + out = self._fetch("/", signal=first.signal_pair, geo=first.geo_pair, time="day:*", is_compatibility=True) self.assertEqual(len(out["epidata"]), len(rows)) def test_trend(self): """Request a signal from the /trend endpoint.""" num_rows = 30 - rows = [self._make_placeholder_row(time_value=20200401 + i, value=i)[0] for i in range(num_rows)] + rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i) for i in range(num_rows)] first = rows[0] last = rows[-1] ref = rows[num_rows // 2] self._insert_rows(rows) - out = self._fetch("/trend", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20201212", basis=ref.time_value) + out = self._fetch("/trend", signal=first.signal_pair, geo=first.geo_pair, date=last.time_value, window="20200401-20201212", basis=ref.time_value) + self.assertEqual(out["result"], 1) self.assertEqual(len(out["epidata"]), 1) @@ -90,16 +120,17 @@ def test_trend(self): self.assertEqual(trend["max_value"], last.value) self.assertEqual(trend["max_trend"], "steady") + def test_trendseries(self): """Request a signal from the /trendseries endpoint.""" num_rows = 3 - rows = [self._make_placeholder_row(time_value=20200401 + i, value=num_rows - i)[0] for i in range(num_rows)] + rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=num_rows - i) for i in range(num_rows)] first = rows[0] last = rows[-1] self._insert_rows(rows) - out = self._fetch("/trendseries", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20200410", basis=1) + out = self._fetch("/trendseries", signal=first.signal_pair, geo=first.geo_pair, date=last.time_value, window="20200401-20200410", basis=1) self.assertEqual(out["result"], 1) self.assertEqual(len(out["epidata"]), 3) @@ -127,6 +158,7 @@ def match_row(trend, row): self.assertEqual(trend["max_date"], first.time_value) self.assertEqual(trend["max_value"], first.value) self.assertEqual(trend["max_trend"], 
"steady") + with self.subTest("trend1"): trend = trends[1] match_row(trend, rows[1]) @@ -159,15 +191,15 @@ def test_correlation(self): """Request a signal from the /correlation endpoint.""" num_rows = 30 - reference_rows = [self._make_placeholder_row(signal="ref", time_value=20200401 + i, value=i)[0] for i in range(num_rows)] + reference_rows = [CovidcastRow.make_default_row(signal="ref", time_value=20200401 + i, value=i) for i in range(num_rows)] first = reference_rows[0] self._insert_rows(reference_rows) - other_rows = [self._make_placeholder_row(signal="other", time_value=20200401 + i, value=i)[0] for i in range(num_rows)] + other_rows = [CovidcastRow.make_default_row(signal="other", time_value=20200401 + i, value=i) for i in range(num_rows)] other = other_rows[0] self._insert_rows(other_rows) max_lag = 3 - out = self._fetch("/correlation", reference=first.signal_pair(), others=other.signal_pair(), geo=first.geo_pair(), window="20200401-20201212", lag=max_lag) + out = self._fetch("/correlation", reference=first.signal_pair, others=other.signal_pair, geo=first.geo_pair, window="20200401-20201212", lag=max_lag) self.assertEqual(out["result"], 1) df = pd.DataFrame(out["epidata"]) self.assertEqual(len(df), max_lag * 2 + 1) # -...0...+ @@ -185,31 +217,26 @@ def test_correlation(self): def test_csv(self): """Request a signal from the /csv endpoint.""" - rows = [self._make_placeholder_row(time_value=20200401 + i, value=i)[0] for i in range(10)] + rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i) for i in range(10)] first = rows[0] self._insert_rows(rows) response = requests.get( f"{BASE_URL}/csv", - params=dict(signal=first.signal_pair(), start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type), + params=dict(signal=first.signal_pair, start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type), ) - response.raise_for_status() - out = response.text - df = pd.read_csv(StringIO(out), index_col=0) - self.assertEqual(df.shape, (len(rows), 10)) - self.assertEqual(list(df.columns), ["geo_value", "signal", "time_value", "issue", "lag", "value", "stderr", "sample_size", "geo_type", "data_source"]) def test_backfill(self): """Request a signal from the /backfill endpoint.""" num_rows = 10 - issue_0 = [self._make_placeholder_row(time_value=20200401 + i, value=i, sample_size=1, lag=0, issue=20200401 + i)[0] for i in range(num_rows)] - issue_1 = [self._make_placeholder_row(time_value=20200401 + i, value=i + 1, sample_size=2, lag=1, issue=20200401 + i + 1)[0] for i in range(num_rows)] - last_issue = [self._make_placeholder_row(time_value=20200401 + i, value=i + 2, sample_size=3, lag=2, issue=20200401 + i + 2)[0] for i in range(num_rows)] # <-- the latest issues + issue_0 = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i, sample_size=1, lag=0, issue=20200401 + i) for i in range(num_rows)] + issue_1 = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i + 1, sample_size=2, lag=1, issue=20200401 + i + 1) for i in range(num_rows)] + last_issue = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i + 2, sample_size=3, lag=2, issue=20200401 + i + 2) for i in range(num_rows)] # <-- the latest issues self._insert_rows([*issue_0, *issue_1, *last_issue]) first = issue_0[0] - out = self._fetch("/backfill", signal=first.signal_pair(), geo=first.geo_pair(), time="day:20200401-20201212", anchor_lag=3) + out = self._fetch("/backfill", signal=first.signal_pair, geo=first.geo_pair, time="day:20200401-20201212", anchor_lag=3) 
self.assertEqual(out["result"], 1) df = pd.DataFrame(out["epidata"]) self.assertEqual(len(df), 3 * num_rows) # num issues @@ -231,7 +258,7 @@ def test_meta(self): """Request a signal from the /meta endpoint.""" num_rows = 10 - rows = [self._make_placeholder_row(time_value=20200401 + i, value=i, source="fb-survey", signal="smoothed_cli")[0] for i in range(num_rows)] + rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i, source="fb-survey", signal="smoothed_cli") for i in range(num_rows)] self._insert_rows(rows) first = rows[0] last = rows[-1] @@ -272,22 +299,22 @@ def test_coverage(self): num_geos_per_date = [10, 20, 30, 40, 44] dates = [20200401 + i for i in range(len(num_geos_per_date))] - rows = [self._make_placeholder_row(time_value=dates[i], value=i, geo_value=str(geo_value))[0] for i, num_geo in enumerate(num_geos_per_date) for geo_value in range(num_geo)] + rows = [CovidcastRow.make_default_row(time_value=dates[i], value=i, geo_value=str(geo_value)) for i, num_geo in enumerate(num_geos_per_date) for geo_value in range(num_geo)] self._insert_rows(rows) first = rows[0] with self.subTest("default"): - out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, latest=dates[-1], format="json") + out = self._fetch("/coverage", signal=first.signal_pair, geo_type=first.geo_type, latest=dates[-1], format="json") self.assertEqual(len(out), len(num_geos_per_date)) self.assertEqual([o["time_value"] for o in out], dates) self.assertEqual([o["count"] for o in out], num_geos_per_date) with self.subTest("specify window"): - out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, window=f"{dates[0]}-{dates[1]}", format="json") + out = self._fetch("/coverage", signal=first.signal_pair, geo_type=first.geo_type, window=f"{dates[0]}-{dates[1]}", format="json") self.assertEqual(len(out), 2) self.assertEqual([o["time_value"] for o in out], dates[:2]) self.assertEqual([o["count"] for o in out], num_geos_per_date[:2]) with self.subTest("invalid geo_type"): - out = self._fetch("/coverage", signal=first.signal_pair(), geo_type="doesnt_exist", format="json") + out = self._fetch("/coverage", signal=first.signal_pair, geo_type="doesnt_exist", format="json") self.assertEqual(len(out), 0) From fca788b530f16400ef9e0f80cf5440d2be8def9a Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 7 Oct 2022 15:32:40 -0700 Subject: [PATCH 06/17] Server: update test_covidcast to use CovidcastRow --- integrations/server/test_covidcast.py | 155 ++++++++++++-------------- tests/server/test_pandas.py | 1 - 2 files changed, 72 insertions(+), 84 deletions(-) diff --git a/integrations/server/test_covidcast.py b/integrations/server/test_covidcast.py index b69a68b06..6a16c9df1 100644 --- a/integrations/server/test_covidcast.py +++ b/integrations/server/test_covidcast.py @@ -1,7 +1,7 @@ """Integration tests for the `covidcast` endpoint.""" # standard library -import json +from typing import Callable import unittest # third party @@ -10,13 +10,12 @@ # first party from delphi_utils import Nans +from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase # use the local instance of the Epidata API BASE_URL = 'http://delphi_web_epidata/epidata/api.php' - - class CovidcastTests(CovidcastBase): """Tests the `covidcast` endpoint.""" @@ -24,28 +23,26 @@ def localSetUp(self): """Perform per-test setup.""" self._db._cursor.execute('update covidcast_meta_cache set timestamp = 0, 
epidata = "[]"') - def request_based_on_row(self, row, extract_response=lambda x: x.json(), **kwargs): + def request_based_on_row(self, row: CovidcastRow, extract_response: Callable = lambda x: x.json(), **kwargs): params = self.params_from_row(row, endpoint='covidcast', **kwargs) response = requests.get(BASE_URL, params=params) response.raise_for_status() response = extract_response(response) - expected = self.expected_from_row(row) - - return response, expected + return response def _insert_placeholder_set_one(self): - row, settings = self._make_placeholder_row() + row = CovidcastRow.make_default_row() self._insert_rows([row]) return row def _insert_placeholder_set_two(self): rows = [ - self._make_placeholder_row(geo_type='county', geo_value=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.)[0] + CovidcastRow.make_default_row(geo_type='county', geo_value=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.) for i in [1, 2, 3] ] + [ # geo value intended to overlap with counties above - self._make_placeholder_row(geo_type='msa', geo_value=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.)[0] + CovidcastRow.make_default_row(geo_type='msa', geo_value=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.) for i in [4, 5, 6] ] self._insert_rows(rows) @@ -53,11 +50,11 @@ def _insert_placeholder_set_two(self): def _insert_placeholder_set_three(self): rows = [ - self._make_placeholder_row(geo_type='county', geo_value='11111', time_value=2000_01_01+i, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=2-i)[0] + CovidcastRow.make_default_row(geo_type='county', geo_value='11111', time_value=2000_01_01+i, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=2-i) for i in [1, 2, 3] ] + [ # time value intended to overlap with 11111 above, with disjoint geo values - self._make_placeholder_row(geo_type='county', geo_value=str(i)*5, time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=5-i)[0] + CovidcastRow.make_default_row(geo_type='county', geo_value=str(i)*5, time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=5-i) for i in [4, 5, 6] ] self._insert_rows(rows) @@ -65,11 +62,11 @@ def _insert_placeholder_set_three(self): def _insert_placeholder_set_four(self): rows = [ - self._make_placeholder_row(source='src1', signal=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.)[0] + CovidcastRow.make_default_row(source='src1', signal=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.) for i in [1, 2, 3] ] + [ # signal intended to overlap with the signal above - self._make_placeholder_row(source='src2', signal=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.)[0] + CovidcastRow.make_default_row(source='src2', signal=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.) 
for i in [4, 5, 6] ] self._insert_rows(rows) @@ -77,11 +74,11 @@ def _insert_placeholder_set_four(self): def _insert_placeholder_set_five(self): rows = [ - CovidcastRow(time_value=2000_01_01, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i) + CovidcastRow.make_default_row(time_value=2000_01_01, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i) for i in [1, 2, 3] ] + [ # different time_values, same issues - CovidcastRow(time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i-3) + CovidcastRow.make_default_row(time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i-3) for i in [4, 5, 6] ] self._insert_rows(rows) @@ -94,10 +91,13 @@ def test_round_trip(self): row = self._insert_placeholder_set_one() # make the request - response, expected = self.request_based_on_row(row) + response = self.request_based_on_row(row) + + expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)] + self.assertEqual(response, { 'result': 1, - 'epidata': [expected], + 'epidata': expected, 'message': 'success', }) @@ -154,32 +154,25 @@ def test_csv_format(self): # make the request # NB 'format' is a Python reserved word - response, _ = self.request_based_on_row( + response = self.request_based_on_row( row, extract_response=lambda resp: resp.text, **{'format':'csv'} ) - expected_response = ( - "geo_value,signal,time_value,direction,issue,lag,missing_value," + - "missing_stderr,missing_sample_size,value,stderr,sample_size\n" + - ",".join("" if x is None else str(x) for x in [ - row.geo_value, - row.signal, - row.time_value, - row.direction, - row.issue, - row.lag, - row.missing_value, - row.missing_stderr, - row.missing_sample_size, - row.value, - row.stderr, - row.sample_size - ]) + "\n" + + # TODO: This is a mess because of api.php. 
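+        # api.php pins this column order and renders missing fields as empty
+        # strings, so the expected CSV is rebuilt from the dataframe to match.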
+ column_order = [ + "geo_value", "signal", "time_value", "direction", "issue", "lag", "missing_value", + "missing_stderr", "missing_sample_size", "value", "stderr", "sample_size" + ] + expected = ( + row.api_compatibility_row_df + .assign(direction = None) + .to_csv(columns=column_order, index=False) ) # assert that the right data came back - self.assertEqual(response, expected_response) + self.assertEqual(response, expected) def test_raw_json_format(self): """Test generate raw json data.""" @@ -188,10 +181,12 @@ def test_raw_json_format(self): row = self._insert_placeholder_set_one() # make the request - response, expected = self.request_based_on_row(row, **{'format':'json'}) + response = self.request_based_on_row(row, **{'format':'json'}) + + expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)] # assert that the right data came back - self.assertEqual(response, [expected]) + self.assertEqual(response, expected) def test_fields(self): """Test fields parameter""" @@ -200,7 +195,9 @@ def test_fields(self): row = self._insert_placeholder_set_one() # limit fields - response, expected = self.request_based_on_row(row, fields='time_value,geo_value') + response = self.request_based_on_row(row, fields='time_value,geo_value') + + expected = row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) expected_all = { 'result': 1, 'epidata': [{ @@ -213,15 +210,14 @@ def test_fields(self): self.assertEqual(response, expected_all) # limit using invalid fields - response, _ = self.request_based_on_row(row, fields='time_value,geo_value,doesnt_exist') + response = self.request_based_on_row(row, fields='time_value,geo_value,doesnt_exist') # assert that the right data came back (only valid fields) self.assertEqual(response, expected_all) # limit exclude fields: exclude all except time_value and geo_value - - response, _ = self.request_based_on_row(row, fields=( + response = self.request_based_on_row(row, fields=( '-value,-stderr,-sample_size,-direction,-issue,-lag,-signal,' + '-missing_value,-missing_stderr,-missing_sample_size' )) @@ -234,18 +230,15 @@ def test_location_wildcard(self): # insert placeholder data rows = self._insert_placeholder_set_two() - expected_counties = [ - self.expected_from_row(r) for r in rows[:3] - ] - + expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]] # make the request - response, _ = self.request_based_on_row(rows[0], geo_value="*") + response = self.request_based_on_row(rows[0], geo_value="*") self.maxDiff = None # assert that the right data came back self.assertEqual(response, { 'result': 1, - 'epidata': expected_counties, + 'epidata': expected, 'message': 'success', }) @@ -254,7 +247,7 @@ def test_time_values_wildcard(self): # insert placeholder data rows = self._insert_placeholder_set_three() - expected = [row.as_dict(ignore_fields=IGNORE_FIELDS) for row in rows[:3]] + expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]] # make the request response = self.request_based_on_row(rows[0], time_values="*") @@ -272,7 +265,7 @@ def test_issues_wildcard(self): # insert placeholder data rows = self._insert_placeholder_set_five() - expected = [row.as_dict(ignore_fields=IGNORE_FIELDS) for row in rows[:3]] + expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]] # make the request response = self.request_based_on_row(rows[0], issues="*") @@ -290,12 +283,10 @@ def 
test_signal_wildcard(self): # insert placeholder data rows = self._insert_placeholder_set_four() - expected_signals = [ - self.expected_from_row(r) for r in rows[:3] - ] + expected_signals = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]] # make the request - response, _ = self.request_based_on_row(rows[0], signals="*") + response = self.request_based_on_row(rows[0], signals="*") self.maxDiff = None # assert that the right data came back @@ -310,35 +301,33 @@ def test_geo_value(self): # insert placeholder data rows = self._insert_placeholder_set_two() - expected_counties = [ - self.expected_from_row(r) for r in rows[:3] - ] + expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]] def fetch(geo_value): # make the request - response, _ = self.request_based_on_row(rows[0], geo_value=geo_value) + response = self.request_based_on_row(rows[0], geo_value=geo_value) return response # test fetch a specific region r = fetch('11111') self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [expected_counties[0]]) + self.assertEqual(r['epidata'], expected[0:1]) # test fetch a specific yet not existing region r = fetch('55555') self.assertEqual(r['message'], 'no results') # test fetch multiple regions r = fetch('11111,22222') self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [expected_counties[0], expected_counties[1]]) + self.assertEqual(r['epidata'], expected[0:2]) # test fetch multiple noncontiguous regions r = fetch('11111,33333') self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [expected_counties[0], expected_counties[2]]) + self.assertEqual(r['epidata'], [expected[0], expected[2]]) # test fetch multiple regions but one is not existing r = fetch('11111,55555') self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [expected_counties[0]]) + self.assertEqual(r['epidata'], expected[0:1]) # test fetch empty region r = fetch('') self.assertEqual(r['message'], 'no results') @@ -348,12 +337,10 @@ def test_location_timeline(self): # insert placeholder data rows = self._insert_placeholder_set_three() - expected_timeseries = [ - self.expected_from_row(r) for r in rows[:3] - ] + expected_timeseries = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]] # make the request - response, _ = self.request_based_on_row(rows[0], time_values='20000101-20000105') + response = self.request_based_on_row(rows[0], time_values='20000101-20000105') # assert that the right data came back self.assertEqual(response, { @@ -379,15 +366,16 @@ def test_unique_key_constraint(self): def test_nullable_columns(self): """Missing values should be surfaced as null.""" - row, _ = self._make_placeholder_row( + row = CovidcastRow.make_default_row( stderr=None, sample_size=None, missing_stderr=Nans.OTHER.value, missing_sample_size=Nans.OTHER.value ) self._insert_rows([row]) # make the request - response, expected = self.request_based_on_row(row) - expected.update(stderr=None, sample_size=None) + response = self.request_based_on_row(row) + expected = row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) + # expected.update(stderr=None, sample_size=None) # assert that the right data came back self.assertEqual(response, { @@ -401,18 +389,19 @@ def test_temporal_partitioning(self): # insert placeholder data rows = [ - self._make_placeholder_row(time_type=tt)[0] + 
CovidcastRow.make_default_row(time_type=tt)
       for tt in "hour day week month year".split()
     ]
     self._insert_rows(rows)

     # make the request
-    response, expected = self.request_based_on_row(rows[1], time_values="0-99999999")
+    response = self.request_based_on_row(rows[1], time_values="20000101-30010201")
+    expected = [rows[1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)]

     # assert that the right data came back
     self.assertEqual(response, {
       'result': 1,
-      'epidata': [expected],
+      'epidata': expected,
       'message': 'success',
     })

@@ -423,37 +412,37 @@ def test_date_formats(self):
     rows = self._insert_placeholder_set_three()

     # make the request
-    response, expected = self.request_based_on_row(rows[0], time_values="20000102", geo_value="*")
+    response = self.request_based_on_row(rows[0], time_values="20000102", geo_value="*")

     # assert that the right data came back
     self.assertEqual(len(response['epidata']), 2)

     # make the request
-    response, expected = self.request_based_on_row(rows[0], time_values="2000-01-02", geo_value="*")
+    response = self.request_based_on_row(rows[0], time_values="2000-01-02", geo_value="*")

     # assert that the right data came back
     self.assertEqual(len(response['epidata']), 2)

     # make the request
-    response, expected = self.request_based_on_row(rows[0], time_values="20000102,20000103", geo_value="*")
+    response = self.request_based_on_row(rows[0], time_values="20000102,20000103", geo_value="*")

     # assert that the right data came back
-    self.assertEqual(len(response['epidata']), 4)
+    self.assertEqual(len(response['epidata']), 2 * 2)

     # make the request
-    response, expected = self.request_based_on_row(rows[0], time_values="2000-01-02,2000-01-03", geo_value="*")
+    response = self.request_based_on_row(rows[0], time_values="2000-01-02,2000-01-03", geo_value="*")

     # assert that the right data came back
-    self.assertEqual(len(response['epidata']), 4)
+    self.assertEqual(len(response['epidata']), 2 * 2)

     # make the request
-    response, expected = self.request_based_on_row(rows[0], time_values="20000102-20000104", geo_value="*")
+    response = self.request_based_on_row(rows[0], time_values="20000102-20000104", geo_value="*")

     # assert that the right data came back
-    self.assertEqual(len(response['epidata']), 6)
+    self.assertEqual(len(response['epidata']), 2 * 3)

     # make the request
-    response, expected = self.request_based_on_row(rows[0], time_values="2000-01-02:2000-01-04", geo_value="*")
+    response = self.request_based_on_row(rows[0], time_values="2000-01-02:2000-01-04", geo_value="*")

     # assert that the right data came back
-    self.assertEqual(len(response['epidata']), 6)
+    self.assertEqual(len(response['epidata']), 2 * 3)
diff --git a/tests/server/test_pandas.py b/tests/server/test_pandas.py
index 083162a47..12a9c18cd 100644
--- a/tests/server/test_pandas.py
+++ b/tests/server/test_pandas.py
@@ -9,7 +9,6 @@
 from delphi.epidata.server._pandas import as_pandas
 from delphi.epidata.server._config import MAX_RESULTS

-
 # py3tester coverage target
 __test_target__ = "delphi.epidata.server._pandas"

From 79e664b823490f45ba90071772148d48b1f4a88d Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Fri, 7 Oct 2022 15:33:14 -0700
Subject: [PATCH 07/17] Server: update test_utils to use CovidcastRow

---
 src/acquisition/covidcast/test_utils.py | 44 ++++++-------------------
 1 file changed, 10 insertions(+), 34 deletions(-)

diff --git a/src/acquisition/covidcast/test_utils.py b/src/acquisition/covidcast/test_utils.py
index 181dfac68..45f9fbfd0 100644
--- a/src/acquisition/covidcast/test_utils.py
+++ b/src/acquisition/covidcast/test_utils.py
@@ -1,7 +1,9 @@
+from typing import Sequence
 import unittest

 from delphi_utils import Nans
-from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
+from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
+from delphi.epidata.acquisition.covidcast.database import Database
 import delphi.operations.secrets as secrets

 # all the Nans we use here are just one value, so this is a shortcut to it:
@@ -31,36 +33,20 @@ def tearDown(self):
     # close and destroy connection to the database
     self._db.disconnect(False)
     del self._db
+    self.localTearDown()

-  DEFAULT_TIME_VALUE=2000_01_01
-  DEFAULT_ISSUE=2000_01_01
-  def _make_placeholder_row(self, **kwargs):
-    settings = {
-      'source': 'src',
-      'signal': 'sig',
-      'geo_type': 'state',
-      'geo_value': 'pa',
-      'time_type': 'day',
-      'time_value': self.DEFAULT_TIME_VALUE,
-      'value': 0.0,
-      'stderr': 1.0,
-      'sample_size': 2.0,
-      'missing_value': nmv,
-      'missing_stderr': nmv,
-      'missing_sample_size': nmv,
-      'issue': self.DEFAULT_ISSUE,
-      'lag': 0
-    }
-    settings.update(kwargs)
-    return (CovidcastRow(**settings), settings)
+  def localTearDown(self):
+    # stub; override in subclasses to perform custom teardown.
+    # runs after database changes have been committed
+    pass

-  def _insert_rows(self, rows):
+  def _insert_rows(self, rows: Sequence[CovidcastRow]):
     # inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables
     n = self._db.insert_or_update_bulk(rows)
     print(f"{n} rows added to load table & dispatched to v4 schema")
     self._db._connection.commit()  # NOTE: this isn't expressly needed for our test cases, but would be if using external access (like through client lib) to ensure changes are visible outside of this db session

-  def params_from_row(self, row, **kwargs):
+  def params_from_row(self, row: CovidcastRow, **kwargs):
     ret = {
       'data_source': row.source,
       'signals': row.signal,
@@ -71,13 +57,3 @@ def params_from_row(self, row, **kwargs):
     ret.update(kwargs)
     return ret
-
-  DEFAULT_MINUS=['time_type', 'geo_type', 'source']
-  def expected_from_row(self, row, minus=DEFAULT_MINUS):
-    expected = dict(vars(row))
-    # remove columns commonly excluded from output
-    # nb may need to add source or *_type back in for multiplexed queries
-    for key in ['id', 'direction_updated_timestamp'] + minus:
-      del expected[key]
-    return expected
-

From 8ea401892b93a7b41fe9472bc24d6a3d5e361787 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Fri, 7 Oct 2022 15:50:17 -0700
Subject: [PATCH 08/17] Server: update TimePair to auto-sort tuples
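
Range tuples in TimeSet.time_values were previously passed through as-is, so a
reversed pair like (20200310, 20200301) could produce an empty range downstream.
__post_init__ now normalizes each tuple to (min, max). A rough sketch of the
intended behavior, with hypothetical values (not a test from this patch):

    ts = TimeSet("day", [(2020_03_10, 2020_03_01), 2020_03_15])
    assert ts.time_values == [(2020_03_01, 2020_03_10), 2020_03_15]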
---
 src/server/_params.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/server/_params.py b/src/server/_params.py
index d0b1cda6d..4eace7be1 100644
--- a/src/server/_params.py
+++ b/src/server/_params.py
@@ -111,6 +111,10 @@ class TimeSet:
     time_type: str
     time_values: Union[bool, TimeValues]

+    def __post_init__(self):
+        if isinstance(self.time_values, list):
+            self.time_values = [(min(time_value), max(time_value)) if isinstance(time_value, tuple) else time_value for time_value in self.time_values]
+
     @property
     def is_week(self) -> bool:
         return self.time_type == 'week'

From 5400ece14d6a6b4f88f730973e6dcfe199f1f97a Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Fri, 7 Oct 2022 16:02:09 -0700
Subject: [PATCH 09/17] Server: minor model.py data_source_by_id name update

---
 src/server/endpoints/covidcast_utils/model.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/server/endpoints/covidcast_utils/model.py b/src/server/endpoints/covidcast_utils/model.py
index abab0033b..4cc78888e 100644
--- a/src/server/endpoints/covidcast_utils/model.py
+++ b/src/server/endpoints/covidcast_utils/model.py
@@ -202,7 +202,7 @@ def _load_data_sources():

 data_sources, data_sources_df = _load_data_sources()
-data_source_by_id = {d.source: d for d in data_sources}
+data_sources_by_id = {d.source: d for d in data_sources}

 def _load_data_signals(sources: List[DataSource]):
@@ -231,7 +231,7 @@ def _load_data_signals(sources: List[DataSource]):
 data_signals_by_key = {d.key: d for d in data_signals}
 # also add the resolved signal version to the signal lookup
 for d in data_signals:
-    source = data_source_by_id.get(d.source)
+    source = data_sources_by_id.get(d.source)
     if source and source.uses_db_alias:
         data_signals_by_key[(source.db_source, d.signal)] = d

@@ -261,7 +261,7 @@ def create_source_signal_alias_mapper(source_signals: List[SourceSignalSet]) ->
     alias_to_data_sources: Dict[str, List[DataSource]] = {}
     transformed_sets: List[SourceSignalSet] = []
     for ssset in source_signals:
-        source = data_source_by_id.get(ssset.source)
+        source = data_sources_by_id.get(ssset.source)
         if not source or not source.uses_db_alias:
             transformed_sets.append(ssset)
             continue

From 01b225e99055cafe67694d5e417fb72342e10ae4 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Wed, 30 Nov 2022 17:03:40 -0800
Subject: [PATCH 10/17] Server: update csv issue none handling

---
 src/server/endpoints/covidcast.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py
index 09b3d3740..97322d2cc 100644
--- a/src/server/endpoints/covidcast.py
+++ b/src/server/endpoints/covidcast.py
@@ -328,7 +328,7 @@ def parse_row(i, row):
         "geo_value": row["geo_value"],
         "signal": row["signal"],
         "time_value": time_value_to_iso(row["time_value"]) if is_day else row["time_value"],
-        "issue": time_value_to_iso(row["issue"]) if is_day else row["issue"],
+        "issue": time_value_to_iso(row["issue"]) if is_day and row["issue"] is not None else row["issue"],
         "lag": row["lag"],
         "value": row["value"],
         "stderr": row["stderr"],

From 7be40fa771cac6c1f3df6a195040782b19278f6d Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Fri, 4 Nov 2022 16:57:57 -0700
Subject: [PATCH 11/17] Server: add type hints to _query

---
 src/server/_query.py | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/src/server/_query.py b/src/server/_query.py
index 2b716e954..267a78eb1 100644
--- a/src/server/_query.py
+++ b/src/server/_query.py
@@ -9,8 +9,9 @@
     Sequence,
     Tuple,
     Union,
-    cast
+    cast,
 )
+from flask import Response
 from flask import request
 from sqlalchemy import text

@@ -54,7 +55,7 @@ def filter_values(
     param_key: str,
     params: Dict[str, Any],
     formatter=lambda x: x,
-):
+) -> str:
     if not values:
         return "FALSE"

 # builds a SQL expression to filter strings (ex: locations)
@@ -69,7 +70,7 @@ def filter_strings(
     values: Optional[Sequence[str]],
     param_key: str,
     params: Dict[str, Any],
-):
+) -> str:
     return filter_values(field, values, param_key, params)

@@ -78,7 +79,7 @@ def filter_integers(
     values: Optional[Sequence[IntRange]],
     param_key: str,
     params: Dict[str, Any],
-):
+) -> str:
     return filter_values(field, values, param_key, params)

@@ -87,7 +88,7 @@ def filter_dates(
     values: Optional[TimeValues],
     param_key: str,
     params: Dict[str, Any],
-):
+) -> str:
     ranges = time_values_to_ranges(values)
     return filter_values(field, ranges, param_key, params, date_string)

@@ -199,7 +200,7 @@ def parse_row(
     fields_string: Optional[Sequence[str]] = None,
     fields_int: Optional[Sequence[str]] = None,
     fields_float: Optional[Sequence[str]] = None,
-):
+) -> Dict[str, Any]:
     keys = set(row.keys())
     parsed = dict()
     if fields_string:
@@ -235,7 +236,7 @@ def limit_query(query: str, limit: int) -> str:
     return full_query

-def run_query(p: APrinter, query_tuple: Tuple[str, Dict[str, Any]]):
+def run_query(p: APrinter, query_tuple: Tuple[str, Dict[str, Any]]) -> Iterable[Row]:
     query, params = query_tuple
     # limit rows + 1 for detecting whether we would have more
     full_query = text(limit_query(query, p.remaining_rows + 1))
@@ -255,7 +256,7 @@ def execute_queries(
     fields_int: Sequence[str],
     fields_float: Sequence[str],
     transform: Callable[[Dict[str, Any], Row], Dict[str, Any]] = _identity_transform,
-):
+) -> Response:
     """
     execute the given queries and return the response to send them
     """
@@ -314,14 +315,14 @@ def execute_query(
     fields_int: Sequence[str],
     fields_float: Sequence[str],
     transform: Callable[[Dict[str, Any], Row], Dict[str, Any]] = _identity_transform,
-):
+) -> Response:
     """
     execute the given query and return the response to send it
     """
     return execute_queries([(query, params)], fields_string, fields_int, fields_float, transform)

-def _join_l(value: Union[str, List[str]]):
+def _join_l(value: Union[str, List[str]]) -> str:
     return ", ".join(value) if isinstance(value, (list, tuple)) else value

From 91427cd5e26020dc51f101ab845b491b10f543a6 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Tue, 11 Oct 2022 14:46:48 -0700
Subject: [PATCH 12/17] Acquisition: update test_csv_uploading to remove
 Pandas warning
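
Assigning via `df["col"].iloc[0] = x` is chained indexing: pandas may apply the
write to a temporary copy and emits SettingWithCopyWarning. A single
`.loc[row, col]` call writes to the frame directly. A minimal sketch of the
pattern (hypothetical frame, not this test's data):

    import pandas as pd

    df = pd.DataFrame({"missing_value": [0, 0]})
    df.loc[0, "missing_value"] = 5  # one indexing step, no chained-assignment warning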
"Int8", + "issue": "Int64", + "lag": "Int64", + "id": "Int64", + "direction": "Int8", + "direction_updated_timestamp": "Int64", + "value_updated_timestamp": "Int64", +} + + class HighValuesAre(str, Enum): bad = "bad" good = "good" From 3e214e2ab9f93832dc6bc62b9caa4e54f17126e6 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Mon, 5 Dec 2022 12:39:55 -0800 Subject: [PATCH 14/17] Docker: add more_itertools==8.4.0 to Python and API images --- requirements.api.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.api.txt b/requirements.api.txt index d5cc0e63b..6ccafc1e1 100644 --- a/requirements.api.txt +++ b/requirements.api.txt @@ -2,6 +2,7 @@ epiweeks==2.1.2 Flask==2.2.2 itsdangerous<2.1 jinja2==3.0.3 +more_itertools==8.4.0 mysqlclient==2.1.1 newrelic orjson==3.4.7 From 7268fc024d4beeea19b764b4cba0e7040c0ba86b Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Mon, 5 Dec 2022 13:11:40 -0800 Subject: [PATCH 15/17] Acquisition: update database.py to use CovidcastRow --- src/acquisition/covidcast/database.py | 52 ++------------------------- 1 file changed, 3 insertions(+), 49 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index cdc0dc959..bd2b526fd 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -10,59 +10,13 @@ # third party import json +from typing import List import mysql.connector # first party import delphi.operations.secrets as secrets from delphi.epidata.acquisition.covidcast.logger import get_structured_logger - -class CovidcastRow(): - """A container for all the values of a single covidcast row.""" - - @staticmethod - def fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag): - if row_value is None: return None - return CovidcastRow(source, signal, time_type, geo_type, time_value, - row_value.geo_value, - row_value.value, - row_value.stderr, - row_value.sample_size, - row_value.missing_value, - row_value.missing_stderr, - row_value.missing_sample_size, - issue, lag) - - @staticmethod - def fromCsvRows(row_values, source, signal, time_type, geo_type, time_value, issue, lag): - # NOTE: returns a generator, as row_values is expected to be a generator - return (CovidcastRow.fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag) - for row_value in row_values) - - def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, value, stderr, - sample_size, missing_value, missing_stderr, missing_sample_size, issue, lag): - self.id = None - self.source = source - self.signal = signal - self.time_type = time_type - self.geo_type = geo_type - self.time_value = time_value - self.geo_value = geo_value # from CSV row - self.value = value # ... - self.stderr = stderr # ... - self.sample_size = sample_size # ... - self.missing_value = missing_value # ... - self.missing_stderr = missing_stderr # ... 
---
 src/server/endpoints/covidcast_utils/model.py | 22 +++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/src/server/endpoints/covidcast_utils/model.py b/src/server/endpoints/covidcast_utils/model.py
index 4cc78888e..5d2d5011b 100644
--- a/src/server/endpoints/covidcast_utils/model.py
+++ b/src/server/endpoints/covidcast_utils/model.py
@@ -9,6 +9,28 @@
 from ..._params import SourceSignalSet

+PANDAS_DTYPES = {
+    "source": str,
+    "signal": str,
+    "time_type": str,
+    "time_value": "Int64",
+    "geo_type": str,
+    "geo_value": str,
+    "value": float,
+    "stderr": float,
+    "sample_size": float,
+    "missing_value": "Int8",
+    "missing_stderr": "Int8",
+    "missing_sample_size": "Int8",
+    "issue": "Int64",
+    "lag": "Int64",
+    "id": "Int64",
+    "direction": "Int8",
+    "direction_updated_timestamp": "Int64",
+    "value_updated_timestamp": "Int64",
+}
+
+
 class HighValuesAre(str, Enum):
     bad = "bad"
     good = "good"

From 3e214e2ab9f93832dc6bc62b9caa4e54f17126e6 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Mon, 5 Dec 2022 12:39:55 -0800
Subject: [PATCH 14/17] Docker: add more_itertools==8.4.0 to Python and API
 images

---
 requirements.api.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/requirements.api.txt b/requirements.api.txt
index d5cc0e63b..6ccafc1e1 100644
--- a/requirements.api.txt
+++ b/requirements.api.txt
@@ -2,6 +2,7 @@ epiweeks==2.1.2
 Flask==2.2.2
 itsdangerous<2.1
 jinja2==3.0.3
+more_itertools==8.4.0
 mysqlclient==2.1.1
 newrelic
 orjson==3.4.7

From 7268fc024d4beeea19b764b4cba0e7040c0ba86b Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Mon, 5 Dec 2022 13:11:40 -0800
Subject: [PATCH 15/17] Acquisition: update database.py to use CovidcastRow

---
 src/acquisition/covidcast/database.py | 52 ++-------------------------
 1 file changed, 3 insertions(+), 49 deletions(-)

diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index cdc0dc959..bd2b526fd 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -10,59 +10,13 @@

 # third party
 import json
+from typing import List
 import mysql.connector

 # first party
 import delphi.operations.secrets as secrets
 from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
-
-class CovidcastRow():
-  """A container for all the values of a single covidcast row."""
-
-  @staticmethod
-  def fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag):
-    if row_value is None: return None
-    return CovidcastRow(source, signal, time_type, geo_type, time_value,
-                        row_value.geo_value,
-                        row_value.value,
-                        row_value.stderr,
-                        row_value.sample_size,
-                        row_value.missing_value,
-                        row_value.missing_stderr,
-                        row_value.missing_sample_size,
-                        issue, lag)
-
-  @staticmethod
-  def fromCsvRows(row_values, source, signal, time_type, geo_type, time_value, issue, lag):
-    # NOTE: returns a generator, as row_values is expected to be a generator
-    return (CovidcastRow.fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag)
-            for row_value in row_values)
-
-  def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, value, stderr,
-               sample_size, missing_value, missing_stderr, missing_sample_size, issue, lag):
-    self.id = None
-    self.source = source
-    self.signal = signal
-    self.time_type = time_type
-    self.geo_type = geo_type
-    self.time_value = time_value
-    self.geo_value = geo_value  # from CSV row
-    self.value = value  # ...
-    self.stderr = stderr  # ...
-    self.sample_size = sample_size  # ...
-    self.missing_value = missing_value  # ...
-    self.missing_stderr = missing_stderr  # ...
-    self.missing_sample_size = missing_sample_size  # from CSV row
-    self.direction_updated_timestamp = 0
-    self.direction = None
-    self.issue = issue
-    self.lag = lag
-
-  def signal_pair(self):
-    return f"{self.source}:{self.signal}"
-
-  def geo_pair(self):
-    return f"{self.geo_type}:{self.geo_value}"
+from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow


 class DBLoadStateException(Exception):
@@ -154,7 +108,7 @@ def do_analyze(self):
   def insert_or_update_bulk(self, cc_rows):
     return self.insert_or_update_batch(cc_rows)

-  def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False, suppress_jobs=False):
+  def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20, commit_partial=False, suppress_jobs=False):
     """
     Insert new rows into the load table and dispatch into dimension and fact tables.
     """

From 4178862b21b366b4864d7b7976a3d41faa47f9a8 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Sat, 21 Jan 2023 05:35:18 -0800
Subject: [PATCH 16/17] Acquisition: update csv_importer to use CovidcastRow

---
 src/acquisition/covidcast/csv_importer.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py
index 3c559be84..6dce65e26 100644
--- a/src/acquisition/covidcast/csv_importer.py
+++ b/src/acquisition/covidcast/csv_importer.py
@@ -398,9 +398,8 @@ def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[Covidcast
       details.issue,
       details.lag,
       # These four fields are unused by database acquisition
-      # TODO: These will be used when CovidcastRow is updated.
-      # id=None,
-      # direction=None,
-      # direction_updated_timestamp=0,
-      # value_updated_timestamp=0,
+      id=None,
+      direction=None,
+      direction_updated_timestamp=0,
+      value_updated_timestamp=0,
     )

From b3cba4c7fd35b9231abb4be8e5050a56d6574ff5 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Thu, 8 Dec 2022 17:09:06 -0800
Subject: [PATCH 17/17] CovidcastRow: address code review #1044

Co-authored-by: Katie Mazaitis
Co-authored-by: melange396
---
 integrations/acquisition/covidcast/test_db.py |  13 +-
 .../covidcast/test_delete_batch.py            |   3 +-
 integrations/client/test_delphi_epidata.py    |  43 ++--
 integrations/server/test_covidcast.py         |  56 +++--
 .../server/test_covidcast_endpoints.py        |  76 +++----
 src/acquisition/covidcast/covidcast_row.py    | 207 +++++------------
 src/acquisition/covidcast/csv_importer.py     |  29 ++-
 src/acquisition/covidcast/database.py         |   1 -
 src/acquisition/covidcast/test_utils.py       | 157 ++++++++++++-
 src/server/_params.py                         |   4 -
 src/server/endpoints/covidcast.py             |   2 +-
 src/server/endpoints/covidcast_utils/model.py |  22 --
 .../covidcast/test_covidcast_row.py           | 133 ++++++-----
 13 files changed, 374 insertions(+), 372 deletions(-)

diff --git a/integrations/acquisition/covidcast/test_db.py b/integrations/acquisition/covidcast/test_db.py
index 63d7b9a43..7b9d80770 100644
--- a/integrations/acquisition/covidcast/test_db.py
+++ b/integrations/acquisition/covidcast/test_db.py
@@ -1,8 +1,7 @@
 from delphi_utils import Nans

 from delphi.epidata.acquisition.covidcast.database import DBLoadStateException
-from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
-from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
+from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow

 # all the Nans we use here are just one value, so this is a shortcut to it:
@@ -11,7 +10,7 @@ class TestTest(CovidcastBase):

   def _find_matches_for_row(self, row):
-    # finds (if existing) row from both history and latest views that matches long-key of provided CovidcastRow
+    # finds the row (if any) from both the history and latest views that matches the long key of the provided CovidcastTestRow
     cols = "source signal time_type time_value geo_type geo_value issue".split()
     results = {}
     cur = self._db._cursor
@@ -31,8 +30,8 @@ def _find_matches_for_row(self, row):

   def test_insert_or_update_with_nonempty_load_table(self):
     # make rows
-    a_row = CovidcastRow.make_default_row(time_value=20200202)
-    another_row = CovidcastRow.make_default_row(time_value=20200203, issue=20200203)
+    a_row = CovidcastTestRow.make_default_row(time_value=2020_02_02)
+    another_row = CovidcastTestRow.make_default_row(time_value=2020_02_03, issue=2020_02_03)
     # insert one
     self._db.insert_or_update_bulk([a_row])
     # put something into the load table
@@ -61,7 +60,7 @@ def test_id_sync(self):
     latest_view = 'epimetric_latest_v'

     # add a data point
-    base_row = CovidcastRow.make_default_row()
+    base_row = CovidcastTestRow.make_default_row()
     self._insert_rows([base_row])
     # ensure the primary keys match in the latest and history tables
     matches = self._find_matches_for_row(base_row)
@@ -71,7 +70,7 @@ def test_id_sync(self):
     old_pk_id = matches[latest_view][pk_column]

     # add a reissue for said data point
-    next_row = CovidcastRow.make_default_row()
+    next_row = CovidcastTestRow.make_default_row()
     next_row.issue += 1
     self._insert_rows([next_row])
     # ensure the new keys also match
diff --git a/integrations/acquisition/covidcast/test_delete_batch.py b/integrations/acquisition/covidcast/test_delete_batch.py
index e0957b4da..4624df27c 100644
--- a/integrations/acquisition/covidcast/test_delete_batch.py
+++ b/integrations/acquisition/covidcast/test_delete_batch.py
@@ -8,7 +8,7 @@
 # first party
 import delphi.operations.secrets as secrets
 from delphi.epidata.acquisition.covidcast.database import Database
-from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow, covidcast_rows_from_args
+from delphi.epidata.acquisition.covidcast.test_utils import covidcast_rows_from_args

 # py3tester coverage target (equivalent to `import *`)
 __test_target__ = 'delphi.epidata.acquisition.covidcast.database'
@@ -57,6 +57,7 @@ def _test_delete_batch(self, cc_deletions):
       time_value = [0] * 5 + [1] * 5 + [0],
       geo_value = ["d_nonlatest"] * 2 + ["d_latest"] * 3 + ["d_nonlatest"] * 2 + ["d_latest"] * 3 + ["d_justone"],
       issue = [1, 2] + [1, 2, 3] + [1, 2] + [1, 2, 3] + [1],
+      sanitize_fields = True
     )
     self._db.insert_or_update_bulk(rows)
diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py
index 0ebbff0aa..82c1452ec 100644
--- a/integrations/client/test_delphi_epidata.py
+++ b/integrations/client/test_delphi_epidata.py
@@ -12,8 +12,7 @@
 # third party
 import delphi.operations.secrets as secrets
 from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache
-from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
-from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
+from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow
 from delphi.epidata.client.delphi_epidata import Epidata
 from delphi_utils import Nans

@@ -52,13 +51,11 @@ def test_covidcast(self):
     # insert placeholder data: three issues of one signal, one issue of another
     rows = [
-      CovidcastRow.make_default_row(issue=2020_02_02 + i, value=i, lag=i)
+      CovidcastTestRow.make_default_row(issue=2020_02_02 + i, value=i, lag=i)
       for i in range(3)
     ]
     row_latest_issue = rows[-1]
-    rows.append(
-      CovidcastRow.make_default_row(signal="sig2")
-    )
+    rows.append(CovidcastTestRow.make_default_row(signal="sig2"))
     self._insert_rows(rows)

     with self.subTest(name='request two signals'):
@@ -68,8 +65,8 @@ def test_covidcast(self):
       )

       expected = [
-        row_latest_issue.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields),
-        rows[-1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)
+        row_latest_issue.as_api_compatibility_row_dict(),
+        rows[-1].as_api_compatibility_row_dict()
       ]

       self.assertEqual(response['epidata'], expected)
@@ -88,10 +85,10 @@ def test_covidcast(self):

       expected = [{
         rows[0].signal: [
-          row_latest_issue.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields + ['signal']),
+          row_latest_issue.as_api_compatibility_row_dict(ignore_fields=['signal']),
         ],
         rows[-1].signal: [
-          rows[-1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields + ['signal']),
+          rows[-1].as_api_compatibility_row_dict(ignore_fields=['signal']),
         ],
       }]

@@ -108,7 +105,7 @@ def test_covidcast(self):
         **self.params_from_row(rows[0])
       )

-      expected = [row_latest_issue.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)]
+      expected = [row_latest_issue.as_api_compatibility_row_dict()]

       # check result
       self.assertEqual(response_1, {
@@ -123,7 +120,7 @@ def test_covidcast(self):
         **self.params_from_row(rows[0], as_of=rows[1].issue)
       )

-      expected = [rows[1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)]
+      expected = [rows[1].as_api_compatibility_row_dict()]

       # check result
       self.maxDiff=None
@@ -140,8 +137,8 @@ def test_covidcast(self):
       )

       expected = [
-        rows[0].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields),
-        rows[1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)
+        rows[0].as_api_compatibility_row_dict(),
+        rows[1].as_api_compatibility_row_dict()
       ]

       # check result
@@ -157,7 +154,7 @@ def test_covidcast(self):
         **self.params_from_row(rows[0], lag=2)
       )

-      expected = [row_latest_issue.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)]
+      expected = [row_latest_issue.as_api_compatibility_row_dict()]

       # check result
       self.assertDictEqual(response_3, {
@@ -222,16 +219,16 @@ def test_geo_value(self):
     # insert placeholder data: three counties, three MSAs
     N = 3
     rows = [
-      CovidcastRow.make_default_row(geo_type="county", geo_value=str(i)*5, value=i)
+      CovidcastTestRow.make_default_row(geo_type="county", geo_value=str(i)*5, value=i)
       for i in range(N)
     ] + [
-      CovidcastRow.make_default_row(geo_type="msa", geo_value=str(i)*5, value=i*10)
+      CovidcastTestRow.make_default_row(geo_type="msa", geo_value=str(i)*5, value=i*10)
       for i in range(N)
     ]
     self._insert_rows(rows)

     counties = [
-      rows[i].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for i in range(N)
+      rows[i].as_api_compatibility_row_dict() for i in range(N)
     ]

     def fetch(geo):
@@ -277,7 +274,11 @@ def test_covidcast_meta(self):
     #   2nd issue: 1 11 21
     #   3rd issue: 2 12 22
     rows = [
-      CovidcastRow.make_default_row(time_value=DEFAULT_TIME_VALUE + t, issue=DEFAULT_ISSUE + i, value=t*10 + i)
+      CovidcastTestRow.make_default_row(
+        time_value=DEFAULT_TIME_VALUE + t,
+        issue=DEFAULT_ISSUE + i,
+        value=t*10 + i
+      )
       for i in range(3) for t in range(3)
     ]
     self._insert_rows(rows)
@@ -324,10 +325,10 @@ def test_async_epidata(self):
    # insert placeholder data: three counties, three MSAs
     N = 3
     rows = [
-      CovidcastRow.make_default_row(geo_type="county", geo_value=str(i)*5, value=i)
+      CovidcastTestRow.make_default_row(geo_type="county", geo_value=str(i)*5, value=i)
       for i in range(N)
     ] + [
-      CovidcastRow.make_default_row(geo_type="msa", geo_value=str(i)*5, value=i*10)
+      CovidcastTestRow.make_default_row(geo_type="msa", geo_value=str(i)*5, value=i*10)
       for i in range(N)
     ]
     self._insert_rows(rows)
diff --git a/integrations/server/test_covidcast.py b/integrations/server/test_covidcast.py
index 6a16c9df1..c3b50206d 100644
--- a/integrations/server/test_covidcast.py
+++ b/integrations/server/test_covidcast.py
@@ -10,8 +10,7 @@

 # first party
 from delphi_utils import Nans
-from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
-from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
+from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow

 # use the local instance of the Epidata API
 BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
@@ -23,7 +22,7 @@ def localSetUp(self):
     """Perform per-test setup."""
     self._db._cursor.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')

-  def request_based_on_row(self, row: CovidcastRow, extract_response: Callable = lambda x: x.json(), **kwargs):
+  def request_based_on_row(self, row: CovidcastTestRow, extract_response: Callable = lambda x: x.json(), **kwargs):
     params = self.params_from_row(row, endpoint='covidcast', **kwargs)
     response = requests.get(BASE_URL, params=params)
     response.raise_for_status()
@@ -32,17 +31,17 @@ def request_based_on_row(self, row: CovidcastRow, extract_response: Callable = l
     return response

   def _insert_placeholder_set_one(self):
-    row = CovidcastRow.make_default_row()
+    row = CovidcastTestRow.make_default_row()
     self._insert_rows([row])
     return row

   def _insert_placeholder_set_two(self):
     rows = [
-      CovidcastRow.make_default_row(geo_type='county', geo_value=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.)
+      CovidcastTestRow.make_default_row(geo_type='county', geo_value=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.)
       for i in [1, 2, 3]
     ] + [
       # geo value intended to overlap with counties above
-      CovidcastRow.make_default_row(geo_type='msa', geo_value=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.)
+      CovidcastTestRow.make_default_row(geo_type='msa', geo_value=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.)
       for i in [4, 5, 6]
     ]
     self._insert_rows(rows)
@@ -50,11 +49,11 @@ def _insert_placeholder_set_two(self):

   def _insert_placeholder_set_three(self):
     rows = [
-      CovidcastRow.make_default_row(geo_type='county', geo_value='11111', time_value=2000_01_01+i, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=2-i)
+      CovidcastTestRow.make_default_row(geo_type='county', geo_value='11111', time_value=2000_01_01+i, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=2-i)
       for i in [1, 2, 3]
     ] + [
       # time value intended to overlap with 11111 above, with disjoint geo values
-      CovidcastRow.make_default_row(geo_type='county', geo_value=str(i)*5, time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=5-i)
+      CovidcastTestRow.make_default_row(geo_type='county', geo_value=str(i)*5, time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=5-i)
       for i in [4, 5, 6]
     ]
     self._insert_rows(rows)
@@ -62,11 +61,11 @@ def _insert_placeholder_set_three(self):

   def _insert_placeholder_set_four(self):
     rows = [
-      CovidcastRow.make_default_row(source='src1', signal=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.)
+      CovidcastTestRow.make_default_row(source='src1', signal=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.)
       for i in [1, 2, 3]
     ] + [
       # signal intended to overlap with the signal above
-      CovidcastRow.make_default_row(source='src2', signal=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.)
+      CovidcastTestRow.make_default_row(source='src2', signal=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.)
       for i in [4, 5, 6]
     ]
     self._insert_rows(rows)
@@ -74,11 +73,11 @@ def _insert_placeholder_set_four(self):

   def _insert_placeholder_set_five(self):
     rows = [
-      CovidcastRow.make_default_row(time_value=2000_01_01, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i)
+      CovidcastTestRow.make_default_row(time_value=2000_01_01, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i)
       for i in [1, 2, 3]
     ] + [
       # different time_values, same issues
-      CovidcastRow.make_default_row(time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i-3)
+      CovidcastTestRow.make_default_row(time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03+i-3)
       for i in [4, 5, 6]
     ]
     self._insert_rows(rows)
@@ -93,7 +92,7 @@ def test_round_trip(self):
     # make the request
     response = self.request_based_on_row(row)

-    expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)]
+    expected = [row.as_api_compatibility_row_dict()]

     self.assertEqual(response, {
       'result': 1,
@@ -160,13 +159,14 @@ def test_csv_format(self):
       **{'format':'csv'}
     )

-    # TODO: This is a mess because of api.php.
+    # This is a hardcoded mess because of api.php.
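+    # (the fixed column_order below mirrors the column order of api.php's CSV output)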
     column_order = [
       "geo_value", "signal", "time_value", "direction", "issue", "lag", "missing_value",
       "missing_stderr", "missing_sample_size", "value", "stderr", "sample_size"
     ]
     expected = (
-      row.api_compatibility_row_df
+      row.as_api_compatibility_row_df()
       .assign(direction = None)
       .to_csv(columns=column_order, index=False)
     )
@@ -183,7 +182,7 @@ def test_raw_json_format(self):
     # make the request
     response = self.request_based_on_row(row, **{'format':'json'})

-    expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)]
+    expected = [row.as_api_compatibility_row_dict()]

     # assert that the right data came back
     self.assertEqual(response, expected)
@@ -197,7 +196,7 @@ def test_fields(self):
     # limit fields
     response = self.request_based_on_row(row, fields='time_value,geo_value')

-    expected = row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)
+    expected = row.as_api_compatibility_row_dict()
     expected_all = {
       'result': 1,
       'epidata': [{
@@ -230,7 +229,7 @@ def test_location_wildcard(self):

     # insert placeholder data
     rows = self._insert_placeholder_set_two()
-    expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]]
+    expected = [row.as_api_compatibility_row_dict() for row in rows[:3]]

     # make the request
     response = self.request_based_on_row(rows[0], geo_value="*")
@@ -247,7 +246,7 @@ def test_time_values_wildcard(self):

     # insert placeholder data
     rows = self._insert_placeholder_set_three()
-    expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]]
+    expected = [row.as_api_compatibility_row_dict() for row in rows[:3]]

     # make the request
     response = self.request_based_on_row(rows[0], time_values="*")
@@ -265,7 +264,7 @@ def test_issues_wildcard(self):

     # insert placeholder data
     rows = self._insert_placeholder_set_five()
-    expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]]
+    expected = [row.as_api_compatibility_row_dict() for row in rows[:3]]

     # make the request
     response = self.request_based_on_row(rows[0], issues="*")
@@ -283,7 +282,7 @@ def test_signal_wildcard(self):

     # insert placeholder data
     rows = self._insert_placeholder_set_four()
-    expected_signals = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]]
+    expected_signals = [row.as_api_compatibility_row_dict() for row in rows[:3]]

     # make the request
     response = self.request_based_on_row(rows[0], signals="*")
@@ -301,7 +300,7 @@ def test_geo_value(self):

     # insert placeholder data
     rows = self._insert_placeholder_set_two()
-    expected = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]]
+    expected = [row.as_api_compatibility_row_dict() for row in rows[:3]]

     def fetch(geo_value):
       # make the request
@@ -337,7 +336,7 @@ def test_location_timeline(self):

     # insert placeholder data
     rows = self._insert_placeholder_set_three()
-    expected_timeseries = [row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields) for row in rows[:3]]
+    expected_timeseries = [row.as_api_compatibility_row_dict() for row in rows[:3]]

     # make the request
     response = self.request_based_on_row(rows[0], time_values='20000101-20000105')
@@ -366,7 +365,7 @@ def test_unique_key_constraint(self):

   def test_nullable_columns(self):
     """Missing values should be surfaced as null."""

-    row = CovidcastRow.make_default_row(
+    row = CovidcastTestRow.make_default_row(
       stderr=None,
       sample_size=None,
       missing_stderr=Nans.OTHER.value,
       missing_sample_size=Nans.OTHER.value
     )
     self._insert_rows([row])

     # make the request
     response = self.request_based_on_row(row)
-    expected = row.as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)
-    # expected.update(stderr=None, sample_size=None)
+    expected = row.as_api_compatibility_row_dict()

     # assert that the right data came back
     self.assertEqual(response, {
@@ -389,14 +387,14 @@ def test_temporal_partitioning(self):

     # insert placeholder data
     rows = [
-      CovidcastRow.make_default_row(time_type=tt)
+      CovidcastTestRow.make_default_row(time_type=tt)
       for tt in "hour day week month year".split()
     ]
     self._insert_rows(rows)

     # make the request
-    response = self.request_based_on_row(rows[1], time_values="20000101-30010201")
-    expected = [rows[1].as_dict(ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)]
+    response = self.request_based_on_row(rows[1], time_values="*")
+    expected = [rows[1].as_api_compatibility_row_dict()]

     # assert that the right data came back
     self.assertEqual(response, {
diff --git a/integrations/server/test_covidcast_endpoints.py b/integrations/server/test_covidcast_endpoints.py
index 233d0ff50..41d942456 100644
--- a/integrations/server/test_covidcast_endpoints.py
+++ b/integrations/server/test_covidcast_endpoints.py
@@ -1,26 +1,16 @@
 """Integration tests for the custom `covidcast/*` endpoints."""

 # standard library
-from copy import copy
-from itertools import accumulate, chain
-from typing import Iterable, Dict, Any, List, Sequence
-import unittest
 from io import StringIO
-
-# from typing import Optional
-from dataclasses import dataclass
+from typing import Sequence

 # third party
-import mysql.connector
-from more_itertools import interleave_longest, windowed
+from more_itertools import windowed
 import requests
 import pandas as pd
-import numpy as np

-from delphi_utils import Nans
 from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_cache
-from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
-from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
+from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow

 # use the local instance of the Epidata API
 BASE_URL = "http://delphi_web_epidata/epidata/covidcast"
@@ -39,6 +29,7 @@ def _fetch(self, endpoint="/", is_compatibility=False, **params):
         # make the request
         if is_compatibility:
             url = BASE_URL_OLD
+            # only set endpoint if it's not already set
            params.setdefault("endpoint", "covidcast")
            if params.get("source"):
                params.setdefault("data_source", params.get("source"))
@@ -49,7 +41,10 @@ def _fetch(self, endpoint="/", is_compatibility=False, **params):
         return response.json()

     def _diff_rows(self, rows: Sequence[float]):
-        return [float(x - y) if x is not None and y is not None else None for x, y in zip(rows[1:], rows[:-1])]
+        return [
+            float(x - y) if x is not None and y is not None else None
+            for x, y in zip(rows[1:], rows[:-1])
+        ]

     def _smooth_rows(self, rows: Sequence[float]):
         return [
@@ -59,7 +54,7 @@ def test_basic(self):
         """Request a signal from the / endpoint."""

-        rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i) for i in range(10)]
+        rows = [CovidcastTestRow.make_default_row(time_value=2020_04_01 + i, value=i) for i in range(10)]
         first = rows[0]
         self._insert_rows(rows)

@@ -68,12 +63,12 @@ def test_basic(self):
            self.assertEqual(out["result"], -1)

        with self.subTest("simple"):
-            out = self._fetch("/", signal=first.signal_pair, geo=first.geo_pair, time="day:*")
+            out = self._fetch("/", signal=first.signal_pair(), geo=first.geo_pair(), time="day:*")
            self.assertEqual(len(out["epidata"]), len(rows))

     def test_compatibility(self):
         """Request at the /api.php endpoint."""
-        rows = [CovidcastRow.make_default_row(source="src", signal="sig", time_value=20200401 + i, value=i) for i in range(10)]
+        rows = [CovidcastTestRow.make_default_row(source="src", signal="sig", time_value=2020_04_01 + i, value=i) for i in range(10)]
         first = rows[0]
         self._insert_rows(rows)
@@ -82,20 +77,20 @@ def test_compatibility(self):
            self.assertEqual(out["result"], -1)

        with self.subTest("simple"):
-            out = self._fetch("/", signal=first.signal_pair, geo=first.geo_pair, time="day:*", is_compatibility=True)
+            out = self._fetch("/", signal=first.signal_pair(), geo=first.geo_pair(), time="day:*", is_compatibility=True)
            self.assertEqual(len(out["epidata"]), len(rows))

     def test_trend(self):
         """Request a signal from the /trend endpoint."""

         num_rows = 30
-        rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i) for i in range(num_rows)]
+        rows = [CovidcastTestRow.make_default_row(time_value=2020_04_01 + i, value=i) for i in range(num_rows)]
         first = rows[0]
         last = rows[-1]
         ref = rows[num_rows // 2]
         self._insert_rows(rows)

-        out = self._fetch("/trend", signal=first.signal_pair, geo=first.geo_pair, date=last.time_value, window="20200401-20201212", basis=ref.time_value)
+        out = self._fetch("/trend", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20201212", basis=ref.time_value)

         self.assertEqual(out["result"], 1)
@@ -125,12 +120,12 @@ def test_trendseries(self):
         """Request a signal from the /trendseries endpoint."""

         num_rows = 3
-        rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=num_rows - i) for i in range(num_rows)]
+        rows = [CovidcastTestRow.make_default_row(time_value=2020_04_01 + i, value=num_rows - i) for i in range(num_rows)]
         first = rows[0]
         last = rows[-1]
         self._insert_rows(rows)

-        out = self._fetch("/trendseries", signal=first.signal_pair, geo=first.geo_pair, date=last.time_value, window="20200401-20200410", basis=1)
+        out = self._fetch("/trendseries", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20200410", basis=1)

         self.assertEqual(out["result"], 1)
         self.assertEqual(len(out["epidata"]), 3)
@@ -191,15 +186,15 @@ def test_correlation(self):
         """Request a signal from the /correlation endpoint."""

         num_rows = 30
-        reference_rows = [CovidcastRow.make_default_row(signal="ref", time_value=20200401 + i, value=i) for i in range(num_rows)]
+        reference_rows = [CovidcastTestRow.make_default_row(signal="ref", time_value=20200401 + i, value=i) for i in range(num_rows)]
         first = reference_rows[0]
         self._insert_rows(reference_rows)
-        other_rows = [CovidcastRow.make_default_row(signal="other", time_value=20200401 + i, value=i) for i in range(num_rows)]
+        other_rows = [CovidcastTestRow.make_default_row(signal="other", time_value=20200401 + i, value=i) for i in range(num_rows)]
         other = other_rows[0]
         self._insert_rows(other_rows)
         max_lag = 3

-        out = self._fetch("/correlation", reference=first.signal_pair, others=other.signal_pair, geo=first.geo_pair, window="20200401-20201212", lag=max_lag)
+        out = self._fetch("/correlation", reference=first.signal_pair(), others=other.signal_pair(), geo=first.geo_pair(), window="20200401-20201212", lag=max_lag)
         self.assertEqual(out["result"], 1)
         df = pd.DataFrame(out["epidata"])
         self.assertEqual(len(df), max_lag * 2 + 1)  # -...0...+
@@ -217,26 +212,33 @@ def test_correlation(self):

     def test_csv(self):
         """Request a signal from the /csv endpoint."""

-        rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i) for i in range(10)]
+        rows = [CovidcastTestRow.make_default_row(time_value=2020_04_01 + i, value=i) for i in range(10)]
         first = rows[0]
         self._insert_rows(rows)

         response = requests.get(
             f"{BASE_URL}/csv",
-            params=dict(signal=first.signal_pair, start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type),
+            params=dict(signal=first.signal_pair(), start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type),
         )
+        response.raise_for_status()
+        out = response.text
+        df = pd.read_csv(StringIO(out), index_col=0)
+        self.assertEqual(df.shape, (len(rows), 10))
+        self.assertEqual(list(df.columns), ["geo_value", "signal", "time_value", "issue", "lag", "value", "stderr", "sample_size", "geo_type", "data_source"])

     def test_backfill(self):
         """Request a signal from the /backfill endpoint."""
+        TEST_DATE_VALUE = 2020_04_01
         num_rows = 10
-        issue_0 = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i, sample_size=1, lag=0, issue=20200401 + i) for i in range(num_rows)]
-        issue_1 = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i + 1, sample_size=2, lag=1, issue=20200401 + i + 1) for i in range(num_rows)]
-        last_issue = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i + 2, sample_size=3, lag=2, issue=20200401 + i + 2) for i in range(num_rows)]  # <-- the latest issues
+        issue_0 = [CovidcastTestRow.make_default_row(time_value=TEST_DATE_VALUE + i, value=i, sample_size=1, lag=0, issue=TEST_DATE_VALUE + i) for i in range(num_rows)]
+        issue_1 = [CovidcastTestRow.make_default_row(time_value=TEST_DATE_VALUE + i, value=i + 1, sample_size=2, lag=1, issue=TEST_DATE_VALUE + i + 1) for i in range(num_rows)]
+        last_issue = [CovidcastTestRow.make_default_row(time_value=TEST_DATE_VALUE + i, value=i + 2, sample_size=3, lag=2, issue=TEST_DATE_VALUE + i + 2) for i in range(num_rows)]  # <-- the latest issues
         self._insert_rows([*issue_0, *issue_1, *last_issue])
         first = issue_0[0]

-        out = self._fetch("/backfill", signal=first.signal_pair, geo=first.geo_pair, time="day:20200401-20201212", anchor_lag=3)
+        out = self._fetch("/backfill", signal=first.signal_pair(), geo=first.geo_pair(), time="day:20200401-20201212", anchor_lag=3)
         self.assertEqual(out["result"], 1)
         df = pd.DataFrame(out["epidata"])
         self.assertEqual(len(df), 3 * num_rows)  # num issues
@@ -258,7 +260,7 @@ def test_meta(self):
         """Request a signal from the /meta endpoint."""

         num_rows = 10
-        rows = [CovidcastRow.make_default_row(time_value=20200401 + i, value=i, source="fb-survey", signal="smoothed_cli") for i in range(num_rows)]
+        rows = [CovidcastTestRow.make_default_row(time_value=2020_04_01 + i, value=i, source="fb-survey", signal="smoothed_cli") for i in range(num_rows)]
         self._insert_rows(rows)
         first = rows[0]
         last = rows[-1]
@@ -298,23 +300,23 @@ def test_coverage(self):
         """Request a signal from the /coverage endpoint."""

         num_geos_per_date = [10, 20, 30, 40, 44]
-        dates = [20200401 + i for i in range(len(num_geos_per_date))]
-        rows = [CovidcastRow.make_default_row(time_value=dates[i], value=i, geo_value=str(geo_value)) for i, num_geo in enumerate(num_geos_per_date) for geo_value in range(num_geo)]
+        dates = [2020_04_01 + i for i in range(len(num_geos_per_date))]
+        rows = [CovidcastTestRow.make_default_row(time_value=dates[i], value=i, geo_value=str(geo_value)) for i, num_geo in enumerate(num_geos_per_date) for geo_value in range(num_geo)]
         self._insert_rows(rows)
         first = rows[0]

         with self.subTest("default"):
-            out = self._fetch("/coverage", signal=first.signal_pair, geo_type=first.geo_type, latest=dates[-1], format="json")
+            out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, latest=dates[-1], format="json")
             self.assertEqual(len(out), len(num_geos_per_date))
             self.assertEqual([o["time_value"] for o in out], dates)
             self.assertEqual([o["count"] for o in out], num_geos_per_date)

         with self.subTest("specify window"):
-            out = self._fetch("/coverage", signal=first.signal_pair, geo_type=first.geo_type, window=f"{dates[0]}-{dates[1]}", format="json")
+            out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, window=f"{dates[0]}-{dates[1]}", format="json")
             self.assertEqual(len(out), 2)
             self.assertEqual([o["time_value"] for o in out], dates[:2])
             self.assertEqual([o["count"] for o in out], num_geos_per_date[:2])

         with self.subTest("invalid geo_type"):
-            out = self._fetch("/coverage", signal=first.signal_pair, geo_type="doesnt_exist", format="json")
+            out = self._fetch("/coverage", signal=first.signal_pair(), geo_type="doesnt_exist", format="json")
             self.assertEqual(len(out), 0)
diff --git a/src/acquisition/covidcast/covidcast_row.py b/src/acquisition/covidcast/covidcast_row.py
index c5b2973f1..23e19eb57 100644
--- a/src/acquisition/covidcast/covidcast_row.py
+++ b/src/acquisition/covidcast/covidcast_row.py
@@ -1,13 +1,29 @@
-from dataclasses import asdict, dataclass, fields
-from datetime import date
-from typing import Any, ClassVar, Dict, Iterable, List, Optional
+from dataclasses import asdict, dataclass
+from typing import Any, ClassVar, Dict, List, Optional

 import pandas as pd
-from delphi_utils import Nans

-from delphi.epidata.server.utils.dates import day_to_time_value, time_value_to_day
-from delphi.epidata.server.endpoints.covidcast_utils.model import PANDAS_DTYPES
+PANDAS_DTYPES = {
+    "source": str,
+    "signal": str,
+    "time_type": str,
+    "time_value": "Int64",
+    "geo_type": str,
+    "geo_value": str,
+    "value": float,
+    "stderr": float,
+    "sample_size": float,
+    "missing_value": "Int8",
+    "missing_stderr": "Int8",
+    "missing_sample_size": "Int8",
+    "issue": "Int64",
+    "lag": "Int64",
+    "id": "Int64",
+    "direction": "Int8",
+    "direction_updated_timestamp": "Int64",
+    "value_updated_timestamp": "Int64",
+}

 @dataclass
 class CovidcastRow:
@@ -36,71 +52,19 @@ class CovidcastRow:
     missing_value: int
     missing_stderr: int
     missing_sample_size: int
-    issue: Optional[int]
-    lag: Optional[int]
-    # The following four fields are only in the database, but are not ingested at acquisition and not returned by the API.
-    id: Optional[int]
-    direction: Optional[int]
-    direction_updated_timestamp: int
-    value_updated_timestamp: int
+    issue: int
+    lag: int
+    # The following three fields are only in the database, but are not ingested at acquisition and not returned by the API.
+    epimetric_id: Optional[int] = None
+    direction: Optional[int] = None
+    value_updated_timestamp: Optional[int] = 0

     # Classvars.
    _db_row_ignore_fields: ClassVar = []
-    _api_row_ignore_fields: ClassVar = ["id", "direction_updated_timestamp", "value_updated_timestamp"]
-    _api_row_compatibility_ignore_fields: ClassVar = ["id", "direction_updated_timestamp", "value_updated_timestamp", "source", "time_type", "geo_type"]
-    _pandas_dtypes: ClassVar = PANDAS_DTYPES
+    _api_row_ignore_fields: ClassVar = ["epimetric_id", "value_updated_timestamp"]
+    _api_row_compatibility_ignore_fields: ClassVar = _api_row_ignore_fields + ["source", "time_type", "geo_type"]

-    @staticmethod
-    def make_default_row(**kwargs) -> "CovidcastRow":
-        default_args = {
-            "source": "src",
-            "signal": "sig",
-            "time_type": "day",
-            "geo_type": "county",
-            "time_value": 20200202,
-            "geo_value": "01234",
-            "value": 10.0,
-            "stderr": 10.0,
-            "sample_size": 10.0,
-            "missing_value": Nans.NOT_MISSING.value,
-            "missing_stderr": Nans.NOT_MISSING.value,
-            "missing_sample_size": Nans.NOT_MISSING.value,
-            "issue": 20200202,
-            "lag": 0,
-            "id": None,
-            "direction": None,
-            "direction_updated_timestamp": 0,
-            "value_updated_timestamp": 20200202,
-        }
-        default_args.update(kwargs)
-        return CovidcastRow(**default_args)
-
-    def __post_init__(self):
-        # Convert time values to ints by default.
-        self.time_value = day_to_time_value(self.time_value) if isinstance(self.time_value, date) else self.time_value
-        self.issue = day_to_time_value(self.issue) if isinstance(self.issue, date) else self.issue
-        self.value_updated_timestamp = day_to_time_value(self.value_updated_timestamp) if isinstance(self.value_updated_timestamp, date) else self.value_updated_timestamp
-
-    def _sanity_check_fields(self, extra_checks: bool = True):
-        if self.issue and self.issue < self.time_value:
-            self.issue = self.time_value
-
-        if self.issue:
-            self.lag = (time_value_to_day(self.issue) - time_value_to_day(self.time_value)).days
-        else:
-            self.lag = None
-
-        # This sanity checking is already done in CsvImporter, but it's here so the testing class gets it too.
-        if pd.isna(self.value) and self.missing_value == Nans.NOT_MISSING:
-            self.missing_value = Nans.NOT_APPLICABLE.value if extra_checks else Nans.OTHER.value
-
-        if pd.isna(self.stderr) and self.missing_stderr == Nans.NOT_MISSING:
-            self.missing_stderr = Nans.NOT_APPLICABLE.value if extra_checks else Nans.OTHER.value
-
-        if pd.isna(self.sample_size) and self.missing_sample_size == Nans.NOT_MISSING:
-            self.missing_sample_size = Nans.NOT_APPLICABLE.value if extra_checks else Nans.OTHER.value
-
-        return self
+    _pandas_dtypes: ClassVar = PANDAS_DTYPES

     def as_dict(self, ignore_fields: Optional[List[str]] = None) -> dict:
         d = asdict(self)
@@ -108,6 +72,18 @@ def as_dict(self, ignore_fields: Optional[List[str]] = None) -> dict:
             for key in ignore_fields:
                 del d[key]
         return d
+
+    def as_api_row_dict(self, ignore_fields: Optional[List[str]] = None) -> dict:
+        """Returns a dict view into the row with the fields returned by the API server."""
+        return self.as_dict(ignore_fields=self._api_row_ignore_fields + (ignore_fields or []))
+
+    def as_api_compatibility_row_dict(self, ignore_fields: Optional[List[str]] = None) -> dict:
+        """Returns a dict view into the row with the fields returned by the old API server (the PHP server)."""
+        return self.as_dict(ignore_fields=self._api_row_compatibility_ignore_fields + (ignore_fields or []))
+
+    def as_db_row_dict(self, ignore_fields: Optional[List[str]] = None) -> dict:
+        """Returns a dict view into the row with the fields returned by the database."""
+        return self.as_dict(ignore_fields=self._db_row_ignore_fields + (ignore_fields or []))

     def as_dataframe(self, ignore_fields: Optional[List[str]] = None) -> pd.DataFrame:
         df = pd.DataFrame.from_records([self.as_dict(ignore_fields=ignore_fields)])
@@ -115,105 +91,28 @@ def as_dataframe(self, ignore_fields: Optional[List[str]] = None) -> pd.DataFram
         df = set_df_dtypes(df, self._pandas_dtypes)
         return df

-    @property
-    def api_row_df(self) -> pd.DataFrame:
+    def as_api_row_df(self, ignore_fields: Optional[List[str]] = None) -> pd.DataFrame:
         """Returns a dataframe view into the row with the fields returned by the API server."""
-        return self.as_dataframe(ignore_fields=self._api_row_ignore_fields)
+        return self.as_dataframe(ignore_fields=self._api_row_ignore_fields + (ignore_fields or []))

-    @property
-    def api_compatibility_row_df(self) -> pd.DataFrame:
+    def as_api_compatibility_row_df(self, ignore_fields: Optional[List[str]] = None) -> pd.DataFrame:
         """Returns a dataframe view into the row with the fields returned by the old API server (the PHP server)."""
-        return self.as_dataframe(ignore_fields=self._api_row_compatibility_ignore_fields)
+        return self.as_dataframe(ignore_fields=self._api_row_compatibility_ignore_fields + (ignore_fields or []))

-    @property
-    def db_row_df(self) -> pd.DataFrame:
+    def as_db_row_df(self, ignore_fields: Optional[List[str]] = None) -> pd.DataFrame:
         """Returns a dataframe view into the row with the fields returned by an all-field database query."""
-        return self.as_dataframe(ignore_fields=self._db_row_ignore_fields)
+        return self.as_dataframe(ignore_fields=self._db_row_ignore_fields + (ignore_fields or []))

-    @property
     def signal_pair(self):
         return f"{self.source}:{self.signal}"

-    @property
     def geo_pair(self):
         return f"{self.geo_type}:{self.geo_value}"

-    @property
     def time_pair(self):
         return f"{self.time_type}:{self.time_value}"


-def covidcast_rows_from_args(sanity_check: bool = True, test_mode: bool = True, **kwargs: Dict[str, Iterable]) -> List[CovidcastRow]:
-    """A convenience constructor.
-
-    Handy for constructing batches of test cases.
-
-    Example:
-    covidcast_rows_from_args(value=[1, 2, 3], time_value=[1, 2, 3]) will yield
-    [CovidcastRow.make_default_row(value=1, time_value=1), CovidcastRow.make_default_row(value=2, time_value=2), CovidcastRow.make_default_row(value=3, time_value=3)]
-    with all the defaults from CovidcastRow.
-    """
-    # If any iterables were passed instead of lists, convert them to lists.
-    kwargs = {key: list(value) for key, value in kwargs.items()}
-    # All the arg values must be lists of the same length.
-    assert len(set(len(lst) for lst in kwargs.values())) == 1
-
-    if sanity_check:
-        return [CovidcastRow.make_default_row(**_kwargs)._sanity_check_fields(extra_checks=test_mode) for _kwargs in transpose_dict(kwargs)]
-    else:
-        return [CovidcastRow.make_default_row(**_kwargs) for _kwargs in transpose_dict(kwargs)]
-
-
-def covidcast_rows_from_records(records: Iterable[dict], sanity_check: bool = False) -> List[CovidcastRow]:
-    """A convenience constructor.
-
-    Default is different from from_args, because from_records is usually called on faux-API returns in tests,
-    where we don't want any values getting default filled in.
-
-    You can use csv.DictReader before this to read a CSV file.
-    """
-    records = list(records)
-    return [CovidcastRow.make_default_row(**record) if not sanity_check else CovidcastRow.make_default_row(**record)._sanity_check_fields() for record in records]
-
-
-def covidcast_rows_as_dicts(rows: Iterable[CovidcastRow], ignore_fields: Optional[List[str]] = None) -> List[dict]:
-    return [row.as_dict(ignore_fields=ignore_fields) for row in rows]
-
-
-def covidcast_rows_as_dataframe(rows: Iterable[CovidcastRow], ignore_fields: Optional[List[str]] = None) -> pd.DataFrame:
-    if ignore_fields is None:
-        ignore_fields = []
-
-    columns = [field.name for field in fields(CovidcastRow) if field.name not in ignore_fields]
-
-    if rows:
-        df = pd.concat([row.as_dataframe(ignore_fields=ignore_fields) for row in rows], ignore_index=True)
-        return df[columns]
-    else:
-        return pd.DataFrame(columns=columns)
-
-
-def covidcast_rows_as_api_row_df(rows: Iterable[CovidcastRow]) -> pd.DataFrame:
-    return covidcast_rows_as_dataframe(rows, ignore_fields=CovidcastRow._api_row_ignore_fields)
-
-
-def covidcast_rows_as_api_compatibility_row_df(rows: Iterable[CovidcastRow]) -> pd.DataFrame:
-    return covidcast_rows_as_dataframe(rows, ignore_fields=CovidcastRow._api_row_compatibility_ignore_fields)
-
-
-def covidcast_rows_as_db_row_df(rows: Iterable[CovidcastRow]) -> pd.DataFrame:
-    return covidcast_rows_as_dataframe(rows, ignore_fields=CovidcastRow._db_row_ignore_fields)
-
-
-def transpose_dict(d: Dict[Any, List[Any]]) -> List[Dict[Any, Any]]:
-    """Given a dictionary whose values are lists of the same length, turn it into a list of dictionaries whose values are the individual list entries.
-
-    Example:
-    >>> transpose_dict(dict([["a", [2, 4, 6]], ["b", [3, 5, 7]], ["c", [10, 20, 30]]]))
-    [{"a": 2, "b": 3, "c": 10}, {"a": 4, "b": 5, "c": 20}, {"a": 6, "b": 7, "c": 30}]
-    """
-    return [dict(zip(d.keys(), values)) for values in zip(*d.values())]
-

 def check_valid_dtype(dtype):
     try:
@@ -231,11 +130,3 @@ def set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
         if k in df.columns:
             df[k] = df[k].astype(v)
     return df
-
-
-def assert_frame_equal_no_order(df1: pd.DataFrame, df2: pd.DataFrame, index: List[str], **kwargs: Any) -> None:
-    """Assert that two DataFrames are equal, ignoring the order of rows."""
-    # Remove any existing index. If it wasn't named, drop it. Set a new index and sort it.
-    df1 = df1.reset_index().drop(columns="index").set_index(index).sort_index()
-    df2 = df2.reset_index().drop(columns="index").set_index(index).sort_index()
-    pd.testing.assert_frame_equal(df1, df2, **kwargs)
diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py
index 6dce65e26..0fa936802 100644
--- a/src/acquisition/covidcast/csv_importer.py
+++ b/src/acquisition/covidcast/csv_importer.py
@@ -15,11 +15,27 @@
 # first party
 from delphi_utils import Nans
 from delphi.utils.epiweek import delta_epiweeks
-from delphi.epidata.acquisition.covidcast.database import CovidcastRow
+from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
 from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

-DFRow = NamedTuple('DFRow', [('geo_id', str), ('value', float), ('stderr', float), ('sample_size', float), ('missing_value', int), ('missing_stderr', int), ('missing_sample_size', int)])
-PathDetails = NamedTuple('PathDetails', [('issue', int), ('lag', int), ('source', str), ('signal', str), ('time_type', str), ('time_value', int), ('geo_type', str)])
+DataFrameRow = NamedTuple('DataFrameRow', [
+  ('geo_id', str),
+  ('value', float),
+  ('stderr', float),
+  ('sample_size', float),
+  ('missing_value', int),
+  ('missing_stderr', int),
+  ('missing_sample_size', int)
+])
+PathDetails = NamedTuple('PathDetails', [
+  ('issue', int),
+  ('lag', int),
+  ('source', str),
+  ('signal', str),
+  ('time_type', str),
+  ('time_value', int),
+  ('geo_type', str),
+])

 @dataclass
@@ -268,7 +284,7 @@ def validate_missing_code(row, attr_quantity, attr_name, filepath=None, logger=N

   @staticmethod
-  def extract_and_check_row(row: DFRow, geo_type: str, filepath: Optional[str] = None) -> Tuple[Optional[CsvRowValue], Optional[str]]:
+  def extract_and_check_row(row: DataFrameRow, geo_type: str, filepath: Optional[str] = None) -> Tuple[Optional[CsvRowValue], Optional[str]]:
     """Extract and return `CsvRowValue` from a CSV row, with sanity checks.

     Also returns the name of the field which failed sanity check, or None.
@@ -397,9 +413,4 @@ def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[Covidcast csv_row_values.missing_sample_size, details.issue, details.lag, - # These four fields are unused by database acquisition - id=None, - direction=None, - direction_updated_timestamp=0, - value_updated_timestamp=0, ) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index bd2b526fd..3beedac82 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -10,7 +10,6 @@ # third party import json -from typing import List import mysql.connector # first party diff --git a/src/acquisition/covidcast/test_utils.py b/src/acquisition/covidcast/test_utils.py index 45f9fbfd0..96db2c164 100644 --- a/src/acquisition/covidcast/test_utils.py +++ b/src/acquisition/covidcast/test_utils.py @@ -1,14 +1,151 @@ -from typing import Sequence +from dataclasses import fields +from datetime import date +from typing import Any, Dict, Iterable, List, Optional, Sequence import unittest +import pandas as pd + from delphi_utils import Nans from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow from delphi.epidata.acquisition.covidcast.database import Database +from delphi.epidata.server.utils.dates import day_to_time_value, time_value_to_day import delphi.operations.secrets as secrets # all the Nans we use here are just one value, so this is a shortcut to it: nmv = Nans.NOT_MISSING.value + +class CovidcastTestRow(CovidcastRow): + @staticmethod + def make_default_row(**kwargs) -> "CovidcastTestRow": + default_args = { + "source": "src", + "signal": "sig", + "time_type": "day", + "geo_type": "county", + "time_value": 2020_02_02, + "geo_value": "01234", + "value": 10.0, + "stderr": 10.0, + "sample_size": 10.0, + "missing_value": Nans.NOT_MISSING.value, + "missing_stderr": Nans.NOT_MISSING.value, + "missing_sample_size": Nans.NOT_MISSING.value, + "issue": 2020_02_02, + "lag": 0, + } + default_args.update(kwargs) + return CovidcastTestRow(**default_args) + + def __post_init__(self): + # Convert time values to ints by default. + if isinstance(self.time_value, date): + self.time_value = day_to_time_value(self.time_value) + if isinstance(self.issue, date): + self.issue = day_to_time_value(self.issue) + if isinstance(self.value_updated_timestamp, date): + self.value_updated_timestamp = day_to_time_value(self.value_updated_timestamp) + + def _sanitize_fields(self, extra_checks: bool = True): + if self.issue and self.issue < self.time_value: + self.issue = self.time_value + + if self.issue: + self.lag = (time_value_to_day(self.issue) - time_value_to_day(self.time_value)).days + else: + self.lag = None + + # This sanity checking is already done in CsvImporter, but it's here so the testing class gets it too. + if pd.isna(self.value) and self.missing_value == Nans.NOT_MISSING: + self.missing_value = Nans.NOT_APPLICABLE.value if extra_checks else Nans.OTHER.value + + if pd.isna(self.stderr) and self.missing_stderr == Nans.NOT_MISSING: + self.missing_stderr = Nans.NOT_APPLICABLE.value if extra_checks else Nans.OTHER.value + + if pd.isna(self.sample_size) and self.missing_sample_size == Nans.NOT_MISSING: + self.missing_sample_size = Nans.NOT_APPLICABLE.value if extra_checks else Nans.OTHER.value + + return self + + +def covidcast_rows_from_args(sanitize_fields: bool = False, test_mode: bool = True, **kwargs: Dict[str, Iterable]) -> List[CovidcastTestRow]: + """A convenience constructor for test rows. 
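+
+    Each keyword argument maps a CovidcastTestRow field name to a list (or other
+    iterable) of values; the lists are zipped element-wise into rows, so they must
+    all have the same length. Any field not given falls back to the
+    make_default_row defaults.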
+
+    Example:
+    covidcast_rows_from_args(value=[1, 2, 3], time_value=[1, 2, 3]) will yield
+    [CovidcastTestRow.make_default_row(value=1, time_value=1), CovidcastTestRow.make_default_row(value=2, time_value=2), CovidcastTestRow.make_default_row(value=3, time_value=3)]
+    with all the defaults from CovidcastTestRow.
+    """
+    # If any iterables were passed instead of lists, convert them to lists.
+    kwargs = {key: list(value) for key, value in kwargs.items()}
+    # All the arg values must be lists of the same length.
+    assert len(set(len(lst) for lst in kwargs.values())) == 1
+
+    if sanitize_fields:
+        return [CovidcastTestRow.make_default_row(**_kwargs)._sanitize_fields(extra_checks=test_mode) for _kwargs in transpose_dict(kwargs)]
+    else:
+        return [CovidcastTestRow.make_default_row(**_kwargs) for _kwargs in transpose_dict(kwargs)]
+
+
+def covidcast_rows_from_records(records: Iterable[dict], sanitize_fields: bool = False) -> List[CovidcastTestRow]:
+    """A convenience constructor for rows given as dicts, e.g. read from a CSV file via csv.DictReader.
+
+    The default for sanitize_fields differs from covidcast_rows_from_args, because from_records is
+    usually called on faux-API returns in tests, where we don't want any values getting filled in by defaults.
+    """
+    records = list(records)
+    if sanitize_fields:
+        return [CovidcastTestRow.make_default_row(**record)._sanitize_fields() for record in records]
+    else:
+        return [CovidcastTestRow.make_default_row(**record) for record in records]
+
+
+def covidcast_rows_as_dicts(rows: Iterable[CovidcastTestRow], ignore_fields: Optional[List[str]] = None) -> List[dict]:
+    return [row.as_dict(ignore_fields=ignore_fields) for row in rows]
+
+
+def covidcast_rows_as_dataframe(rows: Iterable[CovidcastTestRow], ignore_fields: Optional[List[str]] = None) -> pd.DataFrame:
+    if ignore_fields is None:
+        ignore_fields = []
+
+    columns = [field.name for field in fields(CovidcastTestRow) if field.name not in ignore_fields]
+
+    # Materialize the iterable so emptiness is detected correctly even for generators.
+    rows = list(rows)
+    if rows:
+        df = pd.concat([row.as_dataframe(ignore_fields=ignore_fields) for row in rows], ignore_index=True)
+        return df[columns]
+    else:
+        return pd.DataFrame(columns=columns)
+
+
+def covidcast_rows_as_api_row_df(rows: Iterable[CovidcastTestRow]) -> pd.DataFrame:
+    return covidcast_rows_as_dataframe(rows, ignore_fields=CovidcastTestRow._api_row_ignore_fields)
+
+
+def covidcast_rows_as_api_compatibility_row_df(rows: Iterable[CovidcastTestRow]) -> pd.DataFrame:
+    return covidcast_rows_as_dataframe(rows, ignore_fields=CovidcastTestRow._api_row_compatibility_ignore_fields)
+
+
+def covidcast_rows_as_db_row_df(rows: Iterable[CovidcastTestRow]) -> pd.DataFrame:
+    return covidcast_rows_as_dataframe(rows, ignore_fields=CovidcastTestRow._db_row_ignore_fields)
+
+
+def transpose_dict(d: Dict[Any, List[Any]]) -> List[Dict[Any, Any]]:
+    """Given a dictionary whose values are lists of the same length, turn it into a list of dictionaries whose values are the individual list entries.
+
+    Example:
+    >>> transpose_dict(dict([["a", [2, 4, 6]], ["b", [3, 5, 7]], ["c", [10, 20, 30]]]))
+    [{"a": 2, "b": 3, "c": 10}, {"a": 4, "b": 5, "c": 20}, {"a": 6, "b": 7, "c": 30}]
+    """
+    return [dict(zip(d.keys(), values)) for values in zip(*d.values())]
+
+
+def assert_frame_equal_no_order(df1: pd.DataFrame, df2: pd.DataFrame, index: List[str], **kwargs: Any) -> None:
+    """Assert that two DataFrames are equal, ignoring the order of rows."""
+    # Remove any existing index. If it wasn't named, drop it. Set a new index and sort it.
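+    # (This assumes the inputs carry a default, unnamed RangeIndex: reset_index()
+    # then surfaces it as a column literally named "index", which is what
+    # drop(columns="index") removes.)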
+    df1 = df1.reset_index().drop(columns="index").set_index(index).sort_index()
+    df2 = df2.reset_index().drop(columns="index").set_index(index).sort_index()
+    pd.testing.assert_frame_equal(df1, df2, **kwargs)
+
+
 class CovidcastBase(unittest.TestCase):
     def setUp(self):
         # use the local test instance of the database
@@ -24,29 +161,29 @@ def setUp(self):
         self.localSetUp()
         self._db._connection.commit()
 
-    def localSetUp(self):
-        # stub; override in subclasses to perform custom setup.
-        # runs after tables have been truncated but before database changes have been committed
-        pass
-
     def tearDown(self):
         # close and destroy connection to the database
+        self.localTearDown()
         self._db.disconnect(False)
         del self._db
-        self.localTearDown()
+
+    def localSetUp(self):
+        # stub; override in subclasses to perform custom setup.
+        # runs after tables have been truncated but before database changes have been committed
+        pass
 
     def localTearDown(self):
         # stub; override in subclasses to perform custom teardown.
         # runs after database changes have been committed
         pass
 
-    def _insert_rows(self, rows: Sequence[CovidcastRow]):
+    def _insert_rows(self, rows: Sequence[CovidcastTestRow]):
         # inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables
         n = self._db.insert_or_update_bulk(rows)
         print(f"{n} rows added to load table & dispatched to v4 schema")
         self._db._connection.commit() # NOTE: this isn't expressly needed for our test cases, but would be if using external access (like through client lib) to ensure changes are visible outside of this db session
 
-    def params_from_row(self, row: CovidcastRow, **kwargs):
+    def params_from_row(self, row: CovidcastTestRow, **kwargs):
         ret = {
             'data_source': row.source,
             'signals': row.signal,
@@ -56,4 +193,4 @@ def params_from_row(self, row: CovidcastRow, **kwargs):
             'geo_value': row.geo_value,
         }
         ret.update(kwargs)
-        return ret
+        return ret
\ No newline at end of file
diff --git a/src/server/_params.py b/src/server/_params.py
index 4eace7be1..d0b1cda6d 100644
--- a/src/server/_params.py
+++ b/src/server/_params.py
@@ -111,10 +111,6 @@ class TimeSet:
     time_type: str
     time_values: Union[bool, TimeValues]
 
-    def __post_init__(self):
-        if isinstance(self.time_values, list):
-            self.time_values = [(min(time_value), max(time_value)) if isinstance(time_value, tuple) else time_value for time_value in self.time_values]
-
     @property
     def is_week(self) -> bool:
         return self.time_type == 'week'
diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py
index 97322d2cc..09b3d3740 100644
--- a/src/server/endpoints/covidcast.py
+++ b/src/server/endpoints/covidcast.py
@@ -328,7 +328,7 @@ def parse_row(i, row):
         "geo_value": row["geo_value"],
         "signal": row["signal"],
         "time_value": time_value_to_iso(row["time_value"]) if is_day else row["time_value"],
-        "issue": time_value_to_iso(row["issue"]) if is_day and row["issue"] is not None else row["issue"],
+        "issue": time_value_to_iso(row["issue"]) if is_day else row["issue"],
         "lag": row["lag"],
         "value": row["value"],
         "stderr": row["stderr"],
diff --git a/src/server/endpoints/covidcast_utils/model.py b/src/server/endpoints/covidcast_utils/model.py
index 5d2d5011b..4cc78888e 100644
--- a/src/server/endpoints/covidcast_utils/model.py
+++ b/src/server/endpoints/covidcast_utils/model.py
@@ -9,28 +9,6 @@
 from ..._params import SourceSignalSet
 
 
-PANDAS_DTYPES = {
-    "source": str,
-    "signal": str,
-    "time_type": str,
-    "time_value": "Int64",
-    "geo_type": str,
-    "geo_value": str,
-    "value": float,
-    "stderr": float,
-    "sample_size": float,
-    "missing_value": "Int8",
-    "missing_stderr": "Int8",
-    "missing_sample_size": "Int8",
-    "issue": "Int64",
-    "lag": "Int64",
-    "id": "Int64",
-    "direction": "Int8",
-    "direction_updated_timestamp": "Int64",
-    "value_updated_timestamp": "Int64",
-}
-
-
 class HighValuesAre(str, Enum):
     bad = "bad"
     good = "good"
diff --git a/tests/acquisition/covidcast/test_covidcast_row.py b/tests/acquisition/covidcast/test_covidcast_row.py
index ab6930c5d..9462fd4ed 100644
--- a/tests/acquisition/covidcast/test_covidcast_row.py
+++ b/tests/acquisition/covidcast/test_covidcast_row.py
@@ -1,17 +1,16 @@
 import unittest
 
-from pandas import DataFrame, date_range
+from pandas import DataFrame
 from pandas.testing import assert_frame_equal
 from delphi_utils.nancodes import Nans
 
-from delphi.epidata.server.utils.dates import day_to_time_value
-from delphi.epidata.acquisition.covidcast.covidcast_row import (
+from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow, set_df_dtypes
+from delphi.epidata.acquisition.covidcast.test_utils import (
+    CovidcastTestRow,
     covidcast_rows_as_api_compatibility_row_df,
     covidcast_rows_as_api_row_df,
     covidcast_rows_from_args,
-    set_df_dtypes,
     transpose_dict,
-    CovidcastRow
 )
 
 # py3tester coverage target (equivalent to `import *`)
@@ -19,85 +18,75 @@
 
 class TestCovidcastRows(unittest.TestCase):
+    expected_df = set_df_dtypes(DataFrame({
+        "source": ["src"] * 10,
+        "signal": ["sig_base"] * 5 + ["sig_other"] * 5,
+        "time_type": ["day"] * 10,
+        "geo_type": ["county"] * 10,
+        "time_value": [2021_05_01 + i for i in range(5)] * 2,
+        "geo_value": ["01234"] * 10,
+        "value": range(10),
+        "stderr": [10.0] * 10,
+        "sample_size": [10.0] * 10,
+        "missing_value": [Nans.NOT_MISSING] * 10,
+        "missing_stderr": [Nans.NOT_MISSING] * 10,
+        "missing_sample_size": [Nans.NOT_MISSING] * 10,
+        "issue": [2021_05_01 + i for i in range(5)] * 2,
+        "lag": [0] * 10,
+        "direction": [None] * 10
+    }), CovidcastRow._pandas_dtypes)
+
     def test_transpose_dict(self):
-        assert transpose_dict(dict([["a", [2, 4, 6]], ["b", [3, 5, 7]], ["c", [10, 20, 30]]])) == [{"a": 2, "b": 3, "c": 10}, {"a": 4, "b": 5, "c": 20}, {"a": 6, "b": 7, "c": 30}]
+        assert transpose_dict(
+            {
+                "a": [2, 4, 6],
+                "b": [3, 5, 7],
+                "c": [10, 20, 30]
+            }
+        ) == [
+            {"a": 2, "b": 3, "c": 10},
+            {"a": 4, "b": 5, "c": 20},
+            {"a": 6, "b": 7, "c": 30}
+        ]
 
     def test_CovidcastRow(self):
-        df = CovidcastRow.make_default_row(value=5.0).api_row_df
-        expected_df = set_df_dtypes(DataFrame.from_records([{
-            "source": "src",
-            "signal": "sig",
-            "time_type": "day",
-            "geo_type": "county",
-            "time_value": 20200202,
-            "geo_value": "01234",
-            "value": 5.0,
-            "stderr": 10.0,
-            "sample_size": 10.0,
-            "missing_value": Nans.NOT_MISSING,
-            "missing_stderr": Nans.NOT_MISSING,
-            "missing_sample_size": Nans.NOT_MISSING,
-            "issue": 20200202,
-            "lag": 0,
-            "direction": None
-        }]), dtypes = CovidcastRow._pandas_dtypes)
+        df = CovidcastTestRow.make_default_row(
+            signal="sig_base",
+            value=0.0,
+            time_value=2021_05_01,
+            issue=2021_05_01,
+        ).as_api_row_df()
+        expected_df = self.expected_df.iloc[0:1]
         assert_frame_equal(df, expected_df)
 
-        df = CovidcastRow.make_default_row(value=5.0).api_compatibility_row_df
-        expected_df = set_df_dtypes(DataFrame.from_records([{
-            "signal": "sig",
-            "time_value": 20200202,
-            "geo_value": "01234",
-            "value": 5.0,
-            "stderr": 10.0,
-            "sample_size": 10.0,
-            "missing_value": Nans.NOT_MISSING,
-            "missing_stderr": Nans.NOT_MISSING,
-            "missing_sample_size": Nans.NOT_MISSING,
-            "issue": 20200202,
-            "lag": 0,
-            "direction": None
-        }]), dtypes = CovidcastRow._pandas_dtypes)
+        df = CovidcastTestRow.make_default_row(
+            signal="sig_base",
+            value=0.0,
+            time_value=2021_05_01,
+            issue=2021_05_01,
+        ).as_api_compatibility_row_df()
+        expected_df = self.expected_df.iloc[0:1][df.columns]
         assert_frame_equal(df, expected_df)
 
     def test_covidcast_rows(self):
-        covidcast_rows = covidcast_rows_from_args(signal=["sig_base"] * 5 + ["sig_other"] * 5, time_value=date_range("2021-05-01", "2021-05-05").to_list() * 2, value=list(range(10)))
+        covidcast_rows = covidcast_rows_from_args(
+            signal=["sig_base"] * 5 + ["sig_other"] * 5,
+            time_value=[2021_05_01 + i for i in range(5)] * 2,
+            value=list(range(10)),
+            sanitize_fields=True
+        )
         df = covidcast_rows_as_api_row_df(covidcast_rows)
-        expected_df = set_df_dtypes(DataFrame({
-            "source": ["src"] * 10,
-            "signal": ["sig_base"] * 5 + ["sig_other"] * 5,
-            "time_type": ["day"] * 10,
-            "geo_type": ["county"] * 10,
-            "time_value": map(day_to_time_value, date_range("2021-05-01", "2021-05-5").to_list() * 2),
-            "geo_value": ["01234"] * 10,
-            "value": range(10),
-            "stderr": [10.0] * 10,
-            "sample_size": [10.0] * 10,
-            "missing_value": [Nans.NOT_MISSING] * 10,
-            "missing_stderr": [Nans.NOT_MISSING] * 10,
-            "missing_sample_size": [Nans.NOT_MISSING] * 10,
-            "issue": map(day_to_time_value, date_range("2021-05-01", "2021-05-5").to_list() * 2),
-            "lag": [0] * 10,
-            "direction": [None] * 10
-        }), CovidcastRow._pandas_dtypes)
+        expected_df = self.expected_df
         assert_frame_equal(df, expected_df)
 
-        covidcast_rows = covidcast_rows_from_args(signal=["sig_base"] * 5 + ["sig_other"] * 5, time_value=date_range("2021-05-01", "2021-05-05").to_list() * 2, value=list(range(10)))
+        covidcast_rows = covidcast_rows_from_args(
+            signal=["sig_base"] * 5 + ["sig_other"] * 5,
+            time_value=[2021_05_01 + i for i in range(5)] * 2,
+            value=list(range(10)),
+            sanitize_fields=True
+        )
         df = covidcast_rows_as_api_compatibility_row_df(covidcast_rows)
-        expected_df = set_df_dtypes(DataFrame({
-            "signal": ["sig_base"] * 5 + ["sig_other"] * 5,
-            "time_value": map(day_to_time_value, date_range("2021-05-01", "2021-05-5").to_list() * 2),
-            "geo_value": ["01234"] * 10,
-            "value": range(10),
-            "stderr": [10.0] * 10,
-            "sample_size": [10.0] * 10,
-            "missing_value": [Nans.NOT_MISSING] * 10,
-            "missing_stderr": [Nans.NOT_MISSING] * 10,
-            "missing_sample_size": [Nans.NOT_MISSING] * 10,
-            "issue": map(day_to_time_value, date_range("2021-05-01", "2021-05-5").to_list() * 2),
-            "lag": [0] * 10,
-            "direction": [None] * 10
-        }), CovidcastRow._pandas_dtypes)
+        expected_df = self.expected_df[df.columns]
         assert_frame_equal(df, expected_df)
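
Taken together, the new helpers are meant to compose as in the following sketch of an
integration test. This is illustrative only, not code in this patch; `IGNORE_FIELDS` and
`request_based_on_row` stand in for the harness helpers used in
integrations/server/test_covidcast.py.

    # Build three placeholder rows; _sanitize_fields fills in lag and missing codes.
    rows = covidcast_rows_from_args(
        signal=["sig_base"] * 3,
        time_value=[2021_05_01 + i for i in range(3)],
        value=[1.0, 2.0, 3.0],
        sanitize_fields=True,
    )
    # Load them through the full acquisition path.
    self._insert_rows(rows)
    # The API response should match the rows, minus server-internal fields.
    expected = [row.as_dict(ignore_fields=IGNORE_FIELDS) for row in rows]
    response = self.request_based_on_row(rows[0], time_values="*")
    self.assertEqual(response["epidata"], expected)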