Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add initial metrics #573

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
107 changes: 107 additions & 0 deletions cmd/kar-controllers/app/generic-server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package app

import (
"context"
"fmt"
"net"
"net/http"
"strconv"
"time"

logger "k8s.io/klog/v2"
)

type ServerOption func(*Server)

// WithTimeout sets the shutdown timeout for the server.
func WithTimeout(timeout time.Duration) ServerOption {
return func(s *Server) {
s.shutdownTimeout = timeout
}
}

type Server struct {
httpServer http.Server
listener net.Listener
endpoint string
shutdownTimeout time.Duration
}

func NewServer(port int, endpoint string, handler http.Handler, options ...ServerOption) (*Server, error) {
addr := "0"
if port != 0 {
addr = ":" + strconv.Itoa(port)
}

listener, err := newListener(addr)
if err != nil {
return nil, err
}

mux := http.NewServeMux()
mux.Handle(endpoint, handler)

s := &Server{
endpoint: endpoint,
listener: listener,
httpServer: http.Server{Handler: mux},
shutdownTimeout: 30 * time.Second, // Default value
}

for _, opt := range options {
opt(s)
}

return s, nil
}

func (s *Server) Start() (err error) {
if s.listener == nil {
logger.Infof("Serving endpoint %s is disabled", s.endpoint)
return
}

defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, r)
}
}()

logger.Infof("Started serving endpoint %s at %s", s.endpoint, s.listener.Addr())
if e := s.httpServer.Serve(s.listener); e != http.ErrServerClosed {
return fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, e)
}
return
}

func (s *Server) Shutdown() error {
if s.listener == nil {
return nil
}

logger.Info("Stopping server")

shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Try graceful shutdown
if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
return fmt.Errorf("failed to shutdown server gracefully: %v", err)
}
return s.httpServer.Shutdown(shutdownCtx)
}

// newListener creates a new TCP listener bound to the given address.
func newListener(addr string) (net.Listener, error) {
// Add a case to disable serving altogether
if addr == "0" {
return nil, nil
}

listener, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to create listener: %v", err)
}

return listener, nil
}
7 changes: 5 additions & 2 deletions cmd/kar-controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ type ServerOption struct {
HeadOfLineHoldingTime int
QuotaEnabled bool // Controller is to evaluate quota per request
QuotaRestURL string
HealthProbeListenAddr string
HealthProbeListenPort int
DispatchResourceReservationTimeout int64
MetricsListenPort int
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -80,7 +81,9 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
fs.BoolVar(&s.QuotaEnabled, "quotaEnabled", s.QuotaEnabled, "Enable quota policy evaluation. Default is false.")
fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. Default is none.")
fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.")
fs.StringVar(&s.HealthProbeListenAddr, "healthProbeListenAddr", ":8081", "Listen address for health probes. Defaults to ':8081'")
fs.IntVar(&s.HealthProbeListenPort, "healthProbeListenPort", 8081, "Listen port for health probes. Defaults to ':8081'")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eranra Helm chart may be producing a string here. Could you check and validate/fix helm chart. See deployment/mcad-controller

// using port 8083 for metrics as 8082 is used by `custom-metrics-apiserver`
fs.IntVar(&s.MetricsListenPort, "metricsListenPort", 8083, "Listen port for metrics. Defaults to ':8083'")
fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes")
}

Expand Down
50 changes: 37 additions & 13 deletions cmd/kar-controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ limitations under the License.
package app

import (
"context"
"fmt"
"net/http"

"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"net/http"

"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/metrics"

_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)
Expand All @@ -49,41 +54,60 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) {
return rest.InClusterConfig()
}

func Run(opt *options.ServerOption) error {
func Run(ctx context.Context, opt *options.ServerOption) error {
config, err := buildConfig(opt.Master, opt.Kubeconfig)
if err != nil {
return err
}

neverStop := make(chan struct{})

config.QPS = 100.0
config.Burst = 200.0

jobctrl := queuejob.NewJobController(config, opt)
if jobctrl == nil {
return nil
return fmt.Errorf("failed to create a job controller")
}
jobctrl.Run(neverStop)

// This call is blocking (unless an error occurs) which equates to <-neverStop
err = listenHealthProbe(opt)
go jobctrl.Run(ctx.Done())

err = startHealthAndMetricsServers(ctx, opt)
if err != nil {
return err
}

<-ctx.Done()
return nil
}

func healthHandler() http.Handler {
healthHandler := http.NewServeMux()
healthHandler.Handle("/healthz", &health.Handler{})
return healthHandler
}

// Starts the health probe listener
func listenHealthProbe(opt *options.ServerOption) error {
handler := http.NewServeMux()
handler.Handle("/healthz", &health.Handler{})
err := http.ListenAndServe(opt.HealthProbeListenAddr, handler)
func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error {
g, ctx := errgroup.WithContext(ctx)

// metrics server
metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.Handler())
if err != nil {
return err
}

healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler())
if err != nil {
return err
}

g.Go(metricsServer.Start)
g.Go(healthServer.Start)

go func() {
<-ctx.Done()
metricsServer.Shutdown()
healthServer.Shutdown()
}()

return nil
}

