@@ -6,6 +6,8 @@
 
 import pandas as pd
 
+from delphi.epidata.acquisition.common.logger import get_structured_logger
+
 class CovidHospException(Exception):
   """Exception raised exclusively by `covid_hosp` utilities."""
 
@@ -69,7 +71,15 @@ def parse_bool(value):
       return False
     raise CovidHospException(f'cannot convert "{value}" to bool')
 
-  def issues_to_fetch(metadata, newer_than, older_than):
+  def limited_string_fn(length):
+    def limited_string(value):
+      value = str(value)
+      if len(value) > length:
+        raise CovidHospException(f"Value '{value}':{len(value)} longer than max {length}")
+      return value
+    return limited_string
+
+  def issues_to_fetch(metadata, newer_than, older_than, logger=False):
     """
     Construct all issue dates and URLs to be ingested based on metadata.
 
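Note: `limited_string_fn` (added above) is a small validator factory: it closes over a maximum length and returns a function that passes conforming values through and raises `CovidHospException` otherwise. A stand-alone sketch of the pattern follows; the two-character limit and the state-abbreviation use case are made up for illustration and are not taken from this patch.

```python
class CovidHospException(Exception):
  """Exception raised exclusively by `covid_hosp` utilities."""

def limited_string_fn(length):
  def limited_string(value):
    value = str(value)
    if len(value) > length:
      raise CovidHospException(f"Value '{value}':{len(value)} longer than max {length}")
    return value
  return limited_string

# hypothetical use: validate two-letter state abbreviations
state_abbr = limited_string_fn(2)
assert state_abbr("PA") == "PA"
try:
  state_abbr("Penn")
except CovidHospException as err:
  print(err)  # Value 'Penn':4 longer than max 2
```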
@@ -81,6 +91,8 @@ def issues_to_fetch(metadata, newer_than, older_than):
       Lower bound (exclusive) of days to get issues for.
     older_than Date
       Upper bound (exclusive) of days to get issues for
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages
     Returns
     -------
     Dictionary of {issue day: list of (download urls, index)}
@@ -100,11 +112,12 @@ def issues_to_fetch(metadata, newer_than, older_than):
       elif day >= older_than:
         n_beyond += 1
     if n_beyond > 0:
-      print(f"{n_beyond} issues available on {older_than} or newer")
+      if logger:
+        logger.info("issues available", on_or_newer=older_than, count=n_beyond)
     return daily_issues
 
   @staticmethod
-  def merge_by_key_cols(dfs, key_cols):
+  def merge_by_key_cols(dfs, key_cols, logger=False):
     """Merge a list of data frames as a series of updates.
 
     Parameters:
@@ -113,13 +126,20 @@ def merge_by_key_cols(dfs, key_cols):
       Data frames to merge, ordered from earliest to latest.
     key_cols: list(str)
       Columns to use as the index.
+    logger structlog.Logger [optional; default False]
+      Logger to receive messages
 
     Returns a single data frame containing the most recent data for each state+date.
     """
 
     dfs = [df.set_index(key_cols) for df in dfs
            if not all(k in df.index.names for k in key_cols)]
     result = dfs[0]
+    if logger and len(dfs) > 7:
+      logger.warning(
+        "expensive operation",
+        msg="concatenating more than 7 files may result in long running times",
+        count=len(dfs))
     for df in dfs[1:]:
       # update values for existing keys
       result.update(df)
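Note: the "series of updates" merge above boils down to indexing each frame by the key columns and calling `DataFrame.update`, so later frames overwrite rows that share a key. A self-contained sketch of that behaviour follows; the column names and values are illustrative (the real key columns come from `db.KEY_COLS`).

```python
import pandas as pd

early = pd.DataFrame({"state": ["PA", "NY"], "date": [20201201, 20201201], "beds": [10, 20]})
later = pd.DataFrame({"state": ["PA"], "date": [20201201], "beds": [15]})

key_cols = ["state", "date"]
dfs = [df.set_index(key_cols) for df in (early, later)]

result = dfs[0]
for df in dfs[1:]:
  # update values for existing keys; rows only present in `result` are left alone
  result.update(df)

print(result)  # PA/20201201 now carries the later value (15); NY is unchanged
```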
@@ -153,22 +173,25 @@ def update_dataset(database, network, newer_than=None, older_than=None):
     bool
       Whether a new dataset was acquired.
     """
-    metadata = network.fetch_metadata()
+    logger = get_structured_logger(f"{database.__class__.__module__}.{database.__class__.__name__}.update_dataset")
+
+    metadata = network.fetch_metadata(logger=logger)
     datasets = []
     with database.connect() as db:
-      max_issue = db.get_max_issue()
+      max_issue = db.get_max_issue(logger=logger)
 
     older_than = datetime.datetime.today().date() if newer_than is None else older_than
     newer_than = max_issue if newer_than is None else newer_than
-    daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than)
+    daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than, logger=logger)
     if not daily_issues:
-      print("no new issues, nothing to do")
+      logger.info("no new issues; nothing to do")
       return False
     for issue, revisions in daily_issues.items():
       issue_int = int(issue.strftime("%Y%m%d"))
       # download the dataset and add it to the database
       dataset = Utils.merge_by_key_cols([network.fetch_dataset(url) for url, _ in revisions],
-                                        db.KEY_COLS)
+                                        db.KEY_COLS,
+                                        logger=logger)
       # add metadata to the database
       all_metadata = []
       for url, index in revisions:
@@ -180,10 +203,10 @@ def update_dataset(database, network, newer_than=None, older_than=None):
       ))
     with database.connect() as db:
       for issue_int, dataset, all_metadata in datasets:
-        db.insert_dataset(issue_int, dataset)
+        db.insert_dataset(issue_int, dataset, logger=logger)
         for url, metadata_json in all_metadata:
-          db.insert_metadata(issue_int, url, metadata_json)
-        print(f'successfully acquired {len(dataset)} rows')
+          db.insert_metadata(issue_int, url, metadata_json, logger=logger)
+        logger.info("acquired rows", count=len(dataset))
 
     # note that the transaction is committed by exiting the `with` block
     return True
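Note: the replaced `print` calls become structured log events: an event name plus key/value context rather than an interpolated message. A minimal sketch of that calling style follows, using structlog directly (the docstrings above describe the `logger` argument as a `structlog.Logger`); the logger name and field values here are illustrative, not taken from this patch.

```python
import structlog  # assumes structlog is installed

logger = structlog.get_logger("covid_hosp.common.utils.example")

# event name first, context as keyword arguments
logger.info("issues available", on_or_newer="2021-05-01", count=3)
logger.warning("expensive operation",
               msg="concatenating more than 7 files may result in long running times",
               count=9)
logger.info("acquired rows", count=12345)
```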