Skip to content

Commit

Permalink
Add namespace in function name for metrics
Browse files Browse the repository at this point in the history
This commit adds namespace in function names while logging metrics to
prometheus, irrespective of the function is invoked with namespace suffix
or not.

This is also required to add multiple namespace support to faas-idler

openfaasltd/faas-idler#37 which is part
of openfaas/faas-netes#511

Signed-off-by: Vivek Singh <[email protected]>
  • Loading branch information
viveksyngh authored and alexellis committed Nov 2, 2020
1 parent be80904 commit f7b02b4
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 43 deletions.
13 changes: 9 additions & 4 deletions gateway/handlers/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,21 @@ func urlToLabel(path string) string {
// PrometheusFunctionNotifier records metrics to Prometheus
type PrometheusFunctionNotifier struct {
Metrics *metrics.MetricOptions
//FunctionNamespace default namespace of the function
FunctionNamespace string
}

// Notify records metrics in Prometheus
func (p PrometheusFunctionNotifier) Notify(method string, URL string, originalURL string, statusCode int, event string, duration time.Duration) {
if event == "completed" {
serviceName := getServiceName(originalURL)
if len(p.FunctionNamespace) > 0 {
if index := strings.Index(serviceName, "."); index == -1 {
serviceName = fmt.Sprintf("%s.%s", serviceName, p.FunctionNamespace)
}
}

if event == "completed" {
seconds := duration.Seconds()
serviceName := getServiceName(originalURL)

p.Metrics.GatewayFunctionsHistogram.
WithLabelValues(serviceName).
Observe(seconds)
Expand All @@ -62,7 +68,6 @@ func (p PrometheusFunctionNotifier) Notify(method string, URL string, originalUR
With(prometheus.Labels{"function_name": serviceName, "code": code}).
Inc()
} else if event == "started" {
serviceName := getServiceName(originalURL)
p.Metrics.GatewayFunctionInvocationStarted.WithLabelValues(serviceName).Inc()
}

Expand Down
3 changes: 2 additions & 1 deletion gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func main() {
loggingNotifier := handlers.LoggingNotifier{}

prometheusNotifier := handlers.PrometheusFunctionNotifier{
Metrics: &metricsOptions,
Metrics: &metricsOptions,
FunctionNamespace: config.Namespace,
}

prometheusServiceNotifier := handlers.PrometheusServiceNotifier{
Expand Down
146 changes: 108 additions & 38 deletions gateway/metrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package metrics

import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"time"

"log"
Expand Down Expand Up @@ -56,8 +58,14 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {

e.metricOptions.ServiceReplicasGauge.Reset()
for _, service := range e.services {
var serviceName string
if len(service.Namespace) > 0 {
serviceName = fmt.Sprintf("%s.%s", service.Name, service.Namespace)
} else {
serviceName = service.Name
}
e.metricOptions.ServiceReplicasGauge.
WithLabelValues(service.Name).
WithLabelValues(serviceName).
Set(float64(service.Replicas))
}

Expand All @@ -72,9 +80,46 @@ func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions Metri
ticker := time.NewTicker(interval)
quit := make(chan struct{})

timeout := 3 * time.Second
go func() {
for {
select {
case <-ticker.C:

namespaces, err := e.getNamespaces(endpointURL)
if err != nil {
log.Println(err)
}

if len(namespaces) == 0 {
emptyNamespace := ""
services, err := e.getFunctions(endpointURL, emptyNamespace)
if err != nil {
log.Println(err)
continue
}
e.services = services
} else {
for _, namespace := range namespaces {
services, err := e.getFunctions(endpointURL, namespace)
if err != nil {
log.Println(err)
continue
}
e.services = append(e.services, services...)
}
}

break
case <-quit:
return
}
}
}()
}

proxyClient := http.Client{
func (e *Exporter) getHTTPClient(timeout time.Duration) http.Client {

return http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Expand All @@ -87,45 +132,70 @@ func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions Metri
ExpectContinueTimeout: 1500 * time.Millisecond,
},
}
}

go func() {
for {
select {
case <-ticker.C:
func (e *Exporter) getFunctions(endpointURL url.URL, namespace string) ([]types.FunctionStatus, error) {
timeout := 3 * time.Second
proxyClient := e.getHTTPClient(timeout)

get, err := http.NewRequest(http.MethodGet, endpointURL.String()+"system/functions", nil)
if err != nil {
log.Println(err)
return
}
endpointURL.Path = path.Join(endpointURL.Path, "/system/functions")
if len(namespace) > 0 {
q := endpointURL.Query()
q.Set("namespace", namespace)
endpointURL.RawQuery = q.Encode()
}

if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password)
}
get, _ := http.NewRequest(http.MethodGet, endpointURL.String(), nil)
if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password)
}

services := []types.FunctionStatus{}
res, err := proxyClient.Do(get)
if err != nil {
log.Println(err)
continue
}
bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
log.Println(err)
continue
}
unmarshalErr := json.Unmarshal(bytesOut, &services)
if unmarshalErr != nil {
log.Println(err)
continue
}
services := []types.FunctionStatus{}
res, err := proxyClient.Do(get)
if err != nil {
return services, err
}

e.services = services
bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
return services, readErr
}

break
case <-quit:
return
}
}
}()
unmarshalErr := json.Unmarshal(bytesOut, &services)
if unmarshalErr != nil {
return services, unmarshalErr
}
return services, nil
}

func (e *Exporter) getNamespaces(endpointURL url.URL) ([]string, error) {
namespaces := []string{}

get, _ := http.NewRequest(http.MethodGet, endpointURL.String()+"system/namespaces", nil)
if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password)
}

timeout := 3 * time.Second
proxyClient := e.getHTTPClient(timeout)

res, err := proxyClient.Do(get)
if err != nil {
return namespaces, err
}

if res.StatusCode == http.StatusNotFound {
return namespaces, nil
}

bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
return namespaces, readErr
}

unmarshalErr := json.Unmarshal(bytesOut, &namespaces)
if unmarshalErr != nil {
return namespaces, unmarshalErr
}
return namespaces, nil
}

0 comments on commit f7b02b4

Please sign in to comment.