Skip to content

Commit 163185f

Browse files
committed
Acquisition: update database.py to use CovidcastRow
1 parent c391a28 commit 163185f

File tree

1 file changed

+3
-50
lines changed

1 file changed

+3
-50
lines changed

src/acquisition/covidcast/database.py

+3-50
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
# third party
77
import json
8+
from typing import List
89
import mysql.connector
9-
import numpy as np
1010
from math import ceil
1111

1212
from queue import Queue, Empty
@@ -17,54 +17,7 @@
1717
import delphi.operations.secrets as secrets
1818

1919
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
20-
21-
class CovidcastRow():
22-
"""A container for all the values of a single covidcast row."""
23-
24-
@staticmethod
25-
def fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag):
26-
if row_value is None: return None
27-
return CovidcastRow(source, signal, time_type, geo_type, time_value,
28-
row_value.geo_value,
29-
row_value.value,
30-
row_value.stderr,
31-
row_value.sample_size,
32-
row_value.missing_value,
33-
row_value.missing_stderr,
34-
row_value.missing_sample_size,
35-
issue, lag)
36-
37-
@staticmethod
38-
def fromCsvRows(row_values, source, signal, time_type, geo_type, time_value, issue, lag):
39-
# NOTE: returns a generator, as row_values is expected to be a generator
40-
return (CovidcastRow.fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag)
41-
for row_value in row_values)
42-
43-
def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, value, stderr,
44-
sample_size, missing_value, missing_stderr, missing_sample_size, issue, lag):
45-
self.id = None
46-
self.source = source
47-
self.signal = signal
48-
self.time_type = time_type
49-
self.geo_type = geo_type
50-
self.time_value = time_value
51-
self.geo_value = geo_value # from CSV row
52-
self.value = value # ...
53-
self.stderr = stderr # ...
54-
self.sample_size = sample_size # ...
55-
self.missing_value = missing_value # ...
56-
self.missing_stderr = missing_stderr # ...
57-
self.missing_sample_size = missing_sample_size # from CSV row
58-
self.direction_updated_timestamp = 0
59-
self.direction = None
60-
self.issue = issue
61-
self.lag = lag
62-
63-
def signal_pair(self):
64-
return f"{self.source}:{self.signal}"
65-
66-
def geo_pair(self):
67-
return f"{self.geo_type}:{self.geo_value}"
20+
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
6821

6922

7023
class DBLoadStateException(Exception):
@@ -156,7 +109,7 @@ def do_analyze(self):
156109
def insert_or_update_bulk(self, cc_rows):
157110
return self.insert_or_update_batch(cc_rows)
158111

159-
def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False, suppress_jobs=False):
112+
def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20, commit_partial=False, suppress_jobs=False):
160113
"""
161114
Insert new rows into the load table and dispatch into dimension and fact tables.
162115
"""

0 commit comments

Comments
 (0)