From 72fb3492570cc1c8412a951b06a72d06af1d46de Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 11:57:36 +0100 Subject: [PATCH 01/10] refactor: addition of root ctx to main --- cmd/kar-controllers/main.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/kar-controllers/main.go b/cmd/kar-controllers/main.go index 8f259746a..b257403eb 100644 --- a/cmd/kar-controllers/main.go +++ b/cmd/kar-controllers/main.go @@ -36,6 +36,7 @@ import ( "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app" "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" + "k8s.io/apiserver/pkg/server" "os" ) @@ -49,8 +50,15 @@ func main() { // flag.InitFlags() s.CheckOptionOrDie() - if err := app.Run(s); err != nil { + ctx := server.SetupSignalContext() + + // Run the server + if err := app.Run(ctx, s); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } + + <-ctx.Done() + fmt.Println("Shutting down gracefully") + } From 81c2ff6fe660a9c7c6ed668689f37cc024f4e52a Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 11:58:23 +0100 Subject: [PATCH 02/10] refactor: addition of metrics address --- cmd/kar-controllers/app/options/options.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go index ada825490..40227b2d4 100644 --- a/cmd/kar-controllers/app/options/options.go +++ b/cmd/kar-controllers/app/options/options.go @@ -56,6 +56,7 @@ type ServerOption struct { QuotaRestURL string HealthProbeListenAddr string DispatchResourceReservationTimeout int64 + MetricsListenAddr string } // NewServerOption creates a new CMServer with a default config. @@ -82,6 +83,8 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) { fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. 
Default is none.") fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.") fs.StringVar(&s.HealthProbeListenAddr, "healthProbeListenAddr", ":8081", "Listen address for health probes. Defaults to ':8081'") + // using port 8083 for metrics as 8082 is used by `custom-metrics-apiserver` + fs.StringVar(&s.MetricsListenAddr, "metricsListenAddr", ":8083", "Listen address for metrics. Defaults to ':8083'") fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes") flag.Parse() klog.V(4).Infof("[AddFlags] Controller configuration: %#v", s) From f11e9aad2860f2bd5b7308eecd734a25b1474a6e Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 11:59:13 +0100 Subject: [PATCH 03/10] refactor: edit of the deployment and service to expose metrics ports --- .../mcad-controller/templates/deployment.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/deployment/mcad-controller/templates/deployment.yaml b/deployment/mcad-controller/templates/deployment.yaml index cfaa0a887..2adea3096 100644 --- a/deployment/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-controller/templates/deployment.yaml @@ -11,9 +11,25 @@ spec: - name: http port: 80 targetPort: 8080 + - name: metrics + port: 8083 + targetPort: 8083 selector: app: custom-metrics-apiserver --- +apiVersion: v1 +kind: Service +metadata: + name: metrics + namespace: kube-system +spec: + ports: + - name: metrics + port: 8083 + targetPort: 8083 + selector: + app: metrics +--- #{{ if .Values.configMap.quotaRestUrl }} apiVersion: v1 kind: Service @@ -352,6 +368,8 @@ spec: name: https - containerPort: 8080 name: http + - containerPort: 8083 + name: metrics volumeMounts: - mountPath: /tmp name: temp-vol From 
cb5f17845c8baebd64027dae69c570d77cf94569 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 12:05:33 +0100 Subject: [PATCH 04/10] refactor: edit of run to start controller and health + metrics concurrently this commit also edits the startHealthServer to also start collecting of default prometheus metrics. channels are used to collect potential errors and allow for graceful shutdown of servers --- cmd/kar-controllers/app/server.go | 109 ++++++++++++++++++++++++++---- 1 file changed, 94 insertions(+), 15 deletions(-) diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index dc72179dd..53872d4c9 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -31,13 +31,20 @@ limitations under the License. package app import ( + "context" + "fmt" + "net/http" + "time" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "net/http" "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) @@ -49,41 +56,113 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) { return rest.InClusterConfig() } -func Run(opt *options.ServerOption) error { +func Run(ctx context.Context, opt *options.ServerOption) error { config, err := buildConfig(opt.Master, opt.Kubeconfig) if err != nil { return err } - neverStop := make(chan struct{}) - config.QPS = 100.0 config.Burst = 200.0 jobctrl := queuejob.NewJobController(config, opt) if jobctrl == nil { - return nil + return fmt.Errorf("failed to create a job controller") } - jobctrl.Run(neverStop) - // This call is blocking (unless an error 
occurs) which equates to <-neverStop - err = listenHealthProbe(opt) + stopCh := make(chan struct{}) + + go func() { + defer close(stopCh) + <-ctx.Done() + }() + + go jobctrl.Run(stopCh) + + err = startHealthAndMetricsServers(ctx, opt) if err != nil { return err } + <-ctx.Done() return nil } // Starts the health probe listener -func listenHealthProbe(opt *options.ServerOption) error { - handler := http.NewServeMux() - handler.Handle("/healthz", &health.Handler{}) - err := http.ListenAndServe(opt.HealthProbeListenAddr, handler) - if err != nil { - return err +func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error { + + // Create a new registry. + reg := prometheus.NewRegistry() + + // Add Go module build info. + reg.MustRegister(collectors.NewBuildInfoCollector()) + reg.MustRegister(collectors.NewGoCollector()) + reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + + metricsHandler := http.NewServeMux() + + // Use the HTTPErrorOnError option for the Prometheus handler + handlerOpts := promhttp.HandlerOpts{ + ErrorHandling: promhttp.HTTPErrorOnError, } - return nil + metricsHandler.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOpts)) + + healthHandler := http.NewServeMux() + healthHandler.Handle("/healthz", &health.Handler{}) + + metricsServer := &http.Server{ + Addr: opt.MetricsListenAddr, + Handler: metricsHandler, + } + + healthServer := &http.Server{ + Addr: opt.HealthProbeListenAddr, + Handler: healthHandler, + } + + // make a channel for errors for each server + metricsServerErrChan := make(chan error) + healthServerErrChan := make(chan error) + + // start servers in their own goroutines + go func() { + defer close(metricsServerErrChan) + err := metricsServer.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + metricsServerErrChan <- err + } + }() + + go func() { + defer close(healthServerErrChan) + err := healthServer.ListenAndServe() + if err != nil && 
err != http.ErrServerClosed { + healthServerErrChan <- err + } + }() + + // use select to wait for either a shutdown signal or an error + select { + case <-ctx.Done(): + // received an OS shutdown signal, shut down servers gracefully + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + errM := metricsServer.Shutdown(ctx) + if errM != nil { + return fmt.Errorf("metrics server shutdown error: %v", errM) + } + errH := healthServer.Shutdown(ctx) + if errH != nil { + return fmt.Errorf("health server shutdown error: %v", errH) + } + case err := <-metricsServerErrChan: + return fmt.Errorf("metrics server error: %v", err) + case err := <-healthServerErrChan: + return fmt.Errorf("health server error: %v", err) + } + + return nil } From 96ffc09f2b6471d4aab30b2b8992438746f78db8 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 12:22:11 +0100 Subject: [PATCH 05/10] refactor: making sure that the health and metrics servers only start shutdown once jobctrl is shut down --- cmd/kar-controllers/app/server.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index 53872d4c9..f02570618 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -71,13 +71,22 @@ func Run(ctx context.Context, opt *options.ServerOption) error { } stopCh := make(chan struct{}) + // this channel is used to signal that the job controller is done + jobctrlDoneCh := make(chan struct{}) go func() { defer close(stopCh) <-ctx.Done() }() - go jobctrl.Run(stopCh) + go func() { + jobctrl.Run(stopCh) + // close the jobctrlDoneCh channel when the job controller is done + close(jobctrlDoneCh) + }() + + // wait for the job controller to be done before shutting down the server + <-jobctrlDoneCh err = startHealthAndMetricsServers(ctx, opt) if err != nil { From 33fc50b2d4e79dba4aa7337516dd1bb956e0de1b Mon Sep 17 00:00:00 2001 From: 
Dimitri Saridakis Date: Fri, 11 Aug 2023 14:28:06 +0100 Subject: [PATCH 06/10] refactor: update health and metric port defaults from strings to ints --- cmd/kar-controllers/app/options/options.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go index 40227b2d4..049463b32 100644 --- a/cmd/kar-controllers/app/options/options.go +++ b/cmd/kar-controllers/app/options/options.go @@ -32,10 +32,11 @@ package options import ( "flag" - klog "k8s.io/klog/v2" "os" "strconv" "strings" + + klog "k8s.io/klog/v2" ) // ServerOption is the main context object for the controller manager. @@ -51,12 +52,12 @@ type ServerOption struct { BackoffTime int // Number of seconds a job will go away for, if it can not be scheduled. Default is 20. // Head of line job will not be bumped away for at least HeadOfLineHoldingTime seconds by higher priority jobs. // Default setting to 0 disables this mechanism. - HeadOfLineHoldingTime int - QuotaEnabled bool // Controller is to evaluate quota per request - QuotaRestURL string - HealthProbeListenAddr string + HeadOfLineHoldingTime int + QuotaEnabled bool // Controller is to evaluate quota per request + QuotaRestURL string + HealthProbeListenPort int DispatchResourceReservationTimeout int64 - MetricsListenAddr string + MetricsListenPort int } // NewServerOption creates a new CMServer with a default config. @@ -79,12 +80,12 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) { fs.BoolVar(&s.Preemption, "preemption", s.Preemption, "Set controller to allow preemption if set to true. Note: when set to true, the Kubernetes Scheduler must be configured to enable preemption. Default is false.") fs.IntVar(&s.BackoffTime, "backofftime", s.BackoffTime, "Number of seconds a job will go away for, if it can not be scheduled. 
Default is 20.") fs.IntVar(&s.HeadOfLineHoldingTime, "headoflineholdingtime", s.HeadOfLineHoldingTime, "Number of seconds a job can stay at the Head Of Line without being bumped. Default is 0.") - fs.BoolVar(&s.QuotaEnabled,"quotaEnabled", s.QuotaEnabled,"Enable quota policy evaluation. Default is false.") + fs.BoolVar(&s.QuotaEnabled, "quotaEnabled", s.QuotaEnabled, "Enable quota policy evaluation. Default is false.") fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. Default is none.") fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.") - fs.StringVar(&s.HealthProbeListenAddr, "healthProbeListenAddr", ":8081", "Listen address for health probes. Defaults to ':8081'") + fs.IntVar(&s.HealthProbeListenPort, "healthProbeListenPort", 8081, "Listen port for health probes. Defaults to ':8081'") // using port 8083 for metrics as 8082 is used by `custom-metrics-apiserver` - fs.StringVar(&s.MetricsListenAddr, "metricsListenAddr", ":8083", "Listen address for metrics. Defaults to ':8083'") + fs.IntVar(&s.MetricsListenPort, "metricsListenPort", 8083, "Listen port for metrics. Defaults to ':8083'") fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. 
Defaults to '300000', 5 minutes") flag.Parse() klog.V(4).Infof("[AddFlags] Controller configuration: %#v", s) From 1cd33a78d1f06b6351ab0b0a84a0b4bed89fecba Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Fri, 11 Aug 2023 14:35:56 +0100 Subject: [PATCH 07/10] refactor: addition of a generic server to use for health, metrics etc --- cmd/kar-controllers/app/generic-server.go | 107 ++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 cmd/kar-controllers/app/generic-server.go diff --git a/cmd/kar-controllers/app/generic-server.go b/cmd/kar-controllers/app/generic-server.go new file mode 100644 index 000000000..d04da8a3e --- /dev/null +++ b/cmd/kar-controllers/app/generic-server.go @@ -0,0 +1,107 @@ +package app + +import ( + "context" + "fmt" + "net" + "net/http" + "strconv" + "time" + + logger "k8s.io/klog/v2" +) + +type ServerOption func(*Server) + +// WithTimeout sets the shutdown timeout for the server. +func WithTimeout(timeout time.Duration) ServerOption { + return func(s *Server) { + s.shutdownTimeout = timeout + } +} + +type Server struct { + httpServer http.Server + listener net.Listener + endpoint string + shutdownTimeout time.Duration +} + +func NewServer(port int, endpoint string, handler http.Handler, options ...ServerOption) (*Server, error) { + addr := "0" + if port != 0 { + addr = ":" + strconv.Itoa(port) + } + + listener, err := newListener(addr) + if err != nil { + return nil, err + } + + mux := http.NewServeMux() + mux.Handle(endpoint, handler) + + s := &Server{ + endpoint: endpoint, + listener: listener, + httpServer: http.Server{Handler: mux}, + shutdownTimeout: 30 * time.Second, // Default value + } + + for _, opt := range options { + opt(s) + } + + return s, nil +} + +func (s *Server) Start() (err error) { + if s.listener == nil { + logger.Infof("Serving endpoint %s is disabled", s.endpoint) + return + } + + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, 
r) + } + }() + + logger.Infof("Started serving endpoint %s at %s", s.endpoint, s.listener.Addr()) + if e := s.httpServer.Serve(s.listener); e != http.ErrServerClosed { + return fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, e) + } + return +} + +func (s *Server) Shutdown() error { + if s.listener == nil { + return nil + } + + logger.Info("Stopping server") + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Try graceful shutdown + if err := s.httpServer.Shutdown(shutdownCtx); err != nil { + return fmt.Errorf("failed to shutdown server gracefully: %v", err) + } + return s.httpServer.Shutdown(shutdownCtx) +} + +// newListener creates a new TCP listener bound to the given address. +func newListener(addr string) (net.Listener, error) { + // Add a case to disable serving altogether + if addr == "0" { + return nil, nil + } + + listener, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to create listener: %v", err) + } + + return listener, nil +} From d1703e6cfd6714eea9fb87298391f4b0dee2e14c Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Fri, 11 Aug 2023 16:44:33 +0100 Subject: [PATCH 08/10] refactor: use errgroup and new generic-server --- cmd/kar-controllers/app/server.go | 131 ++++++++++-------------------- 1 file changed, 43 insertions(+), 88 deletions(-) diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index f02570618..771ff3357 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -34,8 +34,8 @@ import ( "context" "fmt" "net/http" - "time" + "golang.org/x/sync/errgroup" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -49,6 +49,9 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) +// Global Prometheus Registry +var globalPromRegistry = prometheus.NewRegistry() + func buildConfig(master, kubeconfig string) (*rest.Config, error) { if master != "" || kubeconfig != "" { 
return clientcmd.BuildConfigFromFlags(master, kubeconfig) @@ -70,23 +73,7 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return fmt.Errorf("failed to create a job controller") } - stopCh := make(chan struct{}) - // this channel is used to signal that the job controller is done - jobctrlDoneCh := make(chan struct{}) - - go func() { - defer close(stopCh) - <-ctx.Done() - }() - - go func() { - jobctrl.Run(stopCh) - // close the jobctrlDoneCh channel when the job controller is done - close(jobctrlDoneCh) - }() - - // wait for the job controller to be done before shutting down the server - <-jobctrlDoneCh + go jobctrl.Run(ctx.Done()) err = startHealthAndMetricsServers(ctx, opt) if err != nil { @@ -97,81 +84,49 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return nil } -// Starts the health probe listener -func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error { - - // Create a new registry. - reg := prometheus.NewRegistry() - +// metricsHandler returns a http.Handler that serves the prometheus metrics +func prometheusHandler() http.Handler { // Add Go module build info. 
- reg.MustRegister(collectors.NewBuildInfoCollector()) - reg.MustRegister(collectors.NewGoCollector()) - reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - - metricsHandler := http.NewServeMux() + globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector()) + globalPromRegistry.MustRegister(collectors.NewGoCollector()) + globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - // Use the HTTPErrorOnError option for the Prometheus handler handlerOpts := promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, } - metricsHandler.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOpts)) - - healthHandler := http.NewServeMux() - healthHandler.Handle("/healthz", &health.Handler{}) - - metricsServer := &http.Server{ - Addr: opt.MetricsListenAddr, - Handler: metricsHandler, - } - - healthServer := &http.Server{ - Addr: opt.HealthProbeListenAddr, - Handler: healthHandler, - } - - // make a channel for errors for each server - metricsServerErrChan := make(chan error) - healthServerErrChan := make(chan error) - - // start servers in their own goroutines - go func() { - defer close(metricsServerErrChan) - err := metricsServer.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - metricsServerErrChan <- err - } - }() - - go func() { - defer close(healthServerErrChan) - err := healthServer.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - healthServerErrChan <- err - } - }() - - // use select to wait for either a shutdown signal or an error - select { - case <-ctx.Done(): - // received an OS shutdown signal, shut down servers gracefully - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - errM := metricsServer.Shutdown(ctx) - if errM != nil { - return fmt.Errorf("metrics server shutdown error: %v", errM) - } - errH := healthServer.Shutdown(ctx) - if errH != nil { - return fmt.Errorf("health 
server shutdown error: %v", errH) - } - case err := <-metricsServerErrChan: - return fmt.Errorf("metrics server error: %v", err) - case err := <-healthServerErrChan: - return fmt.Errorf("health server error: %v", err) - } - - return nil + return promhttp.HandlerFor(globalPromRegistry, handlerOpts) } +func healthHandler() http.Handler { + healthHandler := http.NewServeMux() + healthHandler.Handle("/healthz", &health.Handler{}) + return healthHandler +} + +// Starts the health probe listener +func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error { + g, ctx := errgroup.WithContext(ctx) + + // metrics server + metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", prometheusHandler()) + if err != nil { + return err + } + + healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler()) + if err != nil { + return err + } + + g.Go(metricsServer.Start) + g.Go(healthServer.Start) + + go func() { + <-ctx.Done() + metricsServer.Shutdown() + healthServer.Shutdown() + }() + + return nil +} From 3f5b9571b4dfa2fc1a8f6ce21ac6b5981ec47a73 Mon Sep 17 00:00:00 2001 From: Eran Raichstein Date: Sun, 13 Aug 2023 12:34:18 +0300 Subject: [PATCH 09/10] Initial version of custom metrics --- cmd/kar-controllers/app/options/options.go | 6 +- cmd/kar-controllers/app/server.go | 23 +------ pkg/metrics/metrics.go | 77 ++++++++++++++++++++++ 3 files changed, 81 insertions(+), 25 deletions(-) create mode 100644 pkg/metrics/metrics.go diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go index 509541501..7e8aa7971 100644 --- a/cmd/kar-controllers/app/options/options.go +++ b/cmd/kar-controllers/app/options/options.go @@ -35,8 +35,6 @@ import ( "os" "strconv" "strings" - - klog "k8s.io/klog/v2" ) // ServerOption is the main context object for the controller manager. 
@@ -55,9 +53,9 @@ type ServerOption struct { HeadOfLineHoldingTime int QuotaEnabled bool // Controller is to evaluate quota per request QuotaRestURL string - HealthProbeListenPort int + HealthProbeListenPort int DispatchResourceReservationTimeout int64 - MetricsListenPort int + MetricsListenPort int } // NewServerOption creates a new CMServer with a default config. diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index 771ff3357..784058df9 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -42,16 +42,11 @@ import ( "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/metrics" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) -// Global Prometheus Registry -var globalPromRegistry = prometheus.NewRegistry() - func buildConfig(master, kubeconfig string) (*rest.Config, error) { if master != "" || kubeconfig != "" { return clientcmd.BuildConfigFromFlags(master, kubeconfig) @@ -84,20 +79,6 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return nil } -// metricsHandler returns a http.Handler that serves the prometheus metrics -func prometheusHandler() http.Handler { - // Add Go module build info. 
- globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector()) - globalPromRegistry.MustRegister(collectors.NewGoCollector()) - globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - - handlerOpts := promhttp.HandlerOpts{ - ErrorHandling: promhttp.HTTPErrorOnError, - } - - return promhttp.HandlerFor(globalPromRegistry, handlerOpts) -} - func healthHandler() http.Handler { healthHandler := http.NewServeMux() healthHandler.Handle("/healthz", &health.Handler{}) @@ -109,7 +90,7 @@ func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption g, ctx := errgroup.WithContext(ctx) // metrics server - metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", prometheusHandler()) + metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.Handler()) if err != nil { return err } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 000000000..89584e0fd --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,77 @@ +// ------------------------------------------------------ {COPYRIGHT-TOP} --- +// Copyright 2022 The Multi-Cluster App Dispatcher Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ------------------------------------------------------ {COPYRIGHT-END} --- +package metrics + +import ( + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + dummyCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "dummy_counter", + Help: "A dummy Prometheus counter", + }) + + gpuGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "gpu", + Help: "Number of Available GPUs", + }) +) + +// Global Prometheus Registry +var globalPromRegistry = prometheus.NewRegistry() + +// Handler returns an http.Handler that serves the prometheus metrics +func Handler() http.Handler { + + // register standard metrics + globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector()) + globalPromRegistry.MustRegister(collectors.NewGoCollector()) + globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + + // register custom metrics + registerCustomMetrics() + + // update custom metrics + go foreverUpdateCustomMetrics() + + handlerOpts := promhttp.HandlerOpts{ + ErrorHandling: promhttp.HTTPErrorOnError, + } + + return promhttp.HandlerFor(globalPromRegistry, handlerOpts) +} + +// register the custom metrics +func registerCustomMetrics() { + + globalPromRegistry.MustRegister(dummyCounter) + globalPromRegistry.MustRegister(gpuGauge) +} + +// forever thread that updates the custom metrics +func foreverUpdateCustomMetrics() { + for { + dummyCounter.Inc() + gpuGauge.Inc() + time.Sleep(time.Second) + } +} From 7e33c69e8964d01874c58f4acf0b818579c8671f Mon Sep 17 00:00:00 2001 From: Eran Raichstein Date: Sun, 20 Aug 2023 10:28:54 +0300 Subject: [PATCH 10/10] add update prometheous cluster metrics --- .../queuejob/queuejob_controller_ex.go | 8 +- pkg/metrics/cluster_metrics.go | 78 +++++++++++++++++++ pkg/metrics/metrics.go | 37 +-------- 3 files changed, 86
insertions(+), 37 deletions(-) create mode 100644 pkg/metrics/cluster_metrics.go diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 2146a4ac6..5eeb57f59 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -35,12 +35,13 @@ import ( "github.com/hashicorp/go-multierror" qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util" + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager" dto "github.com/prometheus/client_model/go" "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/metrics/adapter" - "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota" + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/metrics" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -124,6 +125,9 @@ type XController struct { // Metrics API Server metricsAdapter *adapter.MetricsAdapter + // Cluster Metrics Manager + clusterMetricsManager *metrics.ClusterMetricsManager + // EventQueueforAgent agentEventQueue *cache.FIFO @@ -171,6 +175,8 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * } cc.metricsAdapter = adapter.New(serverOption, config, cc.cache) + cc.clusterMetricsManager = metrics.NewClusterMetricsManager(cc.cache) + cc.genericresources = genericresource.NewAppWrapperGenericResource(config) cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{} diff --git a/pkg/metrics/cluster_metrics.go b/pkg/metrics/cluster_metrics.go new file mode 100644 index 000000000..8adcb5fd1 --- /dev/null +++ b/pkg/metrics/cluster_metrics.go @@ -0,0 +1,78 @@ +// 
------------------------------------------------------ {COPYRIGHT-TOP} --- +// Copyright 2023 The Multi-Cluster App Dispatcher Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------ {COPYRIGHT-END} --- +package metrics + +import ( + "time" + + "k8s.io/klog/v2" + + clusterstatecache "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/cache" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + unallocatedCPUGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "unallocated_cpu", + Help: "Unalocated CPU (in Milicores)", + }) + unallocatedMemoryGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "unallocated_memory", + Help: "Unalocated Memory (in TBD)", + }) + unallocatedGPUGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "unallocated_gpu", + Help: "Unalocated GPU (in TBD)", + }) +) + +// register the cluster metrics +func registerClusterMetrics() { + + globalPromRegistry.MustRegister(unallocatedCPUGauge) + globalPromRegistry.MustRegister(unallocatedMemoryGauge) + globalPromRegistry.MustRegister(unallocatedGPUGauge) +} + +type ClusterMetricsManager struct { + Message string +} + +func NewClusterMetricsManager(clusterStateCache clusterstatecache.Cache) *ClusterMetricsManager { + clusterMetricsManager := &ClusterMetricsManager{} + + // register cluster metrics + registerClusterMetrics() + + // update cluster metrics + go 
foreverUpdateClusterMetrics(clusterStateCache) + + return clusterMetricsManager +} + +// forever thread that updates the cluster metrics +func foreverUpdateClusterMetrics(clusterStateCache clusterstatecache.Cache) { + + for { + resources := clusterStateCache.GetUnallocatedResources() + klog.V(9).Infof("[GetExternalMetric] Cache resources: %f", resources) + + unallocatedCPUGauge.Set(float64(resources.MilliCPU)) + unallocatedMemoryGauge.Set(float64(resources.GPU)) + unallocatedGPUGauge.Set(float64(resources.GPU)) + time.Sleep(time.Second) + } +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 89584e0fd..24b158149 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -1,5 +1,5 @@ // ------------------------------------------------------ {COPYRIGHT-TOP} --- -// Copyright 2022 The Multi-Cluster App Dispatcher Authors. +// Copyright 2023 The Multi-Cluster App Dispatcher Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -17,25 +17,12 @@ package metrics import ( "net/http" - "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" ) -var ( - dummyCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "dummy_counter", - Help: "A dummy Prometheus counter", - }) - - gpuGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "gpu", - Help: "Number of Available GPUs", - }) -) - // Global Prometheus Registry var globalPromRegistry = prometheus.NewRegistry() @@ -47,31 +34,9 @@ func Handler() http.Handler { globalPromRegistry.MustRegister(collectors.NewGoCollector()) globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - // register custom metrics - registerCustomMetrics() - - // update custom metrics - go foreverUpdateCustomMetrics() - handlerOpts := promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, } return promhttp.HandlerFor(globalPromRegistry, handlerOpts) } - -// register the custom metrics -func registerCustomMetrics() { - - globalPromRegistry.MustRegister(dummyCounter) - globalPromRegistry.MustRegister(gpuGauge) -} - -// forever thread that updates the custom metrics -func foreverUpdateCustomMetrics() { - for { - dummyCounter.Inc() - gpuGauge.Inc() - time.Sleep(time.Second) - } -}