main.py
from src.blast import blast
from src.get_genome_coordinates import get_genome_coordinates, get_genome_coordinates_batch
from src.accID2operon import acc2operon
from src.fetch_promoter import fetch_promoter
from src.fetch_operator import fetch_operator
from src.troubleshoot import troubleshoot
import re
import pandas as pd
import requests
import json
import sys
import signal
import ast
import os
import base64
import boto3
import time
import hashlib
from decimal import Decimal
# Graceful shutdown handler
# Shutdown is allowed to run until SIGKILL arrives
# The purpose is to capture the SIGKILL reason inside the CloudWatch event that gets written to the database
# Exiting early here would mean the correct event is never sent
def signal_handler(sig, frame):
print('Graceful shutdown activated')
print(f'Signal called: {sig}')
# Map signals to handler
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
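# Note: SIGKILL itself cannot be trapped; in practice ECS sends SIGTERM first (handled above)
# and only escalates to SIGKILL after the stop-timeout grace period expires.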
def set_params(param_obj):
try:
blast_params = {
"ident_cutoff": param_obj["identity"],
"cov_cutoff": param_obj["coverage"]
}
promoter_params = {
"min_length": param_obj["minLength"],
"max_length": param_obj["maxLength"]
}
operator_params = {
"extension_length": param_obj["extension"],
"win_score": param_obj["alignMatch"],
"loss_score": param_obj["alignMismatch"],
"spacer_penalty": param_obj["penalty"],
"gap_open": param_obj["gapOpen"],
"gap_extend": param_obj["gapExtend"],
"align_match": param_obj["alignMatch"],
"align_mismatch": param_obj["alignMismatch"],
"min_operator_length": param_obj["minOperator"],
"max_operator_length": param_obj["maxOperator"],
"seq_to_align": param_obj["seqToAlign"],
"search_method": param_obj["conservation"]
}
except Exception as e:
print(e)
raise Exception('Set params failed')
return blast_params, promoter_params, operator_params
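# For reference, a hypothetical param_obj with illustrative values (only the key names are taken
# from the code above; the actual values and defaults come from the frontend request body):
#
#   param_obj = {
#       "identity": 40, "coverage": 90,                # BLAST cutoffs
#       "minLength": 80, "maxLength": 800,             # promoter length bounds
#       "extension": 5, "alignMatch": 2, "alignMismatch": -0.5,
#       "penalty": 1, "gapOpen": -100, "gapExtend": 0,
#       "minOperator": 5, "maxOperator": 15,
#       "seqToAlign": None, "conservation": "identity"
#   }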
# (1) BLAST the protein. Returns a dataframe
def perform_blast(blast_params, promoter_params, operator_params, data):
genome_choice = data["genomeChoice"]
filter_redundant = data["filter"]
acc = data["acc"]
input_method = data["method"]
max_homologs = data["homologs"]
homolog_dict = []
try:
blast_df = blast(acc, input_method, blast_params, max_seqs=500)
except Exception as e:
print(e)
raise Exception(e)
if not blast_df.empty:
# If the 'filter redundant' box is checked, filter out homologs that have the same %identity and %coverage
def filter_blastDf(blast_df):
ident_covs = []
for i, row in blast_df.iterrows():
entry = {"Uniprot Id": row["Uniprot Id"], "identity": row["Identity"],"coverage": row["Coverage"]}
to_compare = {"identity": row["Identity"],"coverage": row["Coverage"]}
if to_compare not in ident_covs:
homolog_dict.append(entry)
ident_covs.append(to_compare)
return homolog_dict
if filter_redundant:
try:
homolog_dict = filter_blastDf(blast_df)
except Exception as e:
print('Error trying to filter blastDf')
raise Exception(e)
else:
homolog_dict = [
{"Uniprot Id": row["Uniprot Id"], "identity": row["Identity"],"coverage": row["Coverage"]}
for i, row in blast_df.iterrows()
]
# limit search to specified number of homologs
homolog_dict = homolog_dict[0:max_homologs]
### DISPLAY homolog_dict in the frontend
print("blast finished.")
# (2) Get genome coordinates. Returns a dataframe
if genome_choice == "batch":
try:
homolog_dict = get_genome_coordinates_batch(homolog_dict)
except Exception as e:
print(e)
raise Exception(e)
# TODO: I get an error here sometimes.
if homolog_dict is None:
print("Failed fetching genome coordinates. Try fetching these individually (advanced options)")
raise Exception("Failed fetching genome coordinates. Try fetching these individually (advanced options)")
else:
homolog_dict = [i for i in homolog_dict if i is not None]
elif genome_choice == "individually":
updated_homolog_dict = []
for i in range(0, len(homolog_dict)):
try:
updated_homolog_dict.append(get_genome_coordinates(homolog_dict[i]))
except Exception as e:
print('Error on api embl call')
raise Exception(e)
# Remove entries without any genome coordinates
homolog_dict = [i for i in updated_homolog_dict if i is not None]
homolog_dict = [i for i in homolog_dict if "Genome" in i]
coordinates_df = pd.DataFrame(homolog_dict).drop(columns=["identity", "coverage"])
### DISPLAY coordinates_df in the frontend
print("genome coordinates fetched.")
# (3) Extract predicted operators for each homolog. Returns a dataframe
for i in range(0, len(homolog_dict)):
homolog_dict[i]["operon"] = acc2operon(homolog_dict[i])
# Deal with cases where operon or promoter fetching fails
try:
homolog_dict[i]["promoter"] = fetch_promoter(homolog_dict[i]["operon"], promoter_params)
except Exception:
homolog_dict[i]["promoter"] = None
operator_dict = fetch_operator(homolog_dict, operator_params)
operator_df = pd.DataFrame(operator_dict["aligned_seqs"])
### DISPLAY operator dataframe in the frontend.
print("operators fetched.")
# (4) Process results
# Display metrics
metric1 = operator_dict["consensus_score"]
metric2 = operator_dict["num_seqs"]
# Show where the predicted operator is located within the native promoter
# Default html to an empty string so return_data still has a value if no promoter was found
html = ""
for i in homolog_dict:
if i["promoter"]:
# maxsplit=1 guards against the operator sequence appearing more than once in the promoter
[before, after] = re.split(re.escape((operator_dict["native_operator"]).upper()), i["promoter"], maxsplit=1)
html = "<span style='color: black;'>"+str(before)+"</span>"
html += "<span style='color: red; font-size: 16px'>"+str(operator_dict["native_operator"])+"</span>"
html += "<span style='color: black;'>"+str(after)+"</span>"
# DISPLAY this html code
break
# Display the consensus sequence
consensus_seq = operator_dict["consensus_seq"]
# Create & Display the consensus motif logo
motif = operator_dict["motif"]
motif_html = []
color_key = {"A":"red", "T": "green", "C": "blue", "G": "#fcba03"}
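# Each motif position becomes one styled character in the frontend logo; the translate/scale values
# below are derived from the per-base score (cubed, presumably to exaggerate well-conserved bases).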
for i in motif:
motif_html.append(
{
"color": str(color_key[i["base"].upper()]),
"fontSize": 400,
"fontWeight": 550,
"display": 'inline-block',
"translateY": str(1.25-i["score"]**3),
"scaleY": str(3*i["score"]**3),
"base": str(i["base"])
}
)
return_data = {
"homolog_dict": homolog_dict,
"coordinates_df": coordinates_df,
"aligned_seqs": pd.DataFrame(operator_dict["aligned_seqs"]),
"consensus_score": operator_dict["consensus_score"],
"num_seqs": operator_dict["num_seqs"],
"html": html,
"consensus_seq": consensus_seq,
"motif_html": motif_html
}
return return_data
def format_homologs(homolog_dict):
extracted_dict = [{"coverage": d["coverage"], "identity": d["identity"], "Uniprot Id": d["Uniprot Id"], "promoter": d["promoter"]} for d in homolog_dict]
return extracted_dict
# def write_results_to_db(table, extracted_dict, coordinates, aligned, consensus, num, passed_in_data, PASSED_UUID):
def write_results_to_db(table_name, extracted_dict, return_data, passed_in_data, PASSED_UUID):
coordinates = return_data["coordinates_df"].to_dict('records')
aligned_seqs = return_data["aligned_seqs"].to_dict('records')
Item={
'PK': primary,
'SK': PASSED_UUID,
'homolog': extracted_dict,
'coordinates': coordinates,
'aligned_seq': aligned_seqs,
'consensus_score': return_data["consensus_score"],
'num_seqs': return_data["num_seqs"],
"html": return_data["html"],
"consensus_seq": return_data["consensus_seq"],
"motif_html": return_data["motif_html"],
'hash': hashed,
'status': 'complete'
}
# Prepare DynamoDB
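# boto3's DynamoDB resource API rejects Python floats, so the JSON round-trip with
# parse_float=Decimal converts every float in the payload to Decimal before writing.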
parsed_data = json.loads(json.dumps(Item), parse_float=Decimal)
dynamodb = boto3.resource('dynamodb', region_name='us-east-2')
table = dynamodb.Table(table_name)
# Write data
try:
table.put_item(
Item=parsed_data
)
except Exception as e:
print(e)
# Errors from anywhere in the application propagate up to this function, which writes them to the DB
def central_error_handling(reason, passed_in_data):
print(f'Snowprint failure: {reason}')
Item={
'PK': primary,
'SK': PASSED_UUID,
'hash': hashed,
'status': 'error',
'reason': str(reason)
}
# Prepare DynamoDB
parsed_data = json.loads(json.dumps(Item))
dynamodb = boto3.resource('dynamodb', region_name='us-east-2')
table = dynamodb.Table(TABLE_NAME)
# Write data
try:
table.put_item(
Item=parsed_data
)
except Exception as e:
print(e)
sys.exit()
if __name__ == "__main__":
# The following code may look strange but there is a method to the madness:
# My goal is to create ECS tasks that are completely async - they get data and then write to a DynamoDB when they are done
# I didn't want them to be dependent on any other AWS service to further complicate things
# This task is started by lambda to keep costs as low as possible
# The challenge became - how do we get JSON data into an ECS container?
# One solution could be using S3 but then we'd need to create unique JSON files in S3 and feed that specific file name to the container
# ^ This felt like a waste of a service to just hold a json object
# Another idea was just to create a file in /tmp/ in lambda but there's no way to specify a volume source location in boto3
# ^ You can configure a mountPoint in the ECS task definition but again, using NamedTemporaryFile results in a unique name which can't be overwritten in containerOverrides
# Then - I tried to just stringify the JSON object and pass it into the container with -e through docker
# This would have worked but an object in the format of '{"some":"value"}' threw an error since docker thought it was the repository name no matter which way you shook it
# Thus - this solution is born:
# 1. Lambda receives the event body from the API request
# 2. It encodes it into base64 (this format ensures docker does not think it's a repository name) and passes it in as an environment variable (no need for a file)
# 3. This container reads the environment variable
# 4. It then decodes the base64, decodes the bytestring, and json.loads the result to get a valid python dict
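# For context, the Lambda side of this handshake might look roughly like the sketch below
# (hypothetical - cluster/task/container names are placeholders, not taken from this repo):
#
#   payload = base64.b64encode(json.dumps(request_body).encode()).decode()
#   ecs.run_task(
#       cluster="snowprint-cluster", taskDefinition="snowprint-task", launchType="FARGATE",
#       overrides={"containerOverrides": [{
#           "name": "snowprint-container",
#           "environment": [
#               {"name": "PASSED_IN_JSON", "value": payload},
#               {"name": "TABLE_NAME", "value": table_name},
#               {"name": "UUID", "value": request_uuid},
#           ],
#       }]},
#   )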
# Get environment
PASSED_IN_JSON = os.environ['PASSED_IN_JSON']
TABLE_NAME = os.environ['TABLE_NAME']
PASSED_UUID = os.environ['UUID']
global hashed
global primary
# Decode base64 JSON
decode_base64 = base64.b64decode(PASSED_IN_JSON)
decode_bytes = decode_base64.decode('utf-8')
data = json.loads(decode_bytes)
# Set globals
primary = data["acc"]
json_string = json.dumps(data, sort_keys=True)
hashed = hashlib.sha256(json_string.encode()).hexdigest()
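# The hash of the canonicalized request is stored alongside the result; presumably this lets an
# identical request be matched to an existing record (an assumption - the lookup lives outside this file).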
# Start snowprint
try:
blast_params, promoter_params, operator_params = set_params(data)
except Exception as e:
central_error_handling(e, data)
try:
return_data = perform_blast(blast_params, promoter_params, operator_params, data)
except Exception as e:
central_error_handling(e, data)
# Write results
extracted_dict = format_homologs(return_data["homolog_dict"])
write_results_to_db(TABLE_NAME, extracted_dict, return_data, data, PASSED_UUID)