Skip to content

Commit 9692271

Browse files
committed
add auto deadletter queue, dashboard
1 parent 32db4a4 commit 9692271

File tree

1 file changed

+180
-21
lines changed

1 file changed

+180
-21
lines changed

run.py

Lines changed: 180 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,15 @@
99
from email.mime.multipart import MIMEMultipart
1010
from email.mime.text import MIMEText
1111

# Feature flags: opt in to CloudWatch dashboard creation and teardown.
CREATE_DASHBOARD = False
CLEAN_DASHBOARD = False

from config import *

# Backward compatibility with old config requirements: older configs gave the
# dead-letter queue as a full ARN. Keep only the final ':'-separated segment
# (the queue name); a plain queue name passes through unchanged.
SQS_DEAD_LETTER_QUEUE = SQS_DEAD_LETTER_QUEUE.rsplit(':', 1)[-1]

WAIT_TIME = 60
MONITOR_TIME = 60

@@ -45,15 +53,6 @@
4553
]
4654
}
4755

48-
SQS_DEFINITION = {
49-
"DelaySeconds": "0",
50-
"MaximumMessageSize": "262144",
51-
"MessageRetentionPeriod": "1209600",
52-
"ReceiveMessageWaitTimeSeconds": "0",
53-
"RedrivePolicy": "{\"deadLetterTargetArn\":\"" + SQS_DEAD_LETTER_QUEUE + "\",\"maxReceiveCount\":\"10\"}",
54-
"VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY)
55-
}
56-
5756

5857
#################################
5958
# AUXILIARY FUNCTIONS
@@ -157,29 +156,44 @@ def create_or_update_ecs_service(ecs, ECS_SERVICE_NAME, ECS_TASK_NAME):
157156
ecs.create_service(cluster=ECS_CLUSTER, serviceName=ECS_SERVICE_NAME, taskDefinition=ECS_TASK_NAME, desiredCount=0)
158157
print('Service created')
159158

def get_queue_url(sqs, queue_name):
    """Return the URL of the SQS queue named *queue_name*, or None if absent.

    Parameters:
        sqs: a boto3 SQS client.
        queue_name: exact queue name (not an ARN or URL).
    """
    # Filter server-side: an unfiltered list_queues() returns at most 1000
    # queues, so the target could be missed in accounts with many queues.
    result = sqs.list_queues(QueueNamePrefix=queue_name)
    for url in result.get('QueueUrls', []):
        # The prefix match is not exact, so compare the final path segment
        # of the URL (the queue name) and stop at the first exact hit.
        if url.split('/')[-1] == queue_name:
            return url
    return None