11 changes: 10 additions & 1 deletion cmd/kar-controllers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (

"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app"
"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"

"k8s.io/apiserver/pkg/server"
)

func main() {
Expand All @@ -49,8 +51,15 @@ func main() {
s.AddFlags(flagSet)
flag.Parse()

if err := app.Run(s); err != nil {
ctx := server.SetupSignalContext()

// Run the server
if err := app.Run(ctx, s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}

<-ctx.Done()
fmt.Println("Shutting down gracefully")

}
18 changes: 18 additions & 0 deletions deployment/mcad-controller/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,25 @@ spec:
- name: http
port: 80
targetPort: 8080
- name: metrics
port: 8083
targetPort: 8083
selector:
app: custom-metrics-apiserver
---
apiVersion: v1
kind: Service
metadata:
name: metrics
namespace: kube-system
spec:
ports:
- name: metrics
port: 8083
targetPort: 8083
selector:
app: metrics
---
#{{ if .Values.configMap.quotaRestUrl }}
apiVersion: v1
kind: Service
Expand Down Expand Up @@ -352,6 +368,8 @@ spec:
name: https
- containerPort: 8080
name: http
- containerPort: 8083
name: metrics
volumeMounts:
- mountPath: /tmp
name: temp-vol
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ import (
"github.com/hashicorp/go-multierror"
qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util"

"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager"
dto "github.com/prometheus/client_model/go"

"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/metrics/adapter"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/metrics"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"

Expand Down Expand Up @@ -124,6 +125,9 @@ type XController struct {
// Metrics API Server
metricsAdapter *adapter.MetricsAdapter

// Cluster Metrics Manager
clusterMetricsManager *metrics.ClusterMetricsManager

// EventQueueforAgent
agentEventQueue *cache.FIFO

Expand Down Expand Up @@ -171,6 +175,8 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
}
cc.metricsAdapter = adapter.New(serverOption, config, cc.cache)

cc.clusterMetricsManager = metrics.NewClusterMetricsManager(cc.cache)

cc.genericresources = genericresource.NewAppWrapperGenericResource(config)

cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{}
Expand Down
78 changes: 78 additions & 0 deletions pkg/metrics/cluster_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// ------------------------------------------------------ {COPYRIGHT-TOP} ---
// Copyright 2023 The Multi-Cluster App Dispatcher Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------ {COPYRIGHT-END} ---
package metrics

import (
"time"

"k8s.io/klog/v2"

clusterstatecache "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/cache"
"github.com/prometheus/client_golang/prometheus"
)

var (
unallocatedCPUGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "unallocated_cpu",
Help: "Unalocated CPU (in Milicores)",
})
unallocatedMemoryGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "unallocated_memory",
Help: "Unalocated Memory (in TBD)",
})
unallocatedGPUGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "unallocated_gpu",
Help: "Unalocated GPU (in TBD)",
})
)

// register the cluster metrics
func registerClusterMetrics() {

globalPromRegistry.MustRegister(unallocatedCPUGauge)
globalPromRegistry.MustRegister(unallocatedMemoryGauge)
globalPromRegistry.MustRegister(unallocatedGPUGauge)
}

type ClusterMetricsManager struct {
Message string
}

func NewClusterMetricsManager(clusterStateCache clusterstatecache.Cache) *ClusterMetricsManager {
clusterMetricsManager := &ClusterMetricsManager{}

// register cluster metrics
registerClusterMetrics()

// update cluster metrics
go foreverUpdateClusterMetrics(clusterStateCache)

return clusterMetricsManager
}

// forever thread that updates the cluster metrics
func foreverUpdateClusterMetrics(clusterStateCache clusterstatecache.Cache) {

for {
resources := clusterStateCache.GetUnallocatedResources()
klog.V(9).Infof("[GetExternalMetric] Cache resources: %f", resources)

unallocatedCPUGauge.Set(float64(resources.MilliCPU))
unallocatedMemoryGauge.Set(float64(resources.GPU))
unallocatedGPUGauge.Set(float64(resources.GPU))
time.Sleep(time.Second)
}
}
Loading