@@ -31,6 +31,7 @@ import (
3131 "github.com/kelseyhightower/envconfig"
3232
3333 "github.com/triggermesh/aws-custom-runtime/pkg/converter"
34+ "github.com/triggermesh/aws-custom-runtime/pkg/metrics"
3435 "github.com/triggermesh/aws-custom-runtime/pkg/sender"
3536)
3637
@@ -73,8 +74,9 @@ type Specification struct {
7374}
7475
7576type Handler struct {
76- Sender * sender.Sender
77- Converter converter.Converter
77+ sender * sender.Sender
78+ converter converter.Converter
79+ reporter * metrics.EventProcessingStatsReporter
7880
7981 requestSizeLimit int64
8082 functionTTL int64
@@ -102,28 +104,41 @@ func setupEnv(internalAPIport string) error {
102104}
103105
104106func (h * Handler ) serve (w http.ResponseWriter , r * http.Request ) {
107+ eventTypeTag , eventSrcTag := metrics .DefaultRequestType , metrics .DefaultRequestSource
108+ start := time .Now ()
109+ defer func () {
110+ h .reporter .ReportProcessingLatency (time .Since (start ), eventTypeTag , eventSrcTag )
111+ }()
112+
105113 requestSizeLimitInBytes := h .requestSizeLimit * 1e+6
106114 body , err := ioutil .ReadAll (http .MaxBytesReader (w , r .Body , requestSizeLimitInBytes ))
107115 if err != nil {
116+ h .reporter .ReportProcessingError (false , eventTypeTag , eventSrcTag )
108117 http .Error (w , err .Error (), http .StatusRequestEntityTooLarge )
109118 return
110119 }
111120 defer r .Body .Close ()
112121
113- req , context , err := h .Converter .Request (body , r .Header )
122+ req , context , err := h .converter .Request (body , r .Header )
114123 if err != nil {
124+ h .reporter .ReportProcessingError (false , eventTypeTag , eventSrcTag )
115125 http .Error (w , err .Error (), http .StatusInternalServerError )
116126 return
117127 }
118128
129+ eventTypeTag , eventSrcTag = metrics .CETagsFromContext (context )
130+
119131 result := enqueue (req , context , h .functionTTL * 1e+9 )
120- result .data , err = h .Converter .Response (result .data )
132+ result .data , err = h .converter .Response (result .data )
121133 if err != nil {
122134 result .data = []byte (fmt .Sprintf ("Response conversion error: %v" , err ))
123135 }
124- if err := h .Sender .Send (result .data , result .statusCode , w ); err != nil {
136+ if err := h .sender .Send (result .data , result .statusCode , w ); err != nil {
137+ h .reporter .ReportProcessingError (false , eventTypeTag , eventSrcTag )
125138 log .Printf ("! %s %s %v\n " , result .id , result .data , err )
139+ return
126140 }
141+ h .reporter .ReportProcessingSuccess (eventTypeTag , eventSrcTag )
127142}
128143
129144func enqueue (request []byte , context map [string ]string , ttl int64 ) message {
@@ -134,7 +149,7 @@ func enqueue(request []byte, context map[string]string, ttl int64) message {
134149 data : request ,
135150 context : context ,
136151 }
137- log .Printf ("<- %s %s \n " , task .id , task . data )
152+ log .Printf ("<- %s\n " , task .id )
138153
139154 resultsChannel := make (chan message )
140155 mutex .Lock ()
@@ -158,7 +173,7 @@ func enqueue(request []byte, context map[string]string, ttl int64) message {
158173 mutex .Lock ()
159174 delete (results , task .id )
160175 mutex .Unlock ()
161- log .Printf ("-> %s %d %s \n " , resp .id , resp .statusCode , resp . data )
176+ log .Printf ("-> %s %d\n " , resp .id , resp .statusCode )
162177 return resp
163178}
164179
@@ -283,10 +298,17 @@ func main() {
283298 log .Fatalf ("Cannot create converter: %v" , err )
284299 }
285300
301+ // start metrics reporter
302+ mr , err := metrics .StatsExporter ()
303+ if err != nil {
304+ log .Fatalf ("Cannot start stats exporter: %v" , err )
305+ }
306+
286307 // setup sender
287308 handler := Handler {
288- Sender : sender .New (spec .Sink , conv .ContentType ()),
289- Converter : conv ,
309+ sender : sender .New (spec .Sink , conv .ContentType ()),
310+ converter : conv ,
311+ reporter : mr ,
290312 requestSizeLimit : spec .RequestSizeLimit ,
291313 functionTTL : spec .FunctionTTL ,
292314 }
0 commit comments