1
- // Copyright 2018 TriggerMesh, Inc
2
- //
3
- // Licensed under the Apache License, Version 2.0 (the "License");
4
- // you may not use this file except in compliance with the License.
5
- // You may obtain a copy of the License at
6
- //
7
- // http://www.apache.org/licenses/LICENSE-2.0
8
- //
9
- // Unless required by applicable law or agreed to in writing, software
10
- // distributed under the License is distributed on an "AS IS" BASIS,
11
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
- // See the License for the specific language governing permissions and
13
- // limitations under the License.
1
+ /*
2
+ Copyright 2021 Triggermesh Inc.
3
+
4
+ Licensed under the Apache License, Version 2.0 (the "License");
5
+ you may not use this file except in compliance with the License.
6
+ You may obtain a copy of the License at
7
+
8
+ http://www.apache.org/licenses/LICENSE-2.0
9
+
10
+ Unless required by applicable law or agreed to in writing, software
11
+ distributed under the License is distributed on an "AS IS" BASIS,
12
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ See the License for the specific language governing permissions and
14
+ limitations under the License.
15
+ */
14
16
15
17
package main
16
18
@@ -27,10 +29,9 @@ import (
27
29
"time"
28
30
29
31
"github.com/kelseyhightower/envconfig"
30
- "github.com/triggermesh/aws-custom-runtime/pkg/events"
31
- "github.com/triggermesh/aws-custom-runtime/pkg/events/apigateway"
32
- "github.com/triggermesh/aws-custom-runtime/pkg/events/cloudevents"
33
- "github.com/triggermesh/aws-custom-runtime/pkg/events/passthrough"
32
+
33
+ "github.com/triggermesh/aws-custom-runtime/pkg/converter"
34
+ "github.com/triggermesh/aws-custom-runtime/pkg/sender"
34
35
)
35
36
36
37
var (
@@ -67,13 +68,16 @@ type Specification struct {
67
68
// Lambda API port to put function requests and get results
68
69
ExternalAPIport string `envconfig:"external_api_port" default:"8080"`
69
70
70
- // Apply response wrapping before sending it back to the client.
71
- // Common case - AWS Lambda functions usually returns data formatted for API Gateway service.
72
- // Set "RESPONSE_WRAPPER: API_GATEWAY" and receive events as if they were processed by API Gateway.
73
- // Opposite scenario - return responses in CloudEvent format: "RESPONSE_WRAPPER: CLOUDEVENTS"
74
- // NOTE: Response wrapper does both encoding and decoding depending on the type. We should consider
75
- // separating wrappers by their function.
76
- ResponseWrapper string `envconfig:"response_wrapper"`
71
+ Sink string `envconfig:"k_sink"`
72
+ ResponseFormat string `envconfig:"response_format"`
73
+ }
74
+
75
+ type Handler struct {
76
+ Sender * sender.Sender
77
+ Converter converter.Converter
78
+
79
+ requestSizeLimit int64
80
+ functionTTL int64
77
81
}
78
82
79
83
type message struct {
@@ -98,10 +102,10 @@ func (rw *responseWrapper) WriteHeader(statusCode int) {
98
102
rw .StatusCode = statusCode
99
103
}
100
104
101
- func ( s * Specification ) setupEnv () error {
105
+ func setupEnv (internalAPIport string ) error {
102
106
environment ["_HANDLER" ], _ = os .LookupEnv ("_HANDLER" )
103
107
environment ["LAMBDA_TASK_ROOT" ], _ = os .LookupEnv ("LAMBDA_TASK_ROOT" )
104
- environment ["AWS_LAMBDA_RUNTIME_API" ] += ":" + s . InternalAPIport
108
+ environment ["AWS_LAMBDA_RUNTIME_API" ] += ":" + internalAPIport
105
109
106
110
for k , v := range environment {
107
111
if err := os .Setenv (k , v ); err != nil {
@@ -111,9 +115,9 @@ func (s *Specification) setupEnv() error {
111
115
return nil
112
116
}
113
117
114
- func (s * Specification ) newTask (w http.ResponseWriter , r * http.Request ) {
115
- requestSizeLimitInBytes := s . RequestSizeLimit * 1e+6
116
- functionTTLInNanoSeconds := s . FunctionTTL * 1e+9
118
+ func (h * Handler ) newTask (w http.ResponseWriter , r * http.Request ) {
119
+ requestSizeLimitInBytes := h . requestSizeLimit * 1e+6
120
+ functionTTLInNanoSeconds := h . functionTTL * 1e+9
117
121
body , err := ioutil .ReadAll (http .MaxBytesReader (w , r .Body , requestSizeLimitInBytes ))
118
122
if err != nil {
119
123
http .Error (w , err .Error (), http .StatusInternalServerError )
@@ -140,12 +144,19 @@ func (s *Specification) newTask(w http.ResponseWriter, r *http.Request) {
140
144
select {
141
145
case <- time .After (time .Duration (functionTTLInNanoSeconds )):
142
146
log .Printf ("-> ! %s Deadline is reached\n " , task .id )
143
- w .WriteHeader (http .StatusGone )
144
- w .Write ([]byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data )))
147
+ resp := []byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data ))
148
+ if err := h .Sender .Send (resp , http .StatusGone , w ); err != nil {
149
+ log .Printf ("! %s %v\n " , task .id , err )
150
+ }
145
151
case result := <- resultsChannel :
146
152
log .Printf ("-> %s %d %s\n " , result .id , result .statusCode , result .data )
147
- w .WriteHeader (result .statusCode )
148
- w .Write (result .data )
153
+ body , err := h .Converter .Convert (result .data )
154
+ if err != nil {
155
+ log .Printf ("! %s %v\n " , result .id , err )
156
+ }
157
+ if err := h .Sender .Send (body , result .statusCode , w ); err != nil {
158
+ log .Printf ("! %s %v\n " , result .id , err )
159
+ }
149
160
}
150
161
mutex .Lock ()
151
162
delete (results , task .id )
@@ -231,31 +242,6 @@ func responseHandler(w http.ResponseWriter, r *http.Request) {
231
242
return
232
243
}
233
244
234
- func (s * Specification ) mapEvent (h http.Handler ) http.Handler {
235
- var mapper events.Mapper
236
-
237
- switch s .ResponseWrapper {
238
- case "API_GATEWAY" :
239
- mapper = apigateway .NewMapper ()
240
- case "CLOUDEVENTS" :
241
- mapper = cloudevents .NewMapper ()
242
- if err := envconfig .Process ("CE" , mapper ); err != nil {
243
- log .Fatalf ("Cannot process CloudEvents wrapper env variables: %v" , err )
244
- }
245
- default :
246
- mapper = passthrough .NewMapper ()
247
- }
248
-
249
- return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
250
- rw := responseWrapper {
251
- ResponseWriter : w ,
252
- }
253
- mapper .Request (r )
254
- h .ServeHTTP (& rw , r )
255
- mapper .Response (w , rw .StatusCode , rw .Body )
256
- })
257
- }
258
-
259
245
func ping (w http.ResponseWriter , r * http.Request ) {
260
246
w .WriteHeader (http .StatusOK )
261
247
w .Write ([]byte ("pong" ))
@@ -282,28 +268,46 @@ func api() error {
282
268
}
283
269
284
270
func main () {
271
+ // parse env
285
272
var spec Specification
286
273
if err := envconfig .Process ("" , & spec ); err != nil {
287
274
log .Fatalf ("Cannot process env variables: %v" , err )
288
275
}
289
276
log .Printf ("%+v\n " , spec )
290
277
291
278
log .Println ("Setting up runtime env" )
292
- if err := spec . setupEnv (); err != nil {
279
+ if err := setupEnv (spec . InternalAPIport ); err != nil {
293
280
log .Fatalf ("Cannot setup runime env: %v" , err )
294
281
}
295
282
283
+ // create converter
284
+ conv , err := converter .New (spec .ResponseFormat )
285
+ if err != nil {
286
+ log .Fatalf ("Cannot create converter: %v" , err )
287
+ }
288
+
289
+ // setup sender
290
+ handler := Handler {
291
+ Sender : sender .New (spec .Sink , conv .ContentType ()),
292
+ Converter : conv ,
293
+ requestSizeLimit : spec .RequestSizeLimit ,
294
+ functionTTL : spec .FunctionTTL ,
295
+ }
296
+
297
+ // setup channels
296
298
tasks = make (chan message , 100 )
297
299
results = make (map [string ]chan message )
298
300
defer close (tasks )
299
301
302
+ // start Lambda API
300
303
log .Println ("Starting API" )
301
304
go func () {
302
305
if err := api (); err != nil {
303
306
log .Fatalf ("Runtime internal API error: %v" , err )
304
307
}
305
308
}()
306
309
310
+ // start invokers
307
311
for i := 0 ; i < spec .NumberOfinvokers ; i ++ {
308
312
log .Println ("Starting bootstrap" , i + 1 )
309
313
go func (i int ) {
@@ -317,11 +321,12 @@ func main() {
317
321
}(i )
318
322
}
319
323
324
+ // start external API
320
325
taskRouter := http .NewServeMux ()
321
- taskHandler := http .HandlerFunc (spec .newTask )
322
- taskRouter .Handle ("/" , spec . mapEvent ( taskHandler ) )
326
+ taskHandler := http .HandlerFunc (handler .newTask )
327
+ taskRouter .Handle ("/" , taskHandler )
323
328
log .Println ("Listening..." )
324
- err : = http .ListenAndServe (":" + spec .ExternalAPIport , taskRouter )
329
+ err = http .ListenAndServe (":" + spec .ExternalAPIport , taskRouter )
325
330
if err != nil && err != http .ErrServerClosed {
326
331
log .Fatalf ("Runtime external API error: %v" , err )
327
332
}
0 commit comments