@@ -29,12 +29,6 @@ import (
29
29
"github.com/triggermesh/aws-custom-runtime/pkg/events/apiGateway"
30
30
)
31
31
32
- const (
33
- numberOfinvokers = 8 // Number of bootstrap processes
34
- requestSizeLimit = 1e+7 // Request bosy size limit, 10Mb
35
- functionTTL = 3e+9 // Funtions deadline, 3 seconds
36
- )
37
-
38
32
type message struct {
39
33
id string
40
34
deadline int64
@@ -47,6 +41,10 @@ type responseWrapper struct {
47
41
}
48
42
49
43
var (
44
+ numberOfinvokers = 8 // Number of bootstrap processes
45
+ requestSizeLimit int64 = 5 // Request body size limit, Mb
46
+ functionTTL int64 = 10 // Funtions deadline, seconds
47
+
50
48
tasks chan message
51
49
results map [string ]chan message
52
50
@@ -85,7 +83,9 @@ func setupEnv() error {
85
83
}
86
84
87
85
func newTask (w http.ResponseWriter , r * http.Request ) {
88
- body , err := ioutil .ReadAll (http .MaxBytesReader (w , r .Body , requestSizeLimit ))
86
+ requestSizeLimitInBytes := requestSizeLimit * 1e+6
87
+ functionTTLInNanoSeconds := functionTTL * 1e+9
88
+ body , err := ioutil .ReadAll (http .MaxBytesReader (w , r .Body , requestSizeLimitInBytes ))
89
89
if err != nil {
90
90
http .Error (w , err .Error (), http .StatusInternalServerError )
91
91
return
@@ -95,7 +95,7 @@ func newTask(w http.ResponseWriter, r *http.Request) {
95
95
now := time .Now ().UnixNano ()
96
96
task := message {
97
97
id : fmt .Sprintf ("%d" , now ),
98
- deadline : now + functionTTL ,
98
+ deadline : now + functionTTLInNanoSeconds ,
99
99
data : body ,
100
100
}
101
101
fmt .Printf ("<- %s %s\n " , task .id , task .data )
@@ -109,7 +109,7 @@ func newTask(w http.ResponseWriter, r *http.Request) {
109
109
tasks <- task
110
110
111
111
select {
112
- case <- time .After (time .Duration (functionTTL )):
112
+ case <- time .After (time .Duration (functionTTLInNanoSeconds )):
113
113
fmt .Printf ("-> ! %s Deadline is reached\n " , task .id )
114
114
w .WriteHeader (http .StatusGone )
115
115
w .Write ([]byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data )))
@@ -214,6 +214,30 @@ func mapEvent(h http.Handler) http.Handler {
214
214
})
215
215
}
216
216
217
+ func setLimits () {
218
+ if v , ok := os .LookupEnv ("INVOKER_COUNT" ); ok {
219
+ if vv , err := strconv .Atoi (v ); err != nil {
220
+ fmt .Printf ("can't set invokers limit, using default value %d\n " , numberOfinvokers )
221
+ } else {
222
+ numberOfinvokers = vv
223
+ }
224
+ }
225
+ if v , ok := os .LookupEnv ("REQUEST_SIZE_LIMIT" ); ok {
226
+ if vv , err := strconv .Atoi (v ); err != nil {
227
+ fmt .Printf ("can't set request size limit, using default value %d\n " , requestSizeLimit )
228
+ } else {
229
+ requestSizeLimit = int64 (vv )
230
+ }
231
+ }
232
+ if v , ok := os .LookupEnv ("FUNCTION_TTL" ); ok {
233
+ if vv , err := strconv .Atoi (v ); err != nil {
234
+ fmt .Printf ("can't set function ttl, using default value %d\n " , functionTTL )
235
+ } else {
236
+ functionTTL = int64 (vv )
237
+ }
238
+ }
239
+ }
240
+
217
241
func api () error {
218
242
apiRouter := http .NewServeMux ()
219
243
apiRouter .HandleFunc (awsEndpoint + "/init/error" , initError )
@@ -227,6 +251,9 @@ func main() {
227
251
results = make (map [string ]chan message )
228
252
defer close (tasks )
229
253
254
+ fmt .Println ("Setting limits" )
255
+ setLimits ()
256
+
230
257
fmt .Println ("Setup env" )
231
258
if err := setupEnv (); err != nil {
232
259
log .Fatalln (err )
@@ -237,8 +264,8 @@ func main() {
237
264
log .Fatalln (api ())
238
265
}()
239
266
240
- fmt .Println ("Starting invokers" )
241
267
for i := 0 ; i < numberOfinvokers ; i ++ {
268
+ fmt .Println ("Starting bootstrap" , i + 1 )
242
269
go func () {
243
270
if err := exec .Command ("sh" , "-c" , environment ["LAMBDA_TASK_ROOT" ]+ "/bootstrap" ).Run (); err != nil {
244
271
log .Fatalln (err )
0 commit comments