@@ -34,6 +34,26 @@ def sig_digit_round(value, n_digits):
3434 return result
3535
3636
def convert_df_type(df, logger, type_dict=None):
    """Cast ``df`` to the expected column types and report schema drift.

    Parameters
    ----------
    df:
        DataFrame to convert; it is passed to ``DataFrame.astype``.
    logger:
        Logger whose ``info`` method accepts keyword arguments; it is called
        with ``new_columns=...`` when ``df`` has columns that are not listed
        in the schema.
    type_dict:
        Mapping of column name to dtype. When omitted, the module-level
        ``TYPE_DICT`` is used, which keeps existing two-argument calls
        working. NOTE(review): callers converting a frame with a different
        schema (for example one described by ``TYPE_DICT_METRIC``) must pass
        that mapping here, otherwise the wrong schema is applied.

    Returns
    -------
    The converted DataFrame.

    Raises
    ------
    KeyError
        If ``astype`` raises ``KeyError`` (a column named in the schema is
        absent from ``df``). The message lists expected and received columns.
    """
    if type_dict is None:
        type_dict = TYPE_DICT

    try:
        df = df.astype(type_dict)
    except KeyError as exc:
        # Join outside the f-strings: backslashes are not allowed inside
        # f-string expressions before Python 3.12.
        expected = "\n".join(sorted(map(str, type_dict)))
        received = "\n".join(sorted(map(str, df.columns)))
        message = (
            "\n"
            "Expected column(s) missed, The dataset schema may\n"
            "have changed. Please investigate and amend the code.\n"
            "\n"
            f"expected={expected}\n"
            "\n"
            f"received={received}\n"
        )
        raise KeyError(message) from exc

    # Extra columns are not fatal; surface them so the schema can be updated.
    if new_columns := set(df.columns) - set(type_dict):
        logger.info("New columns found in NWSS dataset.", new_columns=new_columns)
    return df
55+
56+
3757def reformat (df , df_metric ):
3858 """Add columns from df_metric to df, and rename some columns.
3959
@@ -112,25 +132,8 @@ def pull_nwss_data(token: str, logger):
112132 df_concentration = df_concentration .rename (columns = {"date" : "timestamp" })
113133
114134 # Schema checks.
115- try :
116- df_concentration = df_concentration .astype (TYPE_DICT )
117- except KeyError as exc :
118- raise KeyError (
119- f"Expected column(s) missed. Schema may have changed. expected={ sorted (TYPE_DICT .keys ())} received={ sorted (df_concentration .columns )} "
120- ) from exc
121-
122- if new_columns := set (df_concentration .columns ) - set (TYPE_DICT .keys ()):
123- logger .info ("New columns found in NWSS dataset." , new_columns = new_columns )
124-
125- try :
126- df_metric = df_metric .astype (TYPE_DICT_METRIC )
127- except KeyError as exc :
128- raise KeyError (
129- f"Expected column(s) missed. Schema may have changed. expected={ sorted (TYPE_DICT_METRIC .keys ())} received={ sorted (df_metric .columns )} "
130- ) from exc
131-
132- if new_columns := set (df_metric .columns ) - set (TYPE_DICT_METRIC .keys ()):
133- logger .info ("New columns found in NWSS dataset." , new_columns = new_columns )
135+ df_concentration = convert_df_type (df_concentration , logger )
136+ df_metric = convert_df_type (df_metric , logger )
134137
135138 # Drop sites without a normalization scheme.
136139 df = df_concentration [~ df_concentration ["normalization" ].isna ()]
0 commit comments