-
Notifications
You must be signed in to change notification settings - Fork 68
/
Copy pathcdc_dropbox_receiver.py
160 lines (122 loc) · 4.68 KB
/
cdc_dropbox_receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""
===============
=== Purpose ===
===============
Downloads CDC page stats stored in Delphi's dropbox.
This program:
1. downloads new files within dropbox:/cdc_page_stats
2. moves the originals to dropbox:/cdc_page_stats/archived_reports
3. zips the downloaded files and moves that to delphi:/common/cdc_stage
4. queues cdc_upload.py, cdc_extract.py, and other scripts to run
See also:
- cdc_upload.py
- cdc_extract.py
"""
# standard library
import datetime
from zipfile import ZIP_DEFLATED, ZipFile
# third party
import dropbox
import mysql.connector
# first party
import delphi.operations.secrets as secrets
from delphi.epidata.common.logger import get_structured_logger
# location constants
DROPBOX_BASE_DIR = "/cdc_page_stats"
DELPHI_BASE_DIR = "/common/cdc_stage"
logger = get_structured_logger("cdc_dropbox_receiver")
def get_timestamp_string():
    """
    Return the current local date and time as a string.
    The format is "%Y%m%d_%H%M%S".
    """
    # f-string format spec delegates to strftime-style formatting
    return f"{datetime.datetime.now():%Y%m%d_%H%M%S}"
def trigger_further_processing():
    """Add CDCP processing scripts to the Automation run queue.

    Connects to the `automation` database and queues step 46
    ("Process CDCP Data"); the Automation runner executes the
    downstream scripts (cdc_upload.py, cdc_extract.py) from there.
    """
    # connect with the automation credentials
    u, p = secrets.db.auto
    cnx = mysql.connector.connect(user=u, password=p, database="automation")
    try:
        cur = cnx.cursor()
        try:
            # add step "Process CDCP Data" to queue
            cur.execute("CALL automation.RunStep(46)")
        finally:
            # close the cursor even if the call fails
            cur.close()
        cnx.commit()
    finally:
        # guarantee the connection is released on any error path
        cnx.close()
def _list_new_files(dbx):
    """Return the names of .csv/.zip files waiting in the dropbox inbox."""
    logger.info(f"checking dropbox: {DROPBOX_BASE_DIR}")
    save_list = []
    for entry in dbx.files_list_folder(DROPBOX_BASE_DIR).entries:
        name = entry.name
        # only data files are downloaded; everything else is ignored
        if name.endswith((".csv", ".zip")):
            logger.info(f" download: {name}")
            save_list.append(name)
        else:
            logger.info(f" skip: {name}")
    return save_list


def _download_into_zip(dbx, save_list, zip_path):
    """Download each named dropbox file and store it inside a new zip at zip_path.

    Raises Exception if the reported size, Content-Length header, and actual
    payload length disagree (a truncated or corrupted transfer).
    """
    logger.info(f"downloading into delphi:{zip_path}")
    with ZipFile(zip_path, "w", ZIP_DEFLATED) as zf:
        for name in save_list:
            # location of the file on dropbox
            dropbox_path = f"{DROPBOX_BASE_DIR}/{name}"
            logger.info(f" {dropbox_path}")
            # start the download
            meta, resp = dbx.files_download(dropbox_path)
            # check status and length
            if resp.status_code != 200:
                raise Exception(f"unexpected response status {resp.status_code} for {dropbox_path}")
            dropbox_len = meta.size
            logger.info(f" need {dropbox_len} bytes...")
            content_len = int(resp.headers.get("Content-Length", -1))
            if dropbox_len != content_len:
                raise Exception(
                    f"size mismatch for {dropbox_path}: dropbox_len={dropbox_len}, content_len={content_len}"
                )
            # finish the download, holding the data in this variable
            filedata = resp.content
            # check the length again against what was actually received
            payload_len = len(filedata)
            logger.info(" downloaded")
            if dropbox_len != payload_len:
                raise Exception(
                    f"size mismatch for {dropbox_path}: dropbox_len={dropbox_len}, payload_len={payload_len}"
                )
            # add the downloaded file to the zip file
            zf.writestr(name, filedata)
            logger.info(" added")


def _archive_files(dbx, save_list, timestamp):
    """Move the downloaded files into a timestamped dropbox archive folder.

    Archiving prevents the same files from being downloaded again on the
    next run. Raises Exception if dropbox reports an unexpected destination.
    """
    archive_dir = f"archived_reports/processed_{timestamp}"
    logger.info("archiving files...")
    for name in save_list:
        # source and destination
        dropbox_src = f"{DROPBOX_BASE_DIR}/{name}"
        dropbox_dst = f"{DROPBOX_BASE_DIR}/{archive_dir}/{name}"
        logger.info(f" {dropbox_src} -> {dropbox_dst}")
        # move the file
        meta = dbx.files_move(dropbox_src, dropbox_dst)
        # sanity check: the returned metadata should point inside the archive
        if archive_dir not in meta.path_lower:
            raise Exception(f"failed to move {name}")


def fetch_data():
    """
    Check for new files on dropbox, download them, zip them, cleanup dropbox, and
    trigger further processing of new data.

    Pipeline: list new .csv/.zip files in dropbox:/cdc_page_stats, bundle them
    into a timestamped zip under delphi:/common/cdc_stage, archive the dropbox
    originals, then queue the downstream Automation step.
    """
    # initialize dropbox api
    dbx = dropbox.Dropbox(secrets.cdcp.dropbox_token)
    # look for new CDC data files
    save_list = _list_new_files(dbx)
    # determine if there's anything to be done
    if not save_list:
        logger.info("did not find any new data files")
        return
    # download new files, saving them inside of a new zip file
    timestamp = get_timestamp_string()
    zip_path = f"{DELPHI_BASE_DIR}/dropbox_{timestamp}.zip"
    _download_into_zip(dbx, save_list, zip_path)
    # At this point, all the data is stored and awaiting further processing on
    # the delphi server.
    logger.info(f"saved all new data in {zip_path}")
    # on dropbox, archive downloaded files so they won't be downloaded again
    _archive_files(dbx, save_list, timestamp)
    # finally, trigger the usual processing flow
    logger.info("triggering processing flow")
    trigger_further_processing()
    logger.info("done")
def main():
    """Script entry point: pull any new CDC data files from dropbox."""
    fetch_data()


if __name__ == "__main__":
    main()