-
Notifications
You must be signed in to change notification settings - Fork 100
Expand file tree
/
Copy pathhandler.py
More file actions
113 lines (94 loc) · 3.8 KB
/
handler.py
File metadata and controls
113 lines (94 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import datetime, io, json, os, sys, uuid
# Add current directory to allow location of packages
sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))
def handler(event, context):
    """AWS Lambda entry point wrapping the benchmarked function.

    Normalizes the incoming ``event`` across the three supported trigger
    types (SQS queue, S3 storage, HTTP/API Gateway), invokes
    ``function.handler``, measures execution time, optionally uploads the
    measurement log to a storage bucket, and either returns the statistics
    as an HTTP response or forwards them to a result queue.

    :param event: trigger payload (shape depends on the trigger type)
    :param context: AWS Lambda context object (request id, function ARN)
    :return: dict with 'statusCode' and JSON 'body' for HTTP-style
             invocations; None when results are sent to a queue instead
    """
    income_timestamp = datetime.datetime.now().timestamp()

    # Flag to indicate whether the measurements should be returned as an HTTP
    # response or via a result queue.
    return_http = True

    # Queue trigger: the actual payload is the JSON body of the first record.
    # .get() avoids a KeyError on Records entries without an eventSource key.
    if "Records" in event and event["Records"][0].get("eventSource") == 'aws:sqs':
        event = json.loads(event["Records"][0]["body"])
        return_http = False

    # Storage trigger: the payload was written as an object to a bucket;
    # fetch and parse it.
    if "Records" in event and "s3" in event["Records"][0]:
        bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
        file_name = event["Records"][0]["s3"]["object"]["key"]

        from function import storage
        storage_inst = storage.storage.get_instance()
        obj = storage_inst.get_object(bucket_name, file_name)
        event = json.loads(obj)
        return_http = False

    # HTTP trigger with API Gateway: the payload is the JSON request body.
    if 'body' in event:
        event = json.loads(event['body'])

    # Run the benchmarked function and measure its execution time.
    req_id = context.aws_request_id
    event['request-id'] = req_id
    event['income-timestamp'] = income_timestamp

    begin = datetime.datetime.now()
    from function import function
    ret = function.handler(event)
    end = datetime.datetime.now()

    log_data = {
        'output': ret['result']
    }
    if 'fns_triggered' in ret and ret['fns_triggered'] > 0:
        log_data['fns_triggered'] = ret['fns_triggered']
    if 'parent_execution_id' in ret:
        log_data['parent_execution_id'] = ret['parent_execution_id']
    if 'measurement' in ret:
        log_data['measurement'] = ret['measurement']

    # Optionally persist the measurements to the results bucket, timing
    # the upload separately so it can be subtracted from the benchmark.
    if 'logs' in event:
        log_data['time'] = (end - begin) / datetime.timedelta(microseconds=1)
        results_begin = datetime.datetime.now()
        from function import storage
        storage_inst = storage.storage.get_instance()
        b = event.get('logs').get('bucket')
        storage_inst.upload_stream(b, '{}.json'.format(req_id),
                                   io.BytesIO(json.dumps(log_data).encode('utf-8')))
        results_end = datetime.datetime.now()
        results_time = (results_end - results_begin) / datetime.timedelta(microseconds=1)
    else:
        results_time = 0

    # Cold-start detection: /tmp persists across warm invocations of the same
    # container, so a missing marker file means this is a cold start.
    is_cold = False
    fname = os.path.join('/tmp', 'cold_run')
    if not os.path.exists(fname):
        is_cold = True
        container_id = str(uuid.uuid4())[0:8]
        with open(fname, 'a') as f:
            f.write(container_id)
    else:
        with open(fname, 'r') as f:
            container_id = f.read()

    cold_start_var = ""
    if "cold_start" in os.environ:
        cold_start_var = os.environ["cold_start"]

    # Epoch-seconds timestamps with microsecond precision. Formatting
    # datetime.timestamp() is portable; the previous strftime('%s.%f') relied
    # on the non-portable glibc '%s' extension and produced the same string.
    stats = json.dumps({
        'begin': '{:.6f}'.format(begin.timestamp()),
        'end': '{:.6f}'.format(end.timestamp()),
        'results_time': results_time,
        'is_cold': is_cold,
        'result': log_data,
        'request_id': context.aws_request_id,
        'cold_start_var': cold_start_var,
        'container_id': container_id,
    })

    # Send the results onwards.
    result_queue = os.getenv('RESULT_QUEUE')
    if return_http or result_queue is None:
        # HTTP / library trigger, standalone function: return an HTTP response.
        return {
            'statusCode': 200,
            'body': stats
        }
    else:
        # Queue trigger, storage trigger, or application: write to a queue.
        # ARN format: arn:aws:lambda:<region>:<account-id>:function:<name>
        arn = context.invoked_function_arn.split(":")
        region = arn[3]
        account_id = arn[4]
        from function import queue
        queue_client = queue.queue(result_queue, account_id, region)
        queue_client.send_message(stats)