@@ -6,6 +6,7 @@ local cjson = require "cjson.safe"
6
6
local pl_stringx = require (" pl.stringx" )
7
7
local date = require (" date" )
8
8
local get_request_id = require (" kong.tracing.request_id" ).get
9
+ local AWS_Stream = require (" kong.plugins.aws-lambda.aws_stream" )
9
10
10
11
local EMPTY = {}
11
12
@@ -350,6 +351,115 @@ local function remove_array_mt_for_empty_table(tbl)
350
351
return tbl
351
352
end
352
353
354
+ local HttpIntegrationResponseProcessor = {}
355
+ function HttpIntegrationResponseProcessor :new ()
356
+ local instance = {
357
+ first_chunk_body = " " ,
358
+ null_count = 0 ,
359
+ prelude = " "
360
+ }
361
+ setmetatable (instance , {__index = HttpIntegrationResponseProcessor })
362
+ return instance
363
+ end
364
+
365
+ -- Update `ngx.status` and the parameter `headers` from the prelude string.
366
+ -- Return error if any.
367
+ function HttpIntegrationResponseProcessor :update_status_and_headers_from_prelude (headers )
368
+ local p , err = cjson .decode (self .prelude )
369
+ if err or not p then
370
+ return error (err )
371
+ end
372
+ if p [" statusCode" ] then
373
+ ngx .status = p [" statusCode" ]
374
+ -- headers and cookies will only be applied if the statusCode is provided
375
+ if p [" headers" ] then
376
+ for k , v in pairs (p [" headers" ]) do
377
+ headers [k ] = v
378
+ end
379
+ end
380
+ if p [" cookies" ] then
381
+ headers [" Set-Cookie" ] = p [" cookies" ]
382
+ end
383
+ end
384
+ end
385
+
386
+ -- Process one message.
387
+ -- Return error if any.
388
+ function HttpIntegrationResponseProcessor :process_message (msg , headers )
389
+ for _ , header in ipairs (msg .headers ) do
390
+ if self .null_count == 8 then
391
+ break
392
+ end
393
+ if header .key == " :event-type" and header .value == " PayloadChunk" then
394
+ -- print(msg.body)
395
+ for i = 1 , # msg .body do
396
+ local c = msg .body :byte (i )
397
+ if c == 0 then
398
+ self .null_count = self .null_count + 1
399
+ if self .null_count == 8 then
400
+ local err = self :update_status_and_headers_from_prelude (headers )
401
+ if err then
402
+ return error (err )
403
+ end
404
+
405
+ self .first_chunk_body = msg .body :sub (i + 1 )
406
+ break
407
+ end
408
+ else
409
+ self .prelude = self .prelude .. string.char (c )
410
+ end
411
+ end
412
+ end
413
+ end
414
+ end
415
+
416
+
417
+ function HttpIntegrationResponseProcessor :process (reader , buffer_size , headers )
418
+ while self .null_count < 8 do
419
+ local chunk , err = reader (buffer_size )
420
+ if err then
421
+ return nil , error (err )
422
+ end
423
+
424
+ if chunk then
425
+ -- the chunk is in `application/vnd.amazon.eventstream` formatted
426
+ -- which is a binary format, we need to parse it
427
+ local parser , err = AWS_Stream :new (chunk , false )
428
+ if err or parser == nil then
429
+ -- print("ERROR: ", err)
430
+ return nil , error (err )
431
+ end
432
+
433
+ while true do
434
+ if self .null_count == 8 then
435
+ break -- will jump to the end of the function
436
+ end
437
+
438
+ local msg = parser :next_message ()
439
+
440
+ if not msg then
441
+ break -- read again
442
+ end
443
+
444
+ -- print(require("pl.pretty").write(msg))
445
+ local err = self :process_message (msg , headers )
446
+ if err then
447
+ return nil , error (err )
448
+ end
449
+ end
450
+ end
451
+ end
452
+
453
+ return self .first_chunk_body , nil
454
+ end
455
+
456
+ -- Process the http integration response.
457
+ -- This will update `ngx.status` and the parameter `headers`.
458
+ -- Return the first chunk body and error if any.
459
+ local function process_http_integration_response (reader , buffer_size , headers )
460
+ local processor = HttpIntegrationResponseProcessor :new ()
461
+ return processor :process (reader , buffer_size , headers )
462
+ end
353
463
354
464
return {
355
465
aws_serializer = aws_serializer ,
@@ -358,4 +468,5 @@ return {
358
468
build_request_payload = build_request_payload ,
359
469
extract_proxy_response = extract_proxy_response ,
360
470
remove_array_mt_for_empty_table = remove_array_mt_for_empty_table ,
471
+ process_http_integration_response = process_http_integration_response ,
361
472
}
0 commit comments