|
5 | 5 |
|
6 | 6 | # third party |
7 | 7 | import json |
| 8 | +from typing import List |
8 | 9 | import mysql.connector |
9 | | -import numpy as np |
10 | 10 | from math import ceil |
11 | 11 |
|
12 | 12 | from queue import Queue, Empty |
|
17 | 17 | import delphi.operations.secrets as secrets |
18 | 18 |
|
19 | 19 | from delphi.epidata.acquisition.covidcast.logger import get_structured_logger |
20 | | - |
21 | | -class CovidcastRow(): |
22 | | - """A container for all the values of a single covidcast row.""" |
23 | | - |
24 | | - @staticmethod |
25 | | - def fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag): |
26 | | - if row_value is None: return None |
27 | | - return CovidcastRow(source, signal, time_type, geo_type, time_value, |
28 | | - row_value.geo_value, |
29 | | - row_value.value, |
30 | | - row_value.stderr, |
31 | | - row_value.sample_size, |
32 | | - row_value.missing_value, |
33 | | - row_value.missing_stderr, |
34 | | - row_value.missing_sample_size, |
35 | | - issue, lag) |
36 | | - |
37 | | - @staticmethod |
38 | | - def fromCsvRows(row_values, source, signal, time_type, geo_type, time_value, issue, lag): |
39 | | - # NOTE: returns a generator, as row_values is expected to be a generator |
40 | | - return (CovidcastRow.fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag) |
41 | | - for row_value in row_values) |
42 | | - |
43 | | - def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, value, stderr, |
44 | | - sample_size, missing_value, missing_stderr, missing_sample_size, issue, lag): |
45 | | - self.id = None |
46 | | - self.source = source |
47 | | - self.signal = signal |
48 | | - self.time_type = time_type |
49 | | - self.geo_type = geo_type |
50 | | - self.time_value = time_value |
51 | | - self.geo_value = geo_value # from CSV row |
52 | | - self.value = value # ... |
53 | | - self.stderr = stderr # ... |
54 | | - self.sample_size = sample_size # ... |
55 | | - self.missing_value = missing_value # ... |
56 | | - self.missing_stderr = missing_stderr # ... |
57 | | - self.missing_sample_size = missing_sample_size # from CSV row |
58 | | - self.direction_updated_timestamp = 0 |
59 | | - self.direction = None |
60 | | - self.issue = issue |
61 | | - self.lag = lag |
62 | | - |
63 | | - def signal_pair(self): |
64 | | - return f"{self.source}:{self.signal}" |
65 | | - |
66 | | - def geo_pair(self): |
67 | | - return f"{self.geo_type}:{self.geo_value}" |
| 20 | +from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow |
68 | 21 |
|
69 | 22 |
|
70 | 23 | class DBLoadStateException(Exception): |
@@ -156,7 +109,7 @@ def do_analyze(self): |
156 | 109 | def insert_or_update_bulk(self, cc_rows): |
157 | 110 | return self.insert_or_update_batch(cc_rows) |
158 | 111 |
|
159 | | - def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False, suppress_jobs=False): |
| 112 | + def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20, commit_partial=False, suppress_jobs=False): |
160 | 113 | """ |
161 | 114 | Insert new rows into the load table and dispatch into dimension and fact tables. |
162 | 115 | """ |
|
0 commit comments