def get_or_create_queue(sqs):
    """Ensure both the dead-letter queue and the main job queue exist.

    The dead-letter queue is created first because its ARN is required for
    the main queue's redrive policy. Each create is followed by a WAIT_TIME
    sleep so the new queue is visible before it is looked up or used.

    Parameters:
        sqs: a boto3 SQS client.
    """
    main_url = get_queue_url(sqs, SQS_QUEUE_NAME)
    dead_url = get_queue_url(sqs, SQS_DEAD_LETTER_QUEUE)

    if dead_url is None:
        print("Creating DeadLetter queue")
        sqs.create_queue(QueueName=SQS_DEAD_LETTER_QUEUE)
        # Give AWS time to propagate the new queue before re-resolving its URL.
        time.sleep(WAIT_TIME)
        dead_url = get_queue_url(sqs, SQS_DEAD_LETTER_QUEUE)
    else:
        print (f'DeadLetter queue {SQS_DEAD_LETTER_QUEUE} already exists.')

    if main_url is not None:
        print('Queue exists')
        return

    print('Creating queue')
    dead_arn = sqs.get_queue_attributes(
        QueueUrl=dead_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    queue_attributes = {
        "DelaySeconds": "0",
        "MaximumMessageSize": "262144",
        "MessageRetentionPeriod": "1209600",
        "ReceiveMessageWaitTimeSeconds": "0",
        "RedrivePolicy": '{"deadLetterTargetArn":"'
        + dead_arn
        + '","maxReceiveCount":"10"}',
        "VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY),
    }
    sqs.create_queue(QueueName=SQS_QUEUE_NAME, Attributes=queue_attributes)
    time.sleep(WAIT_TIME)

177-
def loadConfig(configFile):
178-
data = None
179-
with open(configFile, 'r') as conf:
180-
data = json.load(conf)
181-
return data
182-
183197
def killdeadAlarms(fleetId,monitorapp,ec2,cloud):
184198
todel=[]
185199
changes = ec2.describe_spot_fleet_request_history(SpotFleetRequestId=fleetId,StartTime=(datetime.datetime.now()-datetime.timedelta(hours=2)).replace(microsecond=0))
@@ -283,6 +297,143 @@ def export_logs(logs, loggroupId, starttime, bucketId):
283297
break
284298
time.sleep(30)
285299

300+
def create_dashboard(requestInfo):
    """Create a CloudWatch dashboard (named APP_NAME) summarizing the run.

    Widgets: SQS message throughput and backlog for the queue named
    f'{APP_NAME}Queue', ECS cluster memory utilization, spot-fleet fulfilled
    vs. target capacity for the fleet in *requestInfo*, and three Logs
    Insights tables (distinct logs, all logs, errors) over the APP_NAME
    log group.

    Parameters:
        requestInfo: dict containing at least 'SpotFleetRequestId'.
    """
    cloudwatch = boto3.client("cloudwatch")
    # NOTE(review): the metric widgets hard-code f'{APP_NAME}Queue' as the
    # queue name — confirm this matches SQS_QUEUE_NAME in the config.
    DashboardMessage = {
        "widgets": [
            {
                "height": 6,
                "width": 6,
                "y": 0,
                "x": 18,
                "type": "metric",
                "properties": {
                    "metrics": [
                        [ "AWS/SQS", "NumberOfMessagesReceived", "QueueName", f'{APP_NAME}Queue' ],
                        [ ".", "NumberOfMessagesDeleted", ".", "." ],
                    ],
                    "view": "timeSeries",
                    "stacked": False,
                    "region": AWS_REGION,
                    "period": 300,
                    "stat": "Average"
                }
            },
            {
                "height": 6,
                "width": 6,
                "y": 0,
                "x": 6,
                "type": "metric",
                "properties": {
                    "view": "timeSeries",
                    "stacked": False,
                    "metrics": [
                        [ "AWS/ECS", "MemoryUtilization", "ClusterName", ECS_CLUSTER ]
                    ],
                    "region": AWS_REGION,
                    "period": 300,
                    "yAxis": {
                        "left": {
                            "min": 0
                        }
                    }
                }
            },
            {
                "height": 6,
                "width": 6,
                "y": 0,
                "x": 12,
                "type": "metric",
                "properties": {
                    "metrics": [
                        [ "AWS/SQS", "ApproximateNumberOfMessagesVisible", "QueueName", f"{APP_NAME}Queue" ],
                        [ ".", "ApproximateNumberOfMessagesNotVisible", ".", "."],
                    ],
                    "view": "timeSeries",
                    "stacked": True,
                    "region": AWS_REGION,
                    "period": 300,
                    "stat": "Average"
                }
            },
            {
                "height": 6,
                "width": 12,
                "y": 6,
                "x": 12,
                "type": "log",
                "properties": {
                    "query": f"SOURCE {APP_NAME} | fields @message| filter @message like 'bioformats2raw'| stats count_distinct(@message)",
                    "region": AWS_REGION,
                    "stacked": False,
                    "title": "Distinct Logs",
                    "view": "table"
                }
            },
            {
                "height": 6,
                "width": 12,
                "y": 6,
                "x": 0,
                "type": "log",
                "properties": {
                    "query": f"SOURCE {APP_NAME} | fields @message| filter @message like 'bioformats2raw'| stats count(@message)",
                    "region": AWS_REGION,
                    "stacked": False,
                    "title": "All Logs",
                    "view": "table"
                }
            },
            {
                "height": 6,
                "width": 24,
                "y": 12,
                "x": 0,
                "type": "log",
                "properties": {
                    "query": f"SOURCE {APP_NAME} | fields @message | filter @message like \"Error\"\n\n | display @message",
                    "region": AWS_REGION,
                    "stacked": False,
                    "title": "Errors",
                    "view": "table"
                }
            },
            {
                "height": 6,
                "width": 6,
                "y": 0,
                "x": 0,
                "type": "metric",
                "properties": {
                    "metrics": [
                        [ "AWS/EC2Spot", "FulfilledCapacity", "FleetRequestId", requestInfo["SpotFleetRequestId"]],
                        [ ".", "TargetCapacity", ".", "."],
                    ],
                    "view": "timeSeries",
                    "stacked": False,
                    "region": AWS_REGION,
                    "period": 300,
                    "stat": "Average"
                }
            }
        ]
    }
    DashboardMessage_json = json.dumps(DashboardMessage, indent = 4)
    response = cloudwatch.put_dashboard(DashboardName=APP_NAME, DashboardBody=DashboardMessage_json)
    # Use .get(): boto3 may omit the key entirely when there are no
    # validation messages, and a plain index would raise KeyError.
    if response.get('DashboardValidationMessages'):
        print ('Likely error in Dashboard creation')
        print (response['DashboardValidationMessages'])
428+
429+
430+
def clean_dashboard(monitorapp):
    """Delete every CloudWatch dashboard whose name contains *monitorapp*.

    Parameters:
        monitorapp: substring identifying dashboards that belong to this run.
    """
    cloudwatch = boto3.client("cloudwatch")
    # list_dashboards is paginated; follow NextToken so matches beyond the
    # first page are not silently skipped.
    doomed = []
    token = None
    while True:
        page = cloudwatch.list_dashboards(**({"NextToken": token} if token else {}))
        doomed.extend(
            entry["DashboardName"]
            for entry in page.get("DashboardEntries", [])
            if monitorapp in entry["DashboardName"]
        )
        token = page.get("NextToken")
        if not token:
            break
    # DeleteDashboards accepts a batch, so one call replaces one-per-match.
    if doomed:
        cloudwatch.delete_dashboards(DashboardNames=doomed)
436+
286437
#################################
287438
# CLASS TO HANDLE SQS QUEUE
288439
#################################
@@ -451,6 +602,10 @@ def startCluster():
451602

452603
print('Spot fleet successfully created. Your job should start in a few minutes.')
453604

605+
if CREATE_DASHBOARD:
606+
print ("Creating CloudWatch dashboard for run metrics")
607+
create_dashboard(requestInfo)
608+
454609
#################################
455610
# SERVICE 4: MONITOR JOB
456611
#################################
@@ -555,6 +710,10 @@ def monitor(cheapest=False):
555710
print("Removing cluster if it's not the default and not otherwise in use")
556711
removeClusterIfUnused(monitorcluster, ecs)
557712

713+
# Remove Cloudwatch dashboard if created and cleanup desired
714+
if CREATE_DASHBOARD and CLEAN_DASHBOARD:
715+
clean_dashboard(monitorapp)
716+
558717
#Step 6: Export the logs to S3
559718
logs=boto3.client('logs')
560719

0 commit comments

Comments
 (0)