feat: only operate on cloudprovider managed resources
jmdeal committed Nov 16, 2024
1 parent 3e0c51a commit 3b2a62b
Showing 19 changed files with 392 additions and 208 deletions.
8 changes: 8 additions & 0 deletions pkg/apis/v1/nodeclaim.go
@@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// NodeClaimSpec describes the desired state of the NodeClaim
@@ -108,6 +109,13 @@ type NodeClassReference struct {
Group string `json:"group"`
}

func (ncr *NodeClassReference) GroupKind() schema.GroupKind {
return schema.GroupKind{
Group: ncr.Group,
Kind: ncr.Kind,
}
}

// +kubebuilder:object:generate=false
type Provider = runtime.RawExtension

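The new GroupKind() accessor is the hook the rest of the commit hangs on: controllers decide whether a NodePool or NodeClaim is "managed" by comparing its nodeClassRef against the NodeClass kinds the cloud provider reports. Below is a minimal sketch of that check, assuming the objects returned by GetSupportedNodeClasses() carry their GroupVersionKind (e.g. via TypeMeta); the helper names are illustrative, not necessarily those added in the commit's utility files, which are not shown on this page.

// Sketch only: a plausible shape for the "is this NodePool managed?" check,
// placed in the pkg/utils/nodepool package referenced later in the diff.
package nodepool

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// supportedGroupKinds collects the NodeClass GroupKinds the cloud provider supports.
// Assumes each returned object has its GVK populated (e.g. in TypeMeta).
func supportedGroupKinds(cp cloudprovider.CloudProvider) sets.Set[schema.GroupKind] {
	gks := sets.New[schema.GroupKind]()
	for _, nodeClass := range cp.GetSupportedNodeClasses() {
		gks.Insert(nodeClass.GetObjectKind().GroupVersionKind().GroupKind())
	}
	return gks
}

// IsManaged reports whether the NodePool's nodeClassRef points at a supported NodeClass kind.
func IsManaged(nodePool *v1.NodePool, cp cloudprovider.CloudProvider) bool {
	return supportedGroupKinds(cp).Has(nodePool.Spec.Template.Spec.NodeClassRef.GroupKind())
}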
16 changes: 8 additions & 8 deletions pkg/controllers/controllers.go
@@ -68,22 +68,22 @@ func NewControllers(
disruption.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster, disruptionQueue),
provisioning.NewPodController(kubeClient, p),
provisioning.NewNodeController(kubeClient, p),
nodepoolhash.NewController(kubeClient),
expiration.NewController(clock, kubeClient),
nodepoolhash.NewController(kubeClient, cloudProvider),
expiration.NewController(clock, kubeClient, cloudProvider),
informer.NewDaemonSetController(kubeClient, cluster),
informer.NewNodeController(kubeClient, cluster),
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cluster),
informer.NewNodeClaimController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cloudProvider, cluster),
informer.NewNodeClaimController(kubeClient, cloudProvider, cluster),
termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
metricspod.NewController(kubeClient),
metricsnodepool.NewController(kubeClient),
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cluster),
nodepoolvalidation.NewController(kubeClient),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
podevents.NewController(clock, kubeClient),
nodeclaimconsistency.NewController(clock, kubeClient, recorder),
nodeclaimconsistency.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimlifecycle.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimgarbagecollection.NewController(clock, kubeClient, cloudProvider),
nodeclaimdisruption.NewController(clock, kubeClient, cloudProvider),
7 changes: 4 additions & 3 deletions pkg/controllers/disruption/controller.go
@@ -36,6 +36,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/utils/nodepool"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -284,13 +285,13 @@ func (c *Controller) logAbnormalRuns(ctx context.Context) {

// logInvalidBudgets will log if there are any invalid schedules detected
func (c *Controller) logInvalidBudgets(ctx context.Context) {
nodePoolList := &v1.NodePoolList{}
if err := c.kubeClient.List(ctx, nodePoolList); err != nil {
nodePools, err := nodepool.ListManagedNodePools(ctx, c.kubeClient, c.cloudProvider)
if err != nil {
log.FromContext(ctx).Error(err, "failed listing nodepools")
return
}
var buf bytes.Buffer
for _, np := range nodePoolList.Items {
for _, np := range nodePools {
// Use a dummy value of 100 since we only care if this errors.
for _, method := range c.methods {
if _, err := np.GetAllowedDisruptionsByReason(c.clock, 100, method.Reason()); err != nil {
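ListManagedNodePools itself lives in utility files that are not part of the diff shown on this page. Judging from its call sites, it lists all NodePools and drops those whose NodeClass the provider does not support; a rough sketch under that assumption follows (the signature matches the usage above, the body is guesswork).

// Sketch of sigs.k8s.io/karpenter/pkg/utils/nodepool.ListManagedNodePools,
// reconstructed from its call sites; the real implementation may differ.
package nodepool

import (
	"context"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// ListManagedNodePools lists NodePools and keeps only those managed by the given cloud provider.
func ListManagedNodePools(ctx context.Context, kubeClient client.Client, cp cloudprovider.CloudProvider) ([]*v1.NodePool, error) {
	nodePoolList := &v1.NodePoolList{}
	if err := kubeClient.List(ctx, nodePoolList); err != nil {
		return nil, fmt.Errorf("listing nodepools, %w", err)
	}
	var managed []*v1.NodePool
	for i := range nodePoolList.Items {
		np := &nodePoolList.Items[i]
		if IsManaged(np, cp) { // hypothetical predicate from the earlier GroupKind sketch
			managed = append(managed, np)
		}
	}
	return managed, nil
}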
20 changes: 10 additions & 10 deletions pkg/controllers/disruption/helpers.go
@@ -39,6 +39,7 @@ import (
"sigs.k8s.io/karpenter/pkg/metrics"
operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
"sigs.k8s.io/karpenter/pkg/utils/nodepool"
"sigs.k8s.io/karpenter/pkg/utils/pdb"
)

@@ -162,13 +163,13 @@ func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient clien
// BuildNodePoolMap builds a provName -> nodePool map and a provName -> instanceName -> instance type map
func BuildNodePoolMap(ctx context.Context, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) (map[string]*v1.NodePool, map[string]map[string]*cloudprovider.InstanceType, error) {
nodePoolMap := map[string]*v1.NodePool{}
nodePoolList := &v1.NodePoolList{}
if err := kubeClient.List(ctx, nodePoolList); err != nil {
nodePools, err := nodepool.ListManagedNodePools(ctx, kubeClient, cloudProvider)
if err != nil {
return nil, nil, fmt.Errorf("listing node pools, %w", err)
}

nodePoolToInstanceTypesMap := map[string]map[string]*cloudprovider.InstanceType{}
for i := range nodePoolList.Items {
np := &nodePoolList.Items[i]
for _, np := range nodePools {
nodePoolMap[np.Name] = np

nodePoolInstanceTypes, err := cloudProvider.GetInstanceTypes(ctx, np)
@@ -193,7 +194,7 @@ func BuildNodePoolMap(ctx context.Context, kubeClient client.Client, cloudProvid
// We calculate allowed disruptions by taking the max disruptions allowed by disruption reason and subtracting the number of nodes that are NotReady and already being deleted by that disruption reason.
//
//nolint:gocyclo
func BuildDisruptionBudgetMapping(ctx context.Context, cluster *state.Cluster, clk clock.Clock, kubeClient client.Client, recorder events.Recorder, reason v1.DisruptionReason) (map[string]int, error) {
func BuildDisruptionBudgetMapping(ctx context.Context, cluster *state.Cluster, clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder, reason v1.DisruptionReason) (map[string]int, error) {
disruptionBudgetMapping := map[string]int{}
numNodes := map[string]int{} // map[nodepool] -> node count in nodepool
disrupting := map[string]int{} // map[nodepool] -> nodes undergoing disruption
@@ -226,19 +227,18 @@ func BuildDisruptionBudgetMapping(ctx context.Context, cluster *state.Cluster, c
disrupting[nodePool]++
}
}
nodePoolList := &v1.NodePoolList{}
if err := kubeClient.List(ctx, nodePoolList); err != nil {
nodePools, err := nodepool.ListManagedNodePools(ctx, kubeClient, cloudProvider)
if err != nil {
return disruptionBudgetMapping, fmt.Errorf("listing node pools, %w", err)
}
for _, nodePool := range nodePoolList.Items {
for _, nodePool := range nodePools {
allowedDisruptions := nodePool.MustGetAllowedDisruptions(clk, numNodes[nodePool.Name], reason)
disruptionBudgetMapping[nodePool.Name] = lo.Max([]int{allowedDisruptions - disrupting[nodePool.Name], 0})

NodePoolAllowedDisruptions.Set(float64(allowedDisruptions), map[string]string{
metrics.NodePoolLabel: nodePool.Name, metrics.ReasonLabel: string(reason),
})
if allowedDisruptions == 0 {
recorder.Publish(disruptionevents.NodePoolBlockedForDisruptionReason(lo.ToPtr(nodePool), reason))
recorder.Publish(disruptionevents.NodePoolBlockedForDisruptionReason(nodePool, reason))
}
}
return disruptionBudgetMapping, nil
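For callers of BuildDisruptionBudgetMapping, the only change is threading the CloudProvider through. An illustrative (not actual) call site, using the parameter order from the new signature above; everything other than the function's signature is a placeholder.

// Hypothetical caller; only the BuildDisruptionBudgetMapping signature comes
// from the diff above, the rest is illustrative scaffolding.
package example

import (
	"context"
	"fmt"

	"k8s.io/utils/clock"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
	"sigs.k8s.io/karpenter/pkg/controllers/disruption"
	"sigs.k8s.io/karpenter/pkg/controllers/state"
	"sigs.k8s.io/karpenter/pkg/events"
)

// allowedDriftDisruptions returns how many more drift disruptions the named NodePool may undergo right now.
func allowedDriftDisruptions(ctx context.Context, cluster *state.Cluster, clk clock.Clock, kubeClient client.Client,
	cp cloudprovider.CloudProvider, recorder events.Recorder, nodePoolName string) (int, error) {
	budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, clk, kubeClient, cp, recorder, v1.DisruptionReasonDrifted)
	if err != nil {
		return 0, fmt.Errorf("building disruption budget mapping, %w", err)
	}
	return budgets[nodePoolName], nil
}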
17 changes: 11 additions & 6 deletions pkg/controllers/metrics/nodepool/controller.go
@@ -27,14 +27,17 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
)

const (
@@ -72,15 +75,17 @@
)

type Controller struct {
kubeClient client.Client
metricStore *metrics.Store
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
metricStore *metrics.Store
}

// NewController constructs a controller instance
func NewController(kubeClient client.Client) *Controller {
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
kubeClient: kubeClient,
metricStore: metrics.NewStore(),
kubeClient: kubeClient,
cloudProvider: cloudProvider,
metricStore: metrics.NewStore(),
}
}

@@ -133,6 +138,6 @@ func makeLabels(nodePool *v1.NodePool, resourceTypeName string) prometheus.Label
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("metrics.nodepool").
For(&v1.NodePool{}).
For(&v1.NodePool{}, builder.WithPredicates(nodepoolutils.IsMangedPredicates(c.cloudProvider))).
Complete(c)
}
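IsMangedPredicates (the spelling is the commit's) is another helper from the unshown utility files. Its use inside builder.WithPredicates suggests a controller-runtime predicate that drops events for NodePools the provider does not manage; a sketch along those lines, reusing the hypothetical IsManaged helper from the first sketch:

// Sketch of the NodePool event-filter predicate; only the name and the
// WithPredicates call shape come from the diff, the body is an assumption.
package nodepool

import (
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// IsMangedPredicates filters watch events down to NodePools whose nodeClassRef
// GroupKind is supported by the cloud provider.
func IsMangedPredicates(cp cloudprovider.CloudProvider) predicate.Funcs {
	return predicate.NewPredicateFuncs(func(o client.Object) bool {
		np, ok := o.(*v1.NodePool)
		return ok && IsManaged(np, cp) // hypothetical helper from the earlier sketch
	})
}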
28 changes: 16 additions & 12 deletions pkg/controllers/nodeclaim/consistency/controller.go
@@ -28,24 +28,27 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)

type Controller struct {
clock clock.Clock
kubeClient client.Client
checks []Check
recorder events.Recorder
lastScanned *cache.Cache
clock clock.Clock
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
checks []Check
recorder events.Recorder
lastScanned *cache.Cache
}

type Issue string
Expand All @@ -59,12 +62,13 @@ type Check interface {
// scanPeriod is how often we inspect and report issues that are found.
const scanPeriod = 10 * time.Minute

func NewController(clk clock.Clock, kubeClient client.Client, recorder events.Recorder) *Controller {
func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder) *Controller {
return &Controller{
clock: clk,
kubeClient: kubeClient,
recorder: recorder,
lastScanned: cache.New(scanPeriod, 1*time.Minute),
clock: clk,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
recorder: recorder,
lastScanned: cache.New(scanPeriod, 1*time.Minute),
checks: []Check{
NewTermination(clk, kubeClient),
NewNodeShape(),
@@ -140,10 +144,10 @@ func (c *Controller) checkConsistency(ctx context.Context, nodeClaim *v1.NodeCla
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("nodeclaim.consistency").
For(&v1.NodeClaim{}).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutil.IsMangedPredicates(c.cloudProvider))).
Watches(
&corev1.Node{},
nodeclaimutil.NodeEventHandler(c.kubeClient),
nodeclaimutil.NodeEventHandler(c.kubeClient, nodeclaimutil.WithManagedNodeClaimFilter(c.cloudProvider)),
).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
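NodeEventHandler now accepts options such as WithManagedNodeClaimFilter so that Node events only fan out to NodeClaims the provider manages. Only those two exported names appear in this diff; the option plumbing and filter below are a guess at the shape.

// Sketch of the functional-option plumbing in pkg/utils/nodeclaim; the real
// helpers may be structured differently.
package nodeclaim

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// Option configures the event handlers in this package (hypothetical plumbing).
type Option func(*options)

type options struct {
	cloudProvider cloudprovider.CloudProvider
}

// WithManagedNodeClaimFilter restricts an event handler to NodeClaims whose
// nodeClassRef GroupKind is supported by the cloud provider.
func WithManagedNodeClaimFilter(cp cloudprovider.CloudProvider) Option {
	return func(o *options) { o.cloudProvider = cp }
}

// managed is the per-NodeClaim check a handler would apply before enqueueing a
// reconcile request for a Node event (sketch only; assumes the NodeClass
// objects returned by the provider have their GVK populated).
func managed(nodeClaim *v1.NodeClaim, o *options) bool {
	if o.cloudProvider == nil {
		return true // no filter requested; behave like the previous handler
	}
	for _, nodeClass := range o.cloudProvider.GetSupportedNodeClasses() {
		if nodeClass.GetObjectKind().GroupVersionKind().GroupKind() == nodeClaim.Spec.NodeClassRef.GroupKind() {
			return true
		}
	}
	return false
}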
23 changes: 11 additions & 12 deletions pkg/controllers/nodeclaim/disruption/controller.go
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -109,24 +110,22 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
builder := controllerruntime.NewControllerManagedBy(m)
for _, nodeClass := range c.cloudProvider.GetSupportedNodeClasses() {
builder = builder.Watches(
nodeClass,
nodeclaimutil.NodeClassEventHandler(c.kubeClient),
)
}
return builder.
b := controllerruntime.NewControllerManagedBy(m).
Named("nodeclaim.disruption").
For(&v1.NodeClaim{}).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutil.IsMangedPredicates(c.cloudProvider))).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
// Note: We don't use the ManagedNodeClaim for NodePool updates because drift should be captured when updating
// a NodePool's NodeClassRef to an unsupported NodeClass.
Watches(
&v1.NodePool{},
nodeclaimutil.NodePoolEventHandler(c.kubeClient),
).
Watches(
&corev1.Pod{},
nodeclaimutil.PodEventHandler(c.kubeClient),
).
Complete(reconcile.AsReconciler(m.GetClient(), c))
nodeclaimutil.PodEventHandler(c.kubeClient, nodeclaimutil.WithManagedNodeClaimFilter(c.cloudProvider)),
)
for _, nodeClass := range c.cloudProvider.GetSupportedNodeClasses() {
b.Watches(nodeClass, nodeclaimutil.NodeClassEventHandler(c.kubeClient))
}
return b.Complete(reconcile.AsReconciler(m.GetClient(), c))
}
17 changes: 11 additions & 6 deletions pkg/controllers/nodeclaim/expiration/controller.go
@@ -22,26 +22,31 @@ import (

"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/metrics"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)

// Expiration is a nodeclaim controller that deletes expired nodeclaims based on expireAfter
type Controller struct {
clock clock.Clock
kubeClient client.Client
clock clock.Clock
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
}

// NewController constructs a nodeclaim disruption controller
func NewController(clk clock.Clock, kubeClient client.Client) *Controller {
func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
kubeClient: kubeClient,
clock: clk,
clock: clk,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
}
}

@@ -81,6 +86,6 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("nodeclaim.expiration").
For(&v1.NodeClaim{}).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutils.IsMangedPredicates(c.cloudProvider))).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}
5 changes: 3 additions & 2 deletions pkg/controllers/nodeclaim/lifecycle/controller.go
@@ -32,6 +32,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -86,10 +87,10 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&v1.NodeClaim{}).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutil.IsMangedPredicates(c.cloudProvider))).
Watches(
&corev1.Node{},
nodeclaimutil.NodeEventHandler(c.kubeClient),
nodeclaimutil.NodeEventHandler(c.kubeClient, nodeclaimutil.WithManagedNodeClaimFilter(c.cloudProvider)),
).
WithOptions(controller.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
(Diffs for the remaining 10 changed files are not shown here.)
