diff --git a/gateway/build.sh b/gateway/build.sh index 4f0712321..83924acdc 100755 --- a/gateway/build.sh +++ b/gateway/build.sh @@ -42,4 +42,4 @@ docker build --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_p --build-arg VERSION="${VERSION:-dev}" \ --build-arg GOARM="${GOARM}" \ --build-arg ARCH="${arch}" \ - -t $NS/gateway:$eTAG . -f $dockerfile --no-cache + -t $NS/gateway:$eTAG . -f $dockerfile diff --git a/gateway/handlers/alerthandler.go b/gateway/handlers/alerthandler.go index 11d081160..98cff7433 100644 --- a/gateway/handlers/alerthandler.go +++ b/gateway/handlers/alerthandler.go @@ -16,7 +16,7 @@ import ( ) // MakeAlertHandler handles alerts from Prometheus Alertmanager -func MakeAlertHandler(service scaling.ServiceQuery) http.HandlerFunc { +func MakeAlertHandler(service scaling.ServiceQuery, defaultNamespace string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { log.Println("Alert received.") @@ -42,7 +42,7 @@ func MakeAlertHandler(service scaling.ServiceQuery) http.HandlerFunc { return } - errors := handleAlerts(&req, service) + errors := handleAlerts(&req, service, defaultNamespace) if len(errors) > 0 { log.Println(errors) var errorOutput string @@ -58,10 +58,10 @@ func MakeAlertHandler(service scaling.ServiceQuery) http.HandlerFunc { } } -func handleAlerts(req *requests.PrometheusAlert, service scaling.ServiceQuery) []error { +func handleAlerts(req *requests.PrometheusAlert, service scaling.ServiceQuery, defaultNamespace string) []error { var errors []error for _, alert := range req.Alerts { - if err := scaleService(alert, service); err != nil { + if err := scaleService(alert, service, defaultNamespace); err != nil { log.Println(err) errors = append(errors, err) } @@ -70,12 +70,13 @@ func handleAlerts(req *requests.PrometheusAlert, service scaling.ServiceQuery) [ return errors } -func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery) error { +func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery, defaultNamespace string) error { var err error - serviceName := alert.Labels.FunctionName + + serviceName, namespace := getNamespace(defaultNamespace, alert.Labels.FunctionName) if len(serviceName) > 0 { - queryResponse, getErr := service.GetReplicas(serviceName) + queryResponse, getErr := service.GetReplicas(serviceName, namespace) if getErr == nil { status := alert.Status @@ -86,7 +87,7 @@ func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQu return nil } - updateErr := service.SetReplicas(serviceName, newReplicas) + updateErr := service.SetReplicas(serviceName, namespace, newReplicas) if updateErr != nil { err = updateErr } diff --git a/gateway/handlers/baseurlresolver_test.go b/gateway/handlers/baseurlresolver_test.go index db32015d5..318cba374 100644 --- a/gateway/handlers/baseurlresolver_test.go +++ b/gateway/handlers/baseurlresolver_test.go @@ -5,8 +5,10 @@ package handlers import ( "fmt" + "log" "net/http" "net/url" + "strings" "testing" ) @@ -27,6 +29,28 @@ func TestSingleHostBaseURLResolver(t *testing.T) { const watchdogPort = 8080 +func TestFunctionAsHostBaseURLResolver_WithNamespaceOverride(t *testing.T) { + + suffix := "openfaas-fn.local.cluster.svc." + namespace := "openfaas-fn" + newNS := "production-fn" + + r := FunctionAsHostBaseURLResolver{FunctionSuffix: suffix, FunctionNamespace: namespace} + + req, _ := http.NewRequest(http.MethodGet, "http://localhost/function/hello."+newNS, nil) + + resolved := r.Resolve(req) + + newSuffix := strings.Replace(suffix, namespace, newNS, -1) + + want := fmt.Sprintf("http://hello.%s:%d", newSuffix, watchdogPort) + log.Println(want) + if resolved != want { + t.Logf("r.Resolve failed, want: %s got: %s", want, resolved) + t.Fail() + } +} + func TestFunctionAsHostBaseURLResolver_WithSuffix(t *testing.T) { suffix := "openfaas-fn.local.cluster.svc." r := FunctionAsHostBaseURLResolver{FunctionSuffix: suffix} @@ -35,7 +59,7 @@ func TestFunctionAsHostBaseURLResolver_WithSuffix(t *testing.T) { resolved := r.Resolve(req) want := fmt.Sprintf("http://hello.%s:%d", suffix, watchdogPort) - + log.Println(want) if resolved != want { t.Logf("r.Resolve failed, want: %s got: %s", want, resolved) t.Fail() diff --git a/gateway/handlers/forwarding_proxy.go b/gateway/handlers/forwarding_proxy.go index b826e1f71..f270f59ec 100644 --- a/gateway/handlers/forwarding_proxy.go +++ b/gateway/handlers/forwarding_proxy.go @@ -179,7 +179,8 @@ func (s SingleHostBaseURLResolver) Resolve(r *http.Request) string { // FunctionAsHostBaseURLResolver resolves URLs using a function from the URL as a host type FunctionAsHostBaseURLResolver struct { - FunctionSuffix string + FunctionSuffix string + FunctionNamespace string } // Resolve the base URL for a request @@ -188,8 +189,13 @@ func (f FunctionAsHostBaseURLResolver) Resolve(r *http.Request) string { const watchdogPort = 8080 var suffix string + if len(f.FunctionSuffix) > 0 { - suffix = "." + f.FunctionSuffix + if index := strings.LastIndex(svcName, "."); index > -1 && len(svcName) > index+1 { + suffix = strings.Replace(f.FunctionSuffix, f.FunctionNamespace, "", -1) + } else { + suffix = "." + f.FunctionSuffix + } } return fmt.Sprintf("http://%s%s:%d", svcName, suffix, watchdogPort) diff --git a/gateway/handlers/forwarding_proxy_test.go b/gateway/handlers/forwarding_proxy_test.go index ed0f15b10..237068d04 100644 --- a/gateway/handlers/forwarding_proxy_test.go +++ b/gateway/handlers/forwarding_proxy_test.go @@ -135,6 +135,11 @@ func Test_getServiceName(t *testing.T) { url: "/function/testFunc", serviceName: "testFunc", }, + { + name: "includes namespace", + url: "/function/test1.fn", + serviceName: "test1.fn", + }, { name: "can handle request with trailing slash", url: "/function/testFunc/", diff --git a/gateway/handlers/function_prefix_trimming_url_path_transformer_test.go b/gateway/handlers/function_prefix_trimming_url_path_transformer_test.go index 9ecc358f9..f344ba99e 100644 --- a/gateway/handlers/function_prefix_trimming_url_path_transformer_test.go +++ b/gateway/handlers/function_prefix_trimming_url_path_transformer_test.go @@ -32,6 +32,30 @@ func Test_Transform_RemovesFunctionPrefixWithSingleParam(t *testing.T) { } } +func Test_Transform_RemovesFunctionPrefixWithDotInName(t *testing.T) { + + req, _ := http.NewRequest(http.MethodGet, "/function/figlet.fn", nil) + transformer := FunctionPrefixTrimmingURLPathTransformer{} + want := "" + got := transformer.Transform(req) + + if want != got { + t.Errorf("want: %s, got: %s", want, got) + } +} + +func Test_Transform_RemovesFunctionPrefixWithDotInNameAndPath(t *testing.T) { + + req, _ := http.NewRequest(http.MethodGet, "/function/figlet.fn/employees", nil) + transformer := FunctionPrefixTrimmingURLPathTransformer{} + want := "/employees" + got := transformer.Transform(req) + + if want != got { + t.Errorf("want: %s, got: %s", want, got) + } +} + func Test_Transform_RemovesFunctionPrefixWithParams(t *testing.T) { req, _ := http.NewRequest(http.MethodGet, "/function/figlet/employees/100", nil) diff --git a/gateway/handlers/namespaces_test.go b/gateway/handlers/namespaces_test.go new file mode 100644 index 000000000..56b3c4dd5 --- /dev/null +++ b/gateway/handlers/namespaces_test.go @@ -0,0 +1,45 @@ +// Copyright (c) Alex Ellis 2017. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +package handlers + +import "testing" + +func Test_getNamespace_Default(t *testing.T) { + root, ns := getNamespace("openfaas-fn", "figlet.openfaas-fn") + wantRoot := "figlet" + wantNs := "openfaas-fn" + + if root != wantRoot { + t.Errorf("function root: want %s, got %s", wantRoot, root) + } + if ns != wantNs { + t.Errorf("function ns: want %s, got %s", wantNs, ns) + } +} + +func Test_getNamespace_Override(t *testing.T) { + root, ns := getNamespace("fn", "figlet.fn") + wantRoot := "figlet" + wantNs := "fn" + + if root != wantRoot { + t.Errorf("function root: want %s, got %s", wantRoot, root) + } + if ns != wantNs { + t.Errorf("function ns: want %s, got %s", wantNs, ns) + } +} + +func Test_getNamespace_Empty(t *testing.T) { + root, ns := getNamespace("", "figlet") + wantRoot := "figlet" + wantNs := "" + + if root != wantRoot { + t.Errorf("function root: want %s, got %s", wantRoot, root) + } + if ns != wantNs { + t.Errorf("function ns: want %s, got %s", wantNs, ns) + } +} diff --git a/gateway/handlers/notifiers.go b/gateway/handlers/notifiers.go index 927cacf75..1154b75bb 100644 --- a/gateway/handlers/notifiers.go +++ b/gateway/handlers/notifiers.go @@ -66,7 +66,7 @@ func getServiceName(urlValue string) string { forward := "/function/" if strings.HasPrefix(urlValue, forward) { // With a path like `/function/xyz/rest/of/path?q=a`, the service - // name we wish to locate is just the `xyz` portion. With a postive + // name we wish to locate is just the `xyz` portion. With a positive // match on the regex below, it will return a three-element slice. // The item at index `0` is the same as `urlValue`, at `1` // will be the service name we need, and at `2` the rest of the path. diff --git a/gateway/handlers/scaling.go b/gateway/handlers/scaling.go index a7d293d86..0484929d1 100644 --- a/gateway/handlers/scaling.go +++ b/gateway/handlers/scaling.go @@ -7,27 +7,36 @@ import ( "fmt" "log" "net/http" + "strings" "github.com/openfaas/faas/gateway/scaling" ) +func getNamespace(defaultNamespace, fullName string) (string, string) { + if index := strings.LastIndex(fullName, "."); index > -1 { + return fullName[:index], fullName[index+1:] + } + return fullName, defaultNamespace +} + // MakeScalingHandler creates handler which can scale a function from // zero to N replica(s). After scaling the next http.HandlerFunc will // be called. If the function is not ready after the configured // amount of attempts / queries then next will not be invoked and a status // will be returned to the client. -func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig) http.HandlerFunc { +func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc { scaler := scaling.NewFunctionScaler(config) return func(w http.ResponseWriter, r *http.Request) { - functionName := getServiceName(r.URL.String()) - res := scaler.Scale(functionName) + functionName, namespace := getNamespace(defaultNamespace, getServiceName(r.URL.String())) + + res := scaler.Scale(functionName, namespace) if !res.Found { - errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error()) - log.Printf("Scaling: %s", errStr) + errStr := fmt.Sprintf("error finding function %s.%s: %s", functionName, namespace, res.Error.Error()) + log.Printf("Scaling: %s\n", errStr) w.WriteHeader(http.StatusNotFound) w.Write([]byte(errStr)) @@ -35,8 +44,8 @@ func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig) htt } if res.Error != nil { - errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error()) - log.Printf("Scaling: %s", errStr) + errStr := fmt.Sprintf("error finding function %s.%s: %s", functionName, namespace, res.Error.Error()) + log.Printf("Scaling: %s\n", errStr) w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(errStr)) @@ -48,6 +57,6 @@ func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig) htt return } - log.Printf("[Scale] function=%s 0=>N timed-out after %f seconds", functionName, res.Duration.Seconds()) + log.Printf("[Scale] function=%s.%s 0=>N timed-out after %f seconds\n", functionName, namespace, res.Duration.Seconds()) } } diff --git a/gateway/plugin/external.go b/gateway/plugin/external.go index bbfd6ac68..3e3340266 100644 --- a/gateway/plugin/external.go +++ b/gateway/plugin/external.go @@ -54,12 +54,13 @@ type ExternalServiceQuery struct { // ScaleServiceRequest request scaling of replica type ScaleServiceRequest struct { - ServiceName string `json:"serviceName"` - Replicas uint64 `json:"replicas"` + ServiceName string `json:"serviceName"` + ServiceNamespace string `json:"serviceNamespace"` + Replicas uint64 `json:"replicas"` } // GetReplicas replica count for function -func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQueryResponse, error) { +func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string) (scaling.ServiceQueryResponse, error) { start := time.Now() var err error @@ -67,7 +68,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu function := types.FunctionStatus{} - urlPath := fmt.Sprintf("%ssystem/function/%s", s.URL.String(), serviceName) + urlPath := fmt.Sprintf("%ssystem/function/%s?namespace=%s", s.URL.String(), serviceName, serviceNamespace) req, _ := http.NewRequest(http.MethodGet, urlPath, nil) @@ -91,8 +92,10 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu if err != nil { log.Println(urlPath, err) } + log.Printf("GetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds()) + } else { - log.Printf("GetReplicas took: %fs", time.Since(start).Seconds()) + log.Printf("GetReplicas [%s.%s] took: %fs, code: %d\n", serviceName, serviceNamespace, time.Since(start).Seconds(), res.StatusCode) return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s", res.StatusCode, serviceName) } } @@ -115,8 +118,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu log.Printf("Bad Scaling Factor: %d, is not in range of [0 - 100]. Will fallback to %d", extractedScalingFactor, scalingFactor) } } - - log.Printf("GetReplicas took: %fs", time.Since(start).Seconds()) + log.Printf("GetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds()) return scaling.ServiceQueryResponse{ Replicas: function.Replicas, @@ -128,12 +130,13 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu } // SetReplicas update the replica count -func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) error { +func (s ExternalServiceQuery) SetReplicas(serviceName, serviceNamespace string, count uint64) error { var err error scaleReq := ScaleServiceRequest{ - ServiceName: serviceName, - Replicas: count, + ServiceName: serviceName, + Replicas: count, + ServiceNamespace: serviceNamespace, } requestBody, err := json.Marshal(scaleReq) @@ -141,7 +144,8 @@ func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) erro return err } - urlPath := fmt.Sprintf("%ssystem/scale-function/%s", s.URL.String(), serviceName) + start := time.Now() + urlPath := fmt.Sprintf("%ssystem/scale-function/%s?namespace=%s", s.URL.String(), serviceName, serviceNamespace) req, _ := http.NewRequest(http.MethodPost, urlPath, bytes.NewReader(requestBody)) if s.AuthInjector != nil { @@ -163,6 +167,8 @@ func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) erro err = fmt.Errorf("error scaling HTTP code %d, %s", res.StatusCode, urlPath) } + log.Printf("SetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds()) + return err } diff --git a/gateway/plugin/external_test.go b/gateway/plugin/external_test.go index d16e80b33..13b3410b8 100644 --- a/gateway/plugin/external_test.go +++ b/gateway/plugin/external_test.go @@ -52,7 +52,7 @@ func TestGetReplicasNonExistentFn(t *testing.T) { esq := NewExternalServiceQuery(*url, injector) - svcQryResp, err := esq.GetReplicas("burt") + svcQryResp, err := esq.GetReplicas("figlet", "") if err == nil { t.Logf("Error was nil, expected non-nil - the service query response value was %+v ", svcQryResp) @@ -82,7 +82,7 @@ func TestGetReplicasExistentFn(t *testing.T) { esq := NewExternalServiceQuery(*url, injector) - svcQryResp, err := esq.GetReplicas("burt") + svcQryResp, err := esq.GetReplicas("figlet", "") if err != nil { t.Logf("Expected err to be nil got: %s ", err.Error()) @@ -106,7 +106,7 @@ func TestSetReplicasNonExistentFn(t *testing.T) { url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) - err := esq.SetReplicas("burt", 1) + err := esq.SetReplicas("figlet", "", 1) expectedErrStr := "error scaling HTTP code 500" @@ -129,7 +129,7 @@ func TestSetReplicasExistentFn(t *testing.T) { url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) - err := esq.SetReplicas("burt", 1) + err := esq.SetReplicas("figlet", "", 1) if err != nil { t.Logf("Expected err to be nil got: %s ", err.Error()) diff --git a/gateway/scaling/function_cache.go b/gateway/scaling/function_cache.go index c3afa877f..1f6c9d343 100644 --- a/gateway/scaling/function_cache.go +++ b/gateway/scaling/function_cache.go @@ -29,22 +29,22 @@ type FunctionCache struct { } // Set replica count for functionName -func (fc *FunctionCache) Set(functionName string, serviceQueryResponse ServiceQueryResponse) { +func (fc *FunctionCache) Set(functionName, namespace string, serviceQueryResponse ServiceQueryResponse) { fc.Sync.Lock() defer fc.Sync.Unlock() - if _, exists := fc.Cache[functionName]; !exists { - fc.Cache[functionName] = &FunctionMeta{} + if _, exists := fc.Cache[functionName+"."+namespace]; !exists { + fc.Cache[functionName+"."+namespace] = &FunctionMeta{} } - fc.Cache[functionName].LastRefresh = time.Now() - fc.Cache[functionName].ServiceQueryResponse = serviceQueryResponse + fc.Cache[functionName+"."+namespace].LastRefresh = time.Now() + fc.Cache[functionName+"."+namespace].ServiceQueryResponse = serviceQueryResponse // entry.LastRefresh = time.Now() // entry.ServiceQueryResponse = serviceQueryResponse } // Get replica count for functionName -func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) { +func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryResponse, bool) { replicas := ServiceQueryResponse{ AvailableReplicas: 0, } @@ -53,7 +53,7 @@ func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) { fc.Sync.RLock() defer fc.Sync.RUnlock() - if val, exists := fc.Cache[functionName]; exists { + if val, exists := fc.Cache[functionName+"."+namespace]; exists { replicas = val.ServiceQueryResponse hit = !val.Expired(fc.Expiry) } diff --git a/gateway/scaling/function_cache_test.go b/gateway/scaling/function_cache_test.go index f8b39ac0e..7b55c795a 100644 --- a/gateway/scaling/function_cache_test.go +++ b/gateway/scaling/function_cache_test.go @@ -12,7 +12,7 @@ func Test_LastRefreshSet(t *testing.T) { before := time.Now() fnName := "echo" - + namespace := "" cache := FunctionCache{ Cache: make(map[string]*FunctionMeta), Expiry: time.Millisecond * 1, @@ -23,14 +23,14 @@ func Test_LastRefreshSet(t *testing.T) { t.Fail() } - cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1}) + cache.Set(fnName, "", ServiceQueryResponse{AvailableReplicas: 1}) - if _, exists := cache.Cache[fnName]; !exists { + if _, exists := cache.Cache[fnName+"."+namespace]; !exists { t.Errorf("Expected entry to exist after setting %s", fnName) t.Fail() } - if cache.Cache[fnName].LastRefresh.Before(before) { + if cache.Cache[fnName+"."+namespace].LastRefresh.Before(before) { t.Errorf("Expected LastRefresh for function to have been after start of test") t.Fail() } @@ -38,16 +38,16 @@ func Test_LastRefreshSet(t *testing.T) { func Test_CacheExpiresIn1MS(t *testing.T) { fnName := "echo" - + namespace := "" cache := FunctionCache{ Cache: make(map[string]*FunctionMeta), Expiry: time.Millisecond * 1, } - cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1}) + cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1}) time.Sleep(time.Millisecond * 2) - _, hit := cache.Get(fnName) + _, hit := cache.Get(fnName, namespace) wantHit := false @@ -58,15 +58,16 @@ func Test_CacheExpiresIn1MS(t *testing.T) { func Test_CacheGivesHitWithLongExpiry(t *testing.T) { fnName := "echo" + namespace := "" cache := FunctionCache{ Cache: make(map[string]*FunctionMeta), Expiry: time.Millisecond * 500, } - cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1}) + cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1}) + _, hit := cache.Get(fnName, namespace) - _, hit := cache.Get(fnName) wantHit := true if hit != wantHit { @@ -76,16 +77,17 @@ func Test_CacheGivesHitWithLongExpiry(t *testing.T) { func Test_CacheFunctionExists(t *testing.T) { fnName := "echo" + namespace := "" cache := FunctionCache{ Cache: make(map[string]*FunctionMeta), Expiry: time.Millisecond * 10, } - cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1}) + cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1}) time.Sleep(time.Millisecond * 2) - _, hit := cache.Get(fnName) + _, hit := cache.Get(fnName, namespace) wantHit := true @@ -93,19 +95,41 @@ func Test_CacheFunctionExists(t *testing.T) { t.Errorf("hit, want: %v, got %v", wantHit, hit) } } + +func Test_CacheFunctionExistsWithNamespace(t *testing.T) { + fnName := "echo" + namespace := "openfaas-fn" + + cache := FunctionCache{ + Cache: make(map[string]*FunctionMeta), + Expiry: time.Millisecond * 10, + } + + cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1}) + + _, hit := cache.Get(fnName, namespace) + + wantHit := true + + if hit != wantHit { + t.Errorf("hit, want: %v, got %v", wantHit, hit) + } +} + func Test_CacheFunctionNotExist(t *testing.T) { fnName := "echo" testName := "burt" + namespace := "" cache := FunctionCache{ Cache: make(map[string]*FunctionMeta), Expiry: time.Millisecond * 10, } - cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1}) + cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1}) time.Sleep(time.Millisecond * 2) - _, hit := cache.Get(testName) + _, hit := cache.Get(testName, namespace) wantHit := false diff --git a/gateway/scaling/function_scaler.go b/gateway/scaling/function_scaler.go index cadb68164..a9b7cc9d1 100644 --- a/gateway/scaling/function_scaler.go +++ b/gateway/scaling/function_scaler.go @@ -36,10 +36,10 @@ type FunctionScaleResult struct { // Scale scales a function from zero replicas to 1 or the value set in // the minimum replicas metadata -func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult { +func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResult { start := time.Now() - if cachedResponse, hit := f.Cache.Get(functionName); hit && + if cachedResponse, hit := f.Cache.Get(functionName, namespace); hit && cachedResponse.AvailableReplicas > 0 { return FunctionScaleResult{ Error: nil, @@ -49,7 +49,7 @@ func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult { } } - queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName) + queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName, namespace) if err != nil { return FunctionScaleResult{ @@ -60,7 +60,7 @@ func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult { } } - f.Cache.Set(functionName, queryResponse) + f.Cache.Set(functionName, namespace, queryResponse) if queryResponse.AvailableReplicas == 0 { minReplicas := uint64(1) @@ -69,19 +69,19 @@ func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult { } scaleResult := backoff(func(attempt int) error { - queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName) + queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName, namespace) if err != nil { return err } - f.Cache.Set(functionName, queryResponse) + f.Cache.Set(functionName, namespace, queryResponse) if queryResponse.Replicas > 0 { return nil } log.Printf("[Scale %d] function=%s 0 => %d requested", attempt, functionName, minReplicas) - setScaleErr := f.Config.ServiceQuery.SetReplicas(functionName, minReplicas) + setScaleErr := f.Config.ServiceQuery.SetReplicas(functionName, namespace, minReplicas) if setScaleErr != nil { return fmt.Errorf("unable to scale function [%s], err: %s", functionName, setScaleErr) } @@ -100,9 +100,9 @@ func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult { } for i := 0; i < int(f.Config.MaxPollCount); i++ { - queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName) + queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName, namespace) if err == nil { - f.Cache.Set(functionName, queryResponse) + f.Cache.Set(functionName, namespace, queryResponse) } totalTime := time.Since(start) diff --git a/gateway/scaling/range.go b/gateway/scaling/ranges.go similarity index 100% rename from gateway/scaling/range.go rename to gateway/scaling/ranges.go diff --git a/gateway/scaling/service_query.go b/gateway/scaling/service_query.go index 84880a412..e33c086b7 100644 --- a/gateway/scaling/service_query.go +++ b/gateway/scaling/service_query.go @@ -5,8 +5,8 @@ package scaling // ServiceQuery provides interface for replica querying/setting type ServiceQuery interface { - GetReplicas(service string) (response ServiceQueryResponse, err error) - SetReplicas(service string, count uint64) error + GetReplicas(service, namespace string) (response ServiceQueryResponse, err error) + SetReplicas(service, namespace string, count uint64) error } // ServiceQueryResponse response from querying a function status diff --git a/gateway/server.go b/gateway/server.go index b7814e95b..ba5299a4c 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -26,7 +26,11 @@ func main() { osEnv := types.OsEnv{} readConfig := types.ReadConfig{} - config := readConfig.Read(osEnv) + config, configErr := readConfig.Read(osEnv) + + if configErr != nil { + log.Fatalln(configErr) + } log.Printf("HTTP Read Timeout: %s", config.ReadTimeout) log.Printf("HTTP Write Timeout: %s", config.WriteTimeout) @@ -85,7 +89,10 @@ func main() { nilURLTransformer := handlers.TransparentURLPathTransformer{} if config.DirectFunctions { - functionURLResolver = handlers.FunctionAsHostBaseURLResolver{FunctionSuffix: config.DirectFunctionsSuffix} + functionURLResolver = handlers.FunctionAsHostBaseURLResolver{ + FunctionSuffix: config.DirectFunctionsSuffix, + FunctionNamespace: config.Namespace, + } functionURLTransformer = handlers.FunctionPrefixTrimmingURLPathTransformer{} } else { functionURLResolver = urlResolver @@ -110,9 +117,9 @@ func main() { faasHandlers.InfoHandler = handlers.MakeInfoHandler(handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)) faasHandlers.SecretHandler = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector) - alertHandler := plugin.NewExternalServiceQuery(*config.FunctionsProviderURL, serviceAuthInjector) + externalServiceQuery := plugin.NewExternalServiceQuery(*config.FunctionsProviderURL, serviceAuthInjector) faasHandlers.Alert = handlers.MakeNotifierWrapper( - handlers.MakeAlertHandler(alertHandler), + handlers.MakeAlertHandler(externalServiceQuery, config.Namespace), forwardingNotifiers, ) @@ -183,10 +190,10 @@ func main() { SetScaleRetries: uint(20), FunctionPollInterval: time.Millisecond * 50, CacheExpiry: time.Second * 5, // freshness of replica values before going stale - ServiceQuery: alertHandler, + ServiceQuery: externalServiceQuery, } - functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scalingConfig) + functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scalingConfig, config.Namespace) } r.HandleFunc("/function/{name:["+NameExpression+"]+}", functionProxy) diff --git a/gateway/types/readconfig.go b/gateway/types/readconfig.go index 89ffa7abf..dfb08ed9b 100644 --- a/gateway/types/readconfig.go +++ b/gateway/types/readconfig.go @@ -4,10 +4,11 @@ package types import ( - "log" + "fmt" "net/url" "os" "strconv" + "strings" "time" ) @@ -52,7 +53,7 @@ func parseIntOrDurationValue(val string, fallback time.Duration) time.Duration { } // Read fetches gateway server configuration from environmental variables -func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { +func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) { cfg := GatewayConfig{ PrometheusHost: "prometheus", PrometheusPort: 9090, @@ -68,7 +69,7 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { var err error cfg.FunctionsProviderURL, err = url.Parse(hasEnv.Getenv("functions_provider_url")) if err != nil { - log.Fatal("If functions_provider_url is provided, then it should be a valid URL.", err) + return nil, fmt.Errorf("if functions_provider_url is provided, then it should be a valid URL, error: %s", err) } } @@ -76,7 +77,7 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { var err error cfg.LogsProviderURL, err = url.Parse(hasEnv.Getenv("logs_provider_url")) if err != nil { - log.Fatal("If logs_provider_url is provided, then it should be a valid URL.", err) + return nil, fmt.Errorf("if logs_provider_url is provided, then it should be a valid URL, error: %s", err) } } else if cfg.FunctionsProviderURL != nil { cfg.LogsProviderURL, _ = url.Parse(cfg.FunctionsProviderURL.String()) @@ -93,7 +94,7 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { if err == nil { cfg.NATSPort = &port } else { - log.Println("faas_nats_port invalid number: " + faasNATSPort) + return nil, fmt.Errorf("faas_nats_port invalid number: %s", faasNATSPort) } } @@ -101,10 +102,10 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { if len(prometheusPort) > 0 { prometheusPortVal, err := strconv.Atoi(prometheusPort) if err != nil { - log.Println("Invalid port for faas_prometheus_port") - } else { - cfg.PrometheusPort = prometheusPortVal + return nil, fmt.Errorf("faas_prometheus_port invalid number: %s", faasNATSPort) } + cfg.PrometheusPort = prometheusPortVal + } prometheusHost := hasEnv.Getenv("faas_prometheus_host") @@ -131,26 +132,34 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { if len(maxIdleConns) > 0 { val, err := strconv.Atoi(maxIdleConns) if err != nil { - log.Println("Invalid value for max_idle_conns") - } else { - cfg.MaxIdleConns = val + return nil, fmt.Errorf("invalid value for max_idle_conns: %s", maxIdleConns) } + cfg.MaxIdleConns = val + } maxIdleConnsPerHost := hasEnv.Getenv("max_idle_conns_per_host") if len(maxIdleConnsPerHost) > 0 { val, err := strconv.Atoi(maxIdleConnsPerHost) if err != nil { - log.Println("Invalid value for max_idle_conns_per_host") - } else { - cfg.MaxIdleConnsPerHost = val + return nil, fmt.Errorf("invalid value for max_idle_conns_per_host: %s", maxIdleConnsPerHost) } + cfg.MaxIdleConnsPerHost = val + } cfg.AuthProxyURL = hasEnv.Getenv("auth_proxy_url") cfg.AuthProxyPassBody = parseBoolValue(hasEnv.Getenv("auth_proxy_pass_body")) - return cfg + cfg.Namespace = hasEnv.Getenv("function_namespace") + + if len(cfg.DirectFunctionsSuffix) > 0 && len(cfg.Namespace) > 0 { + if strings.HasPrefix(cfg.DirectFunctionsSuffix, cfg.Namespace) == false { + return nil, fmt.Errorf("function_namespace must be a sub-string of direct_functions_suffix") + } + } + + return &cfg, nil } // GatewayConfig provides config for the API Gateway server process @@ -209,6 +218,9 @@ type GatewayConfig struct { // AuthProxyPassBody pass body to validation proxy AuthProxyPassBody bool + + // Namespace for endpoints + Namespace string } // UseNATS Use NATSor not diff --git a/gateway/types/readconfig_test.go b/gateway/types/readconfig_test.go index 13b6ed921..bfdda03a2 100644 --- a/gateway/types/readconfig_test.go +++ b/gateway/types/readconfig_test.go @@ -31,7 +31,7 @@ func TestRead_UseExternalProvider_Defaults(t *testing.T) { defaults := NewEnvBucket() readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.UseExternalProvider() != false { t.Log("Default for UseExternalProvider should be false") @@ -44,11 +44,79 @@ func TestRead_UseExternalProvider_Defaults(t *testing.T) { } if len(config.DirectFunctionsSuffix) > 0 { - t.Log("Default for DirectFunctionsSuffix should be empty as a default") + t.Log("Default for DirectFunctionsSuffix should be empty") + t.Fail() + } + + if len(config.Namespace) > 0 { + t.Log("Default for Namespace should be empty") + t.Fail() + } +} + +func TestRead_NamespaceOverride(t *testing.T) { + + defaults := NewEnvBucket() + readConfig := ReadConfig{} + + defaults.Setenv("function_namespace", "fn") + wantSuffix := "fn" + + config, _ := readConfig.Read(defaults) + + if config.Namespace != wantSuffix { + t.Logf("Namespace want: %s, got: %s", wantSuffix, config.Namespace) t.Fail() } } +func TestRead_NamespaceOverrideAgressWithFunctionSuffix_Valid(t *testing.T) { + + defaults := NewEnvBucket() + readConfig := ReadConfig{} + + defaults.Setenv("direct_functions", "true") + wantSuffix := "openfaas-fn.cluster.local.svc." + + defaults.Setenv("direct_functions_suffix", wantSuffix) + defaults.Setenv("function_namespace", "openfaas-fn") + + _, err := readConfig.Read(defaults) + + if err != nil { + t.Logf("Error found: %s", err) + t.Fail() + } +} + +func TestRead_NamespaceOverrideAgressWithFunctionSuffix_Invalid(t *testing.T) { + + defaults := NewEnvBucket() + readConfig := ReadConfig{} + + defaults.Setenv("direct_functions", "true") + wantSuffix := "openfaas-fn.cluster.local.svc." + + defaults.Setenv("direct_functions_suffix", wantSuffix) + defaults.Setenv("function_namespace", "fn") + + _, err := readConfig.Read(defaults) + + if err == nil { + t.Logf("Expected an error because function_namespace should be a sub-string of direct_functions_suffix") + t.Fail() + return + } + + want := "function_namespace must be a sub-string of direct_functions_suffix" + + if want != err.Error() { + t.Logf("Error want: %s, got: %s", want, err.Error()) + t.Fail() + } + +} + func TestRead_DirectFunctionsOverride(t *testing.T) { defaults := NewEnvBucket() readConfig := ReadConfig{} @@ -56,7 +124,7 @@ func TestRead_DirectFunctionsOverride(t *testing.T) { wantSuffix := "openfaas-fn.cluster.local.svc." defaults.Setenv("direct_functions_suffix", wantSuffix) - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.DirectFunctions != true { t.Logf("DirectFunctions should be true, got: %v", config.DirectFunctions) @@ -73,7 +141,7 @@ func TestRead_ScaleZeroDefaultAndOverride(t *testing.T) { defaults := NewEnvBucket() readConfig := ReadConfig{} // defaults.Setenv("scale_from_zero", "true") - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) want := false if config.ScaleFromZero != want { @@ -82,7 +150,7 @@ func TestRead_ScaleZeroDefaultAndOverride(t *testing.T) { } defaults.Setenv("scale_from_zero", "true") - config = readConfig.Read(defaults) + config, _ = readConfig.Read(defaults) want = true if config.ScaleFromZero != want { @@ -96,7 +164,7 @@ func TestRead_EmptyTimeoutConfig(t *testing.T) { defaults := NewEnvBucket() readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if (config.ReadTimeout) != time.Duration(8)*time.Second { t.Log("ReadTimeout incorrect") @@ -114,7 +182,7 @@ func TestRead_ReadAndWriteTimeoutConfig(t *testing.T) { defaults.Setenv("write_timeout", "60") readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if (config.ReadTimeout) != time.Duration(10)*time.Second { t.Logf("ReadTimeout incorrect, got: %d\n", config.ReadTimeout) @@ -132,7 +200,7 @@ func TestRead_ReadAndWriteTimeoutDurationConfig(t *testing.T) { defaults.Setenv("write_timeout", "1m30s") readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if (config.ReadTimeout) != time.Duration(20)*time.Second { t.Logf("ReadTimeout incorrect, got: %d\n", config.ReadTimeout) @@ -148,7 +216,7 @@ func TestRead_UseNATSDefaultsToOff(t *testing.T) { defaults := NewEnvBucket() readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.UseNATS() == true { t.Log("NATS is supposed to be off by default") @@ -162,7 +230,7 @@ func TestRead_UseNATS(t *testing.T) { defaults.Setenv("faas_nats_port", "6222") readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.UseNATS() == false { t.Log("NATS was requested in config, but not enabled.") @@ -177,12 +245,17 @@ func TestRead_UseNATSBadPort(t *testing.T) { defaults.Setenv("faas_nats_port", "6fff") readConfig := ReadConfig{} - config := readConfig.Read(defaults) + _, err := readConfig.Read(defaults) - if config.UseNATS() == true { - t.Log("NATS had bad config, should not be enabled.") - t.Fail() + if err != nil { + want := "faas_nats_port invalid number: 6fff" + + if want != err.Error() { + t.Errorf("want error: %q, got: %q", want, err.Error()) + t.Fail() + } } + } func TestRead_PrometheusNonDefaults(t *testing.T) { @@ -191,7 +264,7 @@ func TestRead_PrometheusNonDefaults(t *testing.T) { defaults.Setenv("faas_prometheus_port", "9999") readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.PrometheusHost != "prom1" { t.Logf("config.PrometheusHost, want: %s, got: %s\n", "prom1", config.PrometheusHost) @@ -209,7 +282,7 @@ func TestRead_PrometheusDefaults(t *testing.T) { readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.PrometheusHost != "prometheus" { t.Logf("config.PrometheusHost, want: %s, got: %s\n", "prometheus", config.PrometheusHost) @@ -227,7 +300,7 @@ func TestRead_BasicAuthDefaults(t *testing.T) { readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.UseBasicAuth != false { t.Logf("config.UseBasicAuth, want: %t, got: %t\n", false, config.UseBasicAuth) @@ -248,7 +321,7 @@ func TestRead_BasicAuth_SetTrue(t *testing.T) { readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.UseBasicAuth != true { t.Logf("config.UseBasicAuth, want: %t, got: %t\n", true, config.UseBasicAuth) @@ -267,7 +340,7 @@ func TestRead_MaxIdleConnsDefaults(t *testing.T) { readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.MaxIdleConns != 1024 { t.Logf("config.MaxIdleConns, want: %d, got: %d\n", 1024, config.MaxIdleConns) @@ -287,7 +360,7 @@ func TestRead_MaxIdleConns_Override(t *testing.T) { defaults.Setenv("max_idle_conns", fmt.Sprintf("%d", 100)) defaults.Setenv("max_idle_conns_per_host", fmt.Sprintf("%d", 2)) - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.MaxIdleConns != 100 { t.Logf("config.MaxIdleConns, want: %d, got: %d\n", 100, config.MaxIdleConns) @@ -307,7 +380,7 @@ func TestRead_AuthProxy_Defaults(t *testing.T) { wantURL := "" wantBody := false - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.AuthProxyPassBody != wantBody { t.Logf("config.AuthProxyPassBody, want: %t, got: %t\n", wantBody, config.AuthProxyPassBody) @@ -329,7 +402,7 @@ func TestRead_AuthProxy_DefaultsOverrides(t *testing.T) { defaults.Setenv("auth_proxy_url", wantURL) defaults.Setenv("auth_proxy_pass_body", fmt.Sprintf("%t", wantBody)) - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.AuthProxyPassBody != wantBody { t.Logf("config.AuthProxyPassBody, want: %t, got: %t\n", wantBody, config.AuthProxyPassBody) @@ -347,7 +420,7 @@ func TestRead_LogsProviderURL(t *testing.T) { t.Run("default value is nil when functions_provider_url is empty", func(t *testing.T) { readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.LogsProviderURL != nil { t.Fatalf("config.LogsProviderURL, want: %s, got: %s\n", "", config.LogsProviderURL) } @@ -358,7 +431,7 @@ func TestRead_LogsProviderURL(t *testing.T) { defaults.Setenv("functions_provider_url", expected) readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.LogsProviderURL.String() != expected { t.Fatalf("config.LogsProviderURL, want: %s, got: %s\n", expected, config.LogsProviderURL) } @@ -369,7 +442,7 @@ func TestRead_LogsProviderURL(t *testing.T) { defaults.Setenv("logs_provider_url", expected) readConfig := ReadConfig{} - config := readConfig.Read(defaults) + config, _ := readConfig.Read(defaults) if config.LogsProviderURL.String() != expected { t.Fatalf("config.LogsProviderURL, want: %s, got: %s\n", expected, config.LogsProviderURL) }