15
15
package main
16
16
17
17
import (
18
+ "encoding/json"
18
19
"fmt"
19
20
"io/ioutil"
20
21
"log"
@@ -34,9 +35,12 @@ const (
34
35
)
35
36
36
37
type message struct {
37
- id string
38
- deadline int64
39
- data []byte
38
+ ID string
39
+ Deadline int64
40
+ StatusCode int
41
+ Headers map [string ]interface {}
42
+ IsBase64Encoded bool
43
+ Body string
40
44
}
41
45
42
46
var (
@@ -82,32 +86,37 @@ func newTask(w http.ResponseWriter, r *http.Request) {
82
86
83
87
now := time .Now ().UnixNano ()
84
88
task := message {
85
- id : fmt .Sprintf ("%d" , now ),
86
- deadline : now + functionTTL ,
87
- data : body ,
89
+ ID : fmt .Sprintf ("%d" , now ),
90
+ Deadline : now + functionTTL ,
91
+ Body : string ( body ) ,
88
92
}
89
- fmt .Printf ("<- %s %s\n " , task .id , task .data )
93
+ fmt .Printf ("<- %s %s\n " , task .ID , task .Body )
90
94
91
95
resultsChannel := make (chan message )
92
96
mutex .Lock ()
93
- results [task .id ] = resultsChannel
97
+ results [task .ID ] = resultsChannel
94
98
mutex .Unlock ()
95
99
defer close (resultsChannel )
96
100
97
101
tasks <- task
98
102
99
103
select {
100
104
case <- time .After (time .Duration (functionTTL )):
101
- fmt .Printf ("-> ! %s Deadline is reached\n " , task .id )
105
+ fmt .Printf ("-> ! %s Deadline is reached\n " , task .ID )
102
106
w .WriteHeader (http .StatusGone )
103
- w .Write ([]byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data )))
107
+ w .Write ([]byte (fmt .Sprintf ("Deadline is reached, data %s" , task .Body )))
104
108
case result := <- resultsChannel :
105
- fmt .Printf ("-> %s %s\n " , result .id , result .data )
106
- w .WriteHeader (http .StatusOK )
107
- w .Write (result .data )
109
+ fmt .Printf ("-> %s %d %s\n " , result .ID , result .StatusCode , result .Body )
110
+ for k , v := range result .Headers {
111
+ if value , ok := v .(string ); ok {
112
+ w .Header ().Add (k , value )
113
+ }
114
+ }
115
+ w .WriteHeader (result .StatusCode )
116
+ w .Write ([]byte (result .Body ))
108
117
}
109
118
mutex .Lock ()
110
- delete (results , task .id )
119
+ delete (results , task .ID )
111
120
mutex .Unlock ()
112
121
return
113
122
}
@@ -116,13 +125,13 @@ func getTask(w http.ResponseWriter, r *http.Request) {
116
125
task := <- tasks
117
126
118
127
// Dummy headers required by Rust client. Replace with something meaningful
119
- w .Header ().Set ("Lambda-Runtime-Aws-Request-Id" , task .id )
120
- w .Header ().Set ("Lambda-Runtime-Deadline-Ms" , strconv .Itoa (int (task .deadline )))
128
+ w .Header ().Set ("Lambda-Runtime-Aws-Request-Id" , task .ID )
129
+ w .Header ().Set ("Lambda-Runtime-Deadline-Ms" , strconv .Itoa (int (task .Deadline )))
121
130
w .Header ().Set ("Lambda-Runtime-Invoked-Function-Arn" , "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime" )
122
131
w .Header ().Set ("Lambda-Runtime-Trace-Id" , "0" )
123
132
124
133
w .WriteHeader (http .StatusOK )
125
- w .Write (task .data )
134
+ w .Write ([] byte ( task .Body ) )
126
135
return
127
136
}
128
137
@@ -154,7 +163,7 @@ func responseHandler(w http.ResponseWriter, r *http.Request) {
154
163
return
155
164
}
156
165
157
- data , err := ioutil .ReadAll (r .Body )
166
+ body , err := ioutil .ReadAll (r .Body )
158
167
if err != nil {
159
168
fmt .Printf ("! %s\n " , err )
160
169
return
@@ -170,23 +179,41 @@ func responseHandler(w http.ResponseWriter, r *http.Request) {
170
179
return
171
180
}
172
181
182
+ result := message {
183
+ Body : string (body ),
184
+ StatusCode : 200 ,
185
+ }
173
186
switch kind {
174
187
case "response" :
188
+ if js , ok := parseJSON (body ); ok {
189
+ result = js
190
+ }
175
191
case "error" :
176
- fmt .Printf ("! Error: %s\n " , data )
192
+ result .StatusCode = 500
193
+ fmt .Printf ("! Error: %s\n " , body )
177
194
default :
178
195
w .WriteHeader (http .StatusNotFound )
179
196
w .Write ([]byte (fmt .Sprintf ("Unknown endpoint: %s" , kind )))
180
197
return
181
198
}
182
- resultsChannel <- message {
183
- id : id ,
184
- data : data ,
185
- }
199
+ result .ID = id
200
+ resultsChannel <- result
186
201
w .WriteHeader (http .StatusAccepted )
187
202
return
188
203
}
189
204
205
+ func parseJSON (s []byte ) (message , bool ) {
206
+ var js message
207
+ if err := json .Unmarshal (s , & js ); err != nil {
208
+ fmt .Println (s , err )
209
+ return js , false
210
+ }
211
+ if js .StatusCode == 0 {
212
+ return js , false
213
+ }
214
+ return js , true
215
+ }
216
+
190
217
func api () error {
191
218
apiRouter := http .NewServeMux ()
192
219
apiRouter .HandleFunc (awsEndpoint + "/init/error" , initError )
0 commit comments