Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit 6bcfc9e

Browse files
authored
Merge pull request #32 from triggermesh/request-conversion
Request conversion
2 parents 356748f + a0482fc commit 6bcfc9e

File tree

5 files changed

+125
-45
lines changed

5 files changed

+125
-45
lines changed

main.go

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -84,24 +84,10 @@ type message struct {
8484
id string
8585
deadline int64
8686
data []byte
87+
context map[string]string
8788
statusCode int
8889
}
8990

90-
type responseWrapper struct {
91-
http.ResponseWriter
92-
StatusCode int
93-
Body []byte
94-
}
95-
96-
func (rw *responseWrapper) Write(data []byte) (int, error) {
97-
rw.Body = data
98-
return len(data), nil
99-
}
100-
101-
func (rw *responseWrapper) WriteHeader(statusCode int) {
102-
rw.StatusCode = statusCode
103-
}
104-
10591
func setupEnv(internalAPIport string) error {
10692
environment["_HANDLER"], _ = os.LookupEnv("_HANDLER")
10793
environment["LAMBDA_TASK_ROOT"], _ = os.LookupEnv("LAMBDA_TASK_ROOT")
@@ -115,21 +101,38 @@ func setupEnv(internalAPIport string) error {
115101
return nil
116102
}
117103

118-
func (h *Handler) newTask(w http.ResponseWriter, r *http.Request) {
104+
func (h *Handler) serve(w http.ResponseWriter, r *http.Request) {
119105
requestSizeLimitInBytes := h.requestSizeLimit * 1e+6
120-
functionTTLInNanoSeconds := h.functionTTL * 1e+9
121106
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, requestSizeLimitInBytes))
122107
if err != nil {
123-
http.Error(w, err.Error(), http.StatusInternalServerError)
108+
http.Error(w, err.Error(), http.StatusRequestEntityTooLarge)
124109
return
125110
}
126111
defer r.Body.Close()
127112

113+
req, context, err := h.Converter.Request(body, r.Header)
114+
if err != nil {
115+
http.Error(w, err.Error(), http.StatusInternalServerError)
116+
return
117+
}
118+
119+
result := enqueue(req, context, h.functionTTL*1e+9)
120+
result.data, err = h.Converter.Response(result.data)
121+
if err != nil {
122+
result.data = []byte(fmt.Sprintf("Response conversion error: %v", err))
123+
}
124+
if err := h.Sender.Send(result.data, result.statusCode, w); err != nil {
125+
log.Printf("! %s %s %v\n", result.id, result.data, err)
126+
}
127+
}
128+
129+
func enqueue(request []byte, context map[string]string, ttl int64) message {
128130
now := time.Now().UnixNano()
129131
task := message{
130132
id: fmt.Sprintf("%d", now),
131-
deadline: now + functionTTLInNanoSeconds,
132-
data: body,
133+
deadline: now + ttl,
134+
data: request,
135+
context: context,
133136
}
134137
log.Printf("<- %s %s\n", task.id, task.data)
135138

@@ -141,27 +144,22 @@ func (h *Handler) newTask(w http.ResponseWriter, r *http.Request) {
141144

142145
tasks <- task
143146

147+
var resp message
144148
select {
145-
case <-time.After(time.Duration(functionTTLInNanoSeconds)):
146-
log.Printf("-> ! %s Deadline is reached\n", task.id)
147-
resp := []byte(fmt.Sprintf("Deadline is reached, data %s", task.data))
148-
if err := h.Sender.Send(resp, http.StatusGone, w); err != nil {
149-
log.Printf("! %s %v\n", task.id, err)
149+
case <-time.After(time.Duration(ttl)):
150+
resp = message{
151+
id: task.id,
152+
data: []byte(fmt.Sprintf("Deadline is reached, data %s", task.data)),
153+
statusCode: http.StatusGone,
150154
}
151155
case result := <-resultsChannel:
152-
log.Printf("-> %s %d %s\n", result.id, result.statusCode, result.data)
153-
body, err := h.Converter.Convert(result.data)
154-
if err != nil {
155-
log.Printf("! %s %v\n", result.id, err)
156-
}
157-
if err := h.Sender.Send(body, result.statusCode, w); err != nil {
158-
log.Printf("! %s %v\n", result.id, err)
159-
}
156+
resp = result
160157
}
161158
mutex.Lock()
162159
delete(results, task.id)
163160
mutex.Unlock()
164-
return
161+
log.Printf("-> %s %d %s\n", resp.id, resp.statusCode, resp.data)
162+
return resp
165163
}
166164

167165
func getTask(w http.ResponseWriter, r *http.Request) {
@@ -172,6 +170,9 @@ func getTask(w http.ResponseWriter, r *http.Request) {
172170
w.Header().Set("Lambda-Runtime-Deadline-Ms", strconv.Itoa(int(task.deadline)))
173171
w.Header().Set("Lambda-Runtime-Invoked-Function-Arn", "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime")
174172
w.Header().Set("Lambda-Runtime-Trace-Id", "0")
173+
for k, v := range task.context {
174+
w.Header().Set(k, v)
175+
}
175176

176177
w.WriteHeader(http.StatusOK)
177178
w.Write(task.data)
@@ -323,8 +324,7 @@ func main() {
323324

324325
// start external API
325326
taskRouter := http.NewServeMux()
326-
taskHandler := http.HandlerFunc(handler.newTask)
327-
taskRouter.Handle("/", taskHandler)
327+
taskRouter.Handle("/", http.HandlerFunc(handler.serve))
328328
log.Println("Listening...")
329329
err = http.ListenAndServe(":"+spec.ExternalAPIport, taskRouter)
330330
if err != nil && err != http.ErrServerClosed {

main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestNewTask(t *testing.T) {
6363
defer close(tasks)
6464

6565
recorder := httptest.NewRecorder()
66-
h := http.HandlerFunc(handler.newTask)
66+
h := http.HandlerFunc(handler.serve)
6767

6868
req, err := http.NewRequest("POST", "/", bytes.NewBuffer(payload))
6969
if err != nil {

pkg/converter/cloudevents/cloudevents.go

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,29 @@ limitations under the License.
1717
package cloudevents
1818

1919
import (
20+
"bytes"
2021
"encoding/json"
2122
"fmt"
23+
"net/http"
24+
"strings"
2225
"time"
2326

2427
"github.com/google/uuid"
2528
"github.com/kelseyhightower/envconfig"
2629
)
2730

31+
const contentType = "application/cloudevents+json"
32+
2833
type ceBody struct {
2934
ID string `json:"id"`
3035
Type string `json:"type"`
3136
Time string `json:"time"`
3237
Source string `json:"source"`
3338
Specversion string `json:"specversion"`
39+
Contenttype string `json:"datacontenttype"`
3440
Data interface{} `json:"data"`
3541
}
3642

37-
const contentType = "application/cloudevents+json"
38-
3943
// CloudEvent is a data structure required to map KLR responses to cloudevents
4044
type CloudEvent struct {
4145
EventType string `envconfig:"type" default:"ce.klr.triggermesh.io"`
@@ -51,7 +55,7 @@ func New() (*CloudEvent, error) {
5155
return &ce, nil
5256
}
5357

54-
func (ce *CloudEvent) Convert(data []byte) ([]byte, error) {
58+
func (ce *CloudEvent) Response(data []byte) ([]byte, error) {
5559
// If response format is set to CloudEvents
5660
// and CE_TYPE is empty,
5761
// then reply with the empty response
@@ -60,11 +64,18 @@ func (ce *CloudEvent) Convert(data []byte) ([]byte, error) {
6064
}
6165

6266
var body interface{}
63-
body = string(data)
67+
contentType := "text/plain"
6468

65-
// try to decode function's response into JSON
66-
if json.Valid(data) {
69+
switch {
70+
case json.Valid(data) &&
71+
(bytes.TrimSpace(data)[0] == '{' ||
72+
bytes.TrimSpace(data)[0] == '['):
73+
contentType = "application/json"
6774
body = json.RawMessage(data)
75+
default:
76+
data = bytes.TrimSpace(data)
77+
data = bytes.Trim(data, "\"")
78+
body = string(data)
6879
}
6980

7081
b := ceBody{
@@ -73,11 +84,71 @@ func (ce *CloudEvent) Convert(data []byte) ([]byte, error) {
7384
Time: time.Now().Format(time.RFC3339),
7485
Source: ce.Source,
7586
Specversion: "1.0",
87+
Contenttype: contentType,
7688
Data: body,
7789
}
7890
return json.Marshal(b)
7991
}
8092

93+
func (ce *CloudEvent) Request(request []byte, headers http.Header) ([]byte, map[string]string, error) {
94+
var context map[string]string
95+
var body []byte
96+
var err error
97+
98+
switch headers.Get("Content-Type") {
99+
case "application/cloudevents+json":
100+
if body, context, err = parseStructuredCE(request); err != nil {
101+
return nil, nil, fmt.Errorf("structured CloudEvent parse error: %w", err)
102+
}
103+
case "application/json":
104+
body = request
105+
context = parseBinaryCE(headers)
106+
default:
107+
return request, nil, nil
108+
}
109+
110+
ceContext, err := json.Marshal(context)
111+
if err != nil {
112+
return nil, nil, fmt.Errorf("cannot encode request context: %w", err)
113+
}
114+
115+
runtimeContext := map[string]string{
116+
"Lambda-Runtime-Cloudevents-Context": string(ceContext),
117+
}
118+
119+
return body, runtimeContext, nil
120+
}
121+
122+
func parseStructuredCE(body []byte) ([]byte, map[string]string, error) {
123+
var event map[string]interface{}
124+
if err := json.Unmarshal(body, &event); err != nil {
125+
return nil, nil, fmt.Errorf("cannot unmarshal body: %w", err)
126+
}
127+
128+
data, err := json.Marshal(event["data"])
129+
if err != nil {
130+
return nil, nil, fmt.Errorf("cannot marshal body: %w", err)
131+
}
132+
133+
delete(event, "data")
134+
headers := make(map[string]string, len(event))
135+
for k, v := range event {
136+
headers[k] = fmt.Sprintf("%v", v)
137+
}
138+
139+
return data, headers, nil
140+
}
141+
142+
func parseBinaryCE(headers http.Header) map[string]string {
143+
h := make(map[string]string)
144+
for k, v := range headers {
145+
if strings.HasPrefix(k, "Ce-") {
146+
h[strings.ToLower(k[3:])] = strings.Join(v, ",")
147+
}
148+
}
149+
return h
150+
}
151+
81152
func (ce *CloudEvent) ContentType() string {
82153
return contentType
83154
}

pkg/converter/converter.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@ limitations under the License.
1717
package converter
1818

1919
import (
20+
"net/http"
21+
2022
"github.com/triggermesh/aws-custom-runtime/pkg/converter/cloudevents"
2123
"github.com/triggermesh/aws-custom-runtime/pkg/converter/plain"
2224
)
2325

2426
type Converter interface {
25-
Convert([]byte) ([]byte, error)
27+
Response([]byte) ([]byte, error)
28+
Request([]byte, http.Header) ([]byte, map[string]string, error)
2629
ContentType() string
2730
}
2831

pkg/converter/plain/cloudevents.go renamed to pkg/converter/plain/plain.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ limitations under the License.
1616

1717
package plain
1818

19+
import "net/http"
20+
1921
type Plain struct{}
2022

2123
const contentType = "plain/text"
@@ -24,10 +26,14 @@ func New() (*Plain, error) {
2426
return &Plain{}, nil
2527
}
2628

27-
func (p *Plain) Convert(data []byte) ([]byte, error) {
29+
func (p *Plain) Response(data []byte) ([]byte, error) {
2830
return data, nil
2931
}
3032

33+
func (p *Plain) Request(request []byte, headers http.Header) ([]byte, map[string]string, error) {
34+
return request, nil, nil
35+
}
36+
3137
func (p *Plain) ContentType() string {
3238
return contentType
3339
}

0 commit comments

Comments
 (0)