|
29 | 29 |
|
30 | 30 |
|
31 | 31 | # location constants
|
32 |
| -DROPBOX_BASE_DIR = '/cdc_page_stats' |
33 |
| -DELPHI_BASE_DIR = '/common/cdc_stage' |
| 32 | +DROPBOX_BASE_DIR = "/cdc_page_stats" |
| 33 | +DELPHI_BASE_DIR = "/common/cdc_stage" |
34 | 34 |
|
35 | 35 |
|
36 | 36 | def get_timestamp_string():
|
37 |
| - """ |
38 |
| - Return the current local date and time as a string. |
| 37 | + """ |
| 38 | + Return the current local date and time as a string. |
39 | 39 |
|
40 |
| - The format is "%Y%m%d_%H%M%S". |
41 |
| - """ |
42 |
| - return datetime.datetime.now().strftime('%Y%m%d_%H%M%S') |
| 40 | + The format is "%Y%m%d_%H%M%S". |
| 41 | + """ |
| 42 | + return datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
43 | 43 |
|
44 | 44 |
|
45 | 45 | def trigger_further_processing():
|
46 |
| - """Add CDCP processing scripts to the Automation run queue.""" |
| 46 | + """Add CDCP processing scripts to the Automation run queue.""" |
47 | 47 |
|
48 |
| - # connect |
49 |
| - u, p = secrets.db.auto |
50 |
| - cnx = mysql.connector.connect(user=u, password=p, database='automation') |
51 |
| - cur = cnx.cursor() |
| 48 | + # connect |
| 49 | + u, p = secrets.db.auto |
| 50 | + cnx = mysql.connector.connect(user=u, password=p, database="automation") |
| 51 | + cur = cnx.cursor() |
52 | 52 |
|
53 |
| - # add step "Process CDCP Data" to queue |
54 |
| - cur.execute('CALL automation.RunStep(46)') |
| 53 | + # add step "Process CDCP Data" to queue |
| 54 | + cur.execute("CALL automation.RunStep(46)") |
55 | 55 |
|
56 |
| - # disconnect |
57 |
| - cur.close() |
58 |
| - cnx.commit() |
59 |
| - cnx.close() |
| 56 | + # disconnect |
| 57 | + cur.close() |
| 58 | + cnx.commit() |
| 59 | + cnx.close() |
60 | 60 |
|
61 | 61 |
|
62 | 62 | def fetch_data():
|
63 |
| - """ |
64 |
| - Check for new files on dropbox, download them, zip them, cleanup dropbox, and |
65 |
| - trigger further processing of new data. |
66 |
| - """ |
67 |
| - |
68 |
| - # initialize dropbox api |
69 |
| - dbx = dropbox.Dropbox(secrets.cdcp.dropbox_token) |
70 |
| - |
71 |
| - # look for new CDC data files |
72 |
| - print('checking dropbox:%s' % DROPBOX_BASE_DIR) |
73 |
| - save_list = [] |
74 |
| - for entry in dbx.files_list_folder(DROPBOX_BASE_DIR).entries: |
75 |
| - name = entry.name |
76 |
| - if name.endswith('.csv') or name.endswith('.zip'): |
77 |
| - print(' download "%s"' % name) |
78 |
| - save_list.append(name) |
79 |
| - else: |
80 |
| - print(' skip "%s"' % name) |
81 |
| - |
82 |
| - # determine if there's anything to be done |
83 |
| - if len(save_list) == 0: |
84 |
| - print('did not find any new data files') |
85 |
| - return |
86 |
| - |
87 |
| - # download new files, saving them inside of a new zip file |
88 |
| - timestamp = get_timestamp_string() |
89 |
| - zip_path = '%s/dropbox_%s.zip' % (DELPHI_BASE_DIR, timestamp) |
90 |
| - print('downloading into delphi:%s' % zip_path) |
91 |
| - with ZipFile(zip_path, 'w', ZIP_DEFLATED) as zf: |
| 63 | + """ |
| 64 | + Check for new files on dropbox, download them, zip them, cleanup dropbox, and |
| 65 | + trigger further processing of new data. |
| 66 | + """ |
| 67 | + |
| 68 | + # initialize dropbox api |
| 69 | + dbx = dropbox.Dropbox(secrets.cdcp.dropbox_token) |
| 70 | + |
| 71 | + # look for new CDC data files |
| 72 | + print(f"checking dropbox: {DROPBOX_BASE_DIR}") |
| 73 | + save_list = [] |
| 74 | + for entry in dbx.files_list_folder(DROPBOX_BASE_DIR).entries: |
| 75 | + name = entry.name |
| 76 | + if name.endswith(".csv") or name.endswith(".zip"): |
| 77 | + print(f" download: {name}") |
| 78 | + save_list.append(name) |
| 79 | + else: |
| 80 | + print(f" skip: {name}") |
| 81 | + |
| 82 | + # determine if there's anything to be done |
| 83 | + if len(save_list) == 0: |
| 84 | + print("did not find any new data files") |
| 85 | + return |
| 86 | + |
| 87 | + # download new files, saving them inside of a new zip file |
| 88 | + timestamp = get_timestamp_string() |
| 89 | + zip_path = f"{DELPHI_BASE_DIR}/dropbox_{timestamp}.zip" |
| 90 | + print(f"downloading into delphi:{zip_path}") |
| 91 | + with ZipFile(zip_path, "w", ZIP_DEFLATED) as zf: |
| 92 | + for name in save_list: |
| 93 | + # location of the file on dropbox |
| 94 | + dropbox_path = f"{DROPBOX_BASE_DIR}/{name}" |
| 95 | + print(f" {dropbox_path}") |
| 96 | + |
| 97 | + # start the download |
| 98 | + meta, resp = dbx.files_download(dropbox_path) |
| 99 | + |
| 100 | + # check status and length |
| 101 | + if resp.status_code != 200: |
| 102 | + raise Exception(["resp.status_code", resp.status_code]) |
| 103 | + dropbox_len = meta.size |
| 104 | + print(f" need {int(dropbox_len)} bytes...") |
| 105 | + content_len = int(resp.headers.get("Content-Length", -1)) |
| 106 | + if dropbox_len != content_len: |
| 107 | + info = ["dropbox_len", dropbox_len, "content_len", content_len] |
| 108 | + raise Exception(info) |
| 109 | + |
| 110 | + # finish the download, holding the data in this variable |
| 111 | + filedata = resp.content |
| 112 | + |
| 113 | + # check the length again |
| 114 | + payload_len = len(filedata) |
| 115 | + print(" downloaded") |
| 116 | + if dropbox_len != payload_len: |
| 117 | + info = ["dropbox_len", dropbox_len, "payload_len", payload_len] |
| 118 | + raise Exception(info) |
| 119 | + |
| 120 | + # add the downloaded file to the zip file |
| 121 | + zf.writestr(name, filedata) |
| 122 | + print(" added") |
| 123 | + |
| 124 | + # At this point, all the data is stored and awaiting further processing on |
| 125 | + # the delphi server. |
| 126 | + print(f"saved all new data in {zip_path}") |
| 127 | + |
| 128 | + # on dropbox, archive downloaded files so they won't be downloaded again |
| 129 | + archive_dir = f"archived_reports/processed_{timestamp}" |
| 130 | + print("archiving files...") |
92 | 131 | for name in save_list:
|
93 |
| - # location of the file on dropbox |
94 |
| - dropbox_path = '%s/%s' % (DROPBOX_BASE_DIR, name) |
95 |
| - print(' %s' % dropbox_path) |
96 |
| - |
97 |
| - # start the download |
98 |
| - meta, resp = dbx.files_download(dropbox_path) |
99 |
| - |
100 |
| - # check status and length |
101 |
| - if resp.status_code != 200: |
102 |
| - raise Exception(['resp.status_code', resp.status_code]) |
103 |
| - dropbox_len = meta.size |
104 |
| - print(' need %d bytes...' % dropbox_len) |
105 |
| - content_len = int(resp.headers.get('Content-Length', -1)) |
106 |
| - if dropbox_len != content_len: |
107 |
| - info = ['dropbox_len', dropbox_len, 'content_len', content_len] |
108 |
| - raise Exception(info) |
109 |
| - |
110 |
| - # finish the download, holding the data in this variable |
111 |
| - filedata = resp.content |
112 |
| - |
113 |
| - # check the length again |
114 |
| - payload_len = len(filedata) |
115 |
| - print(' downloaded') |
116 |
| - if dropbox_len != payload_len: |
117 |
| - info = ['dropbox_len', dropbox_len, 'payload_len', payload_len] |
118 |
| - raise Exception(info) |
119 |
| - |
120 |
| - # add the downloaded file to the zip file |
121 |
| - zf.writestr(name, filedata) |
122 |
| - print(' added') |
123 |
| - |
124 |
| - # At this point, all the data is stored and awaiting further processing on |
125 |
| - # the delphi server. |
126 |
| - print('saved all new data in %s' % zip_path) |
127 |
| - |
128 |
| - # on dropbox, archive downloaded files so they won't be downloaded again |
129 |
| - archive_dir = 'archived_reports/processed_%s' % timestamp |
130 |
| - print('archiving files...') |
131 |
| - for name in save_list: |
132 |
| - # source and destination |
133 |
| - dropbox_src = '%s/%s' % (DROPBOX_BASE_DIR, name) |
134 |
| - dropbox_dst = '%s/%s/%s' % (DROPBOX_BASE_DIR, archive_dir, name) |
135 |
| - print(' "%s" -> "%s"' % (dropbox_src, dropbox_dst)) |
136 |
| - |
137 |
| - # move the file |
138 |
| - meta = dbx.files_move(dropbox_src, dropbox_dst) |
139 |
| - |
140 |
| - # sanity check |
141 |
| - if archive_dir not in meta.path_lower: |
142 |
| - raise Exception('failed to move "%s"' % name) |
143 |
| - |
144 |
| - # finally, trigger the usual processing flow |
145 |
| - print('triggering processing flow') |
146 |
| - trigger_further_processing() |
147 |
| - print('done') |
| 132 | + # source and destination |
| 133 | + dropbox_src = f"{DROPBOX_BASE_DIR}/{name}" |
| 134 | + dropbox_dst = f"{DROPBOX_BASE_DIR}/{archive_dir}/{name}" |
| 135 | + print(f" {dropbox_src} -> {dropbox_dst}") |
| 136 | + |
| 137 | + # move the file |
| 138 | + meta = dbx.files_move(dropbox_src, dropbox_dst) |
| 139 | + |
| 140 | + # sanity check |
| 141 | + if archive_dir not in meta.path_lower: |
| 142 | + raise Exception(f"failed to move {name}") |
| 143 | + |
| 144 | + # finally, trigger the usual processing flow |
| 145 | + print("triggering processing flow") |
| 146 | + trigger_further_processing() |
| 147 | + print("done") |
148 | 148 |
|
149 | 149 |
|
150 | 150 | def main():
|
151 |
| - # fetch new data |
152 |
| - fetch_data() |
| 151 | + # fetch new data |
| 152 | + fetch_data() |
153 | 153 |
|
154 | 154 |
|
155 |
| -if __name__ == '__main__': |
156 |
| - main() |
| 155 | +if __name__ == "__main__": |
| 156 | + main() |
0 commit comments