"""Common database code used by multiple `covid_hosp` scrapers."""

# standard library
+ from collections import namedtuple
from contextlib import contextmanager
import math

@@ -11,13 +12,15 @@

# first party
import delphi.operations.secrets as secrets

+ Columndef = namedtuple("Columndef", "csv_name sql_name dtype")

class Database:

  def __init__(self,
               connection,
               table_name=None,
               columns_and_types=None,
+              key_columns=None,
               additional_fields=None):
    """Create a new Database object.
@@ -39,7 +42,11 @@ def __init__(self,
    self.table_name = table_name
    self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' else \
      'publication_date'
-   self.columns_and_types = columns_and_types
+   self.columns_and_types = {
+     c.csv_name: c
+     for c in (columns_and_types if columns_and_types is not None else [])
+   }
+   self.key_columns = key_columns if key_columns is not None else []
    self.additional_fields = additional_fields if additional_fields is not None else []

  @classmethod
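Aside: a minimal sketch of how the new `Columndef` tuple and the `csv_name`-keyed dict built in the constructor behave. The column definitions below are made up for illustration and are not the real HHS schema:

```python
from collections import namedtuple

Columndef = namedtuple("Columndef", "csv_name sql_name dtype")

# hypothetical column definitions, for illustration only
columns = [
    Columndef("state", "state", str),
    Columndef("inpatient_beds", "inpatient_beds", int),
]

# mirrors the constructor: index definitions by CSV column name so the
# sql_name and dtype for any CSV column can be looked up directly
columns_and_types = {c.csv_name: c for c in columns}

assert columns_and_types["inpatient_beds"].dtype is int
assert columns_and_types["state"].sql_name == "state"
```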
@@ -151,47 +158,49 @@ def insert_dataset(self, publication_date, dataframe):
      The dataset.
    """
    dataframe_columns_and_types = [
-     x for x in self.columns_and_types if x[0] in dataframe.columns
+     x for x in self.columns_and_types.values() if x.csv_name in dataframe.columns
    ]
+
+   def nan_safe_dtype(dtype, value):
+     if isinstance(value, float) and math.isnan(value):
+       return None
+     return dtype(value)
+
+   # first convert keys and save the results; we'll need them later
+   for csv_name in self.key_columns:
+     dataframe.loc[:, csv_name] = dataframe[csv_name].map(self.columns_and_types[csv_name].dtype)
+
    num_columns = 2 + len(dataframe_columns_and_types) + len(self.additional_fields)
    value_placeholders = ', '.join(['%s'] * num_columns)
-   columns = ', '.join(f'`{i[1]}`' for i in dataframe_columns_and_types + self.additional_fields)
+   columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields)
    sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
      f'VALUES ({value_placeholders})'
    id_and_publication_date = (0, publication_date)
    with self.new_cursor() as cursor:
      for _, row in dataframe.iterrows():
        values = []
-       for name, _, dtype in dataframe_columns_and_types:
-         if isinstance(row[name], float) and math.isnan(row[name]):
-           values.append(None)
-         else:
-           values.append(dtype(row[name]))
+       for c in dataframe_columns_and_types:
+         values.append(nan_safe_dtype(c.dtype, row[c.csv_name]))
        cursor.execute(sql,
                       id_and_publication_date +
                       tuple(values) +
-                      tuple(i[0] for i in self.additional_fields))
+                      tuple(i.csv_name for i in self.additional_fields))

    # deal with non/seldomly updated columns used like a fk table (if this database needs it)
    if hasattr(self, 'AGGREGATE_KEY_COLS'):
      ak_cols = self.AGGREGATE_KEY_COLS

      # restrict data to just the key columns and remove duplicate rows
-     ak_data = (dataframe[set(ak_cols + self.KEY_COLS)]
-                .sort_values(self.KEY_COLS)[ak_cols]
+     # sort by key columns to ensure that the last ON DUPLICATE KEY overwrite
+     # uses the most-recent aggregate key information
+     ak_data = (dataframe[set(ak_cols + self.key_columns)]
+                .sort_values(self.key_columns)[ak_cols]
                 .drop_duplicates())
      # cast types
-     dataframe_typemap = {
-       name: dtype
-       for name, _, dtype in dataframe_columns_and_types
-     }
      for col in ak_cols:
-       def cast_but_sidestep_nans(i):
-         # not the prettiest, but it works to avoid the NaN values that show up in many columns
-         if isinstance(i, float) and math.isnan(i):
-           return None
-         return dataframe_typemap[col](i)
-       ak_data[col] = ak_data[col].apply(cast_but_sidestep_nans)
+       ak_data[col] = ak_data[col].map(
+         lambda value: nan_safe_dtype(self.columns_and_types[col].dtype, value)
+       )

      # fix NULLs
      ak_data = ak_data.to_numpy(na_value=None).tolist()
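The new `nan_safe_dtype` helper consolidates the old inline NaN check and the per-column `cast_but_sidestep_nans` closure into one shared function. A self-contained sketch of its behavior:

```python
import math

def nan_safe_dtype(dtype, value):
    # pandas uses float NaN for missing cells, even in string/int columns;
    # map those to None so they are inserted as SQL NULL
    if isinstance(value, float) and math.isnan(value):
        return None
    return dtype(value)

assert nan_safe_dtype(int, 3.0) == 3
assert nan_safe_dtype(str, "TX") == "TX"
assert nan_safe_dtype(int, float("nan")) is None
```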
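The sort-then-dedup step matters because the key-table rows are written below with `ON DUPLICATE KEY UPDATE`, so among duplicates the last row inserted wins. A toy example, with hypothetical key and aggregate-key columns:

```python
import pandas as pd

KEY_COLS = ["hospital_pk", "collection_week"]          # hypothetical key columns
AGGREGATE_KEY_COLS = ["hospital_pk", "hospital_name"]  # hypothetical seldom-updated columns

df = pd.DataFrame({
    "hospital_pk": ["A", "A"],
    "collection_week": [20201004, 20201011],
    "hospital_name": ["Old Name", "New Name"],  # facility renamed between weeks
})

ak_data = (df[list(set(AGGREGATE_KEY_COLS + KEY_COLS))]
           .sort_values(KEY_COLS)[AGGREGATE_KEY_COLS]
           .drop_duplicates())

# Both rows survive drop_duplicates because the names differ; sorting by key
# guarantees "New Name" comes last, so the final ON DUPLICATE KEY UPDATE
# leaves the most recent name in the key table.
print(ak_data)
```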
@@ -204,7 +213,7 @@ def cast_but_sidestep_nans(i):
      # use aggregate key table alias
      ak_table = self.table_name + '_key'
      # assemble full SQL statement
-     ak_insert_sql = f'INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) as v ON DUPLICATE KEY UPDATE {ak_updates_str}'
+     ak_insert_sql = f'INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) AS v ON DUPLICATE KEY UPDATE {ak_updates_str}'

      # commit the data
      with self.new_cursor() as cur:
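For reference, a sketch of what the assembled statement could look like. The table and column names are hypothetical (the real `ak_table` and `ak_cols` come from the subclass), and the exact construction of `ak_updates_str` is assumed; the `AS v` row alias is the MySQL 8.0.19+ replacement for the deprecated `VALUES()` function inside `ON DUPLICATE KEY UPDATE`:

```python
# hypothetical names, for illustration only
ak_table = "covid_hosp_facility_key"
ak_cols = ["hospital_pk", "hospital_name"]

ak_cols_str = ", ".join(f"`{c}`" for c in ak_cols)
values_str = ", ".join(["%s"] * len(ak_cols))
# update only the non-key columns from the aliased new row
ak_updates_str = ", ".join(f"`{c}` = v.`{c}`" for c in ak_cols[1:])

ak_insert_sql = (f"INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) "
                 f"AS v ON DUPLICATE KEY UPDATE {ak_updates_str}")
print(ak_insert_sql)
```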