|
10 | 10 |
|
11 | 11 | # third party |
12 | 12 | import json |
| 13 | +from typing import List |
13 | 14 | import mysql.connector |
14 | 15 |
|
15 | 16 | # first party |
16 | 17 | import delphi.operations.secrets as secrets |
17 | 18 | from delphi.epidata.acquisition.covidcast.logger import get_structured_logger |
18 | | - |
19 | | -class CovidcastRow(): |
20 | | - """A container for all the values of a single covidcast row.""" |
21 | | - |
22 | | - @staticmethod |
23 | | - def fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag): |
24 | | - if row_value is None: return None |
25 | | - return CovidcastRow(source, signal, time_type, geo_type, time_value, |
26 | | - row_value.geo_value, |
27 | | - row_value.value, |
28 | | - row_value.stderr, |
29 | | - row_value.sample_size, |
30 | | - row_value.missing_value, |
31 | | - row_value.missing_stderr, |
32 | | - row_value.missing_sample_size, |
33 | | - issue, lag) |
34 | | - |
35 | | - @staticmethod |
36 | | - def fromCsvRows(row_values, source, signal, time_type, geo_type, time_value, issue, lag): |
37 | | - # NOTE: returns a generator, as row_values is expected to be a generator |
38 | | - return (CovidcastRow.fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag) |
39 | | - for row_value in row_values) |
40 | | - |
41 | | - def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, value, stderr, |
42 | | - sample_size, missing_value, missing_stderr, missing_sample_size, issue, lag): |
43 | | - self.id = None |
44 | | - self.source = source |
45 | | - self.signal = signal |
46 | | - self.time_type = time_type |
47 | | - self.geo_type = geo_type |
48 | | - self.time_value = time_value |
49 | | - self.geo_value = geo_value # from CSV row |
50 | | - self.value = value # ... |
51 | | - self.stderr = stderr # ... |
52 | | - self.sample_size = sample_size # ... |
53 | | - self.missing_value = missing_value # ... |
54 | | - self.missing_stderr = missing_stderr # ... |
55 | | - self.missing_sample_size = missing_sample_size # from CSV row |
56 | | - self.direction_updated_timestamp = 0 |
57 | | - self.direction = None |
58 | | - self.issue = issue |
59 | | - self.lag = lag |
60 | | - |
61 | | - def signal_pair(self): |
62 | | - return f"{self.source}:{self.signal}" |
63 | | - |
64 | | - def geo_pair(self): |
65 | | - return f"{self.geo_type}:{self.geo_value}" |
| 19 | +from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow |
66 | 20 |
|
67 | 21 |
|
68 | 22 | class DBLoadStateException(Exception): |
@@ -154,7 +108,7 @@ def do_analyze(self): |
154 | 108 | def insert_or_update_bulk(self, cc_rows): |
155 | 109 | return self.insert_or_update_batch(cc_rows) |
156 | 110 |
|
157 | | - def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False, suppress_jobs=False): |
| 111 | + def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20, commit_partial=False, suppress_jobs=False): |
158 | 112 | """ |
159 | 113 | Insert new rows into the load table and dispatch into dimension and fact tables. |
160 | 114 | """ |
|
0 commit comments