feat: only operate on cloudprovider managed resources
jmdeal committed Nov 17, 2024
1 parent 3e0c51a commit 4071259
Showing 30 changed files with 581 additions and 303 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/v1/labels.go
@@ -37,6 +37,8 @@ const (
// Karpenter specific domains and labels
const (
NodePoolLabelKey = apis.Group + "/nodepool"
NodeClassGroup = apis.Group + "/nodeclass-group"
NodeClassKind = apis.Group + "/nodeclass-kind"
NodeInitializedLabelKey = apis.Group + "/initialized"
NodeRegisteredLabelKey = apis.Group + "/registered"
CapacityTypeLabelKey = apis.Group + "/capacity-type"
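With apis.Group being "karpenter.sh", the two new keys resolve to karpenter.sh/nodeclass-group and karpenter.sh/nodeclass-kind. A minimal sketch of how a caller could use them to select only the nodes backed by a NodeClass this Karpenter instance manages — the helper name and label values below are illustrative assumptions, not part of this commit:

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// listNodesForNodeClass is a hypothetical helper: it lists only the nodes
// labeled with the group and kind of a specific NodeClass, using the two
// label keys added in this commit.
func listNodesForNodeClass(ctx context.Context, kubeClient client.Client, group, kind string) (*corev1.NodeList, error) {
	nodes := &corev1.NodeList{}
	if err := kubeClient.List(ctx, nodes, client.MatchingLabels{
		v1.NodeClassGroup: group, // e.g. "karpenter.k8s.aws" (assumed value)
		v1.NodeClassKind:  kind,  // e.g. "EC2NodeClass" (assumed value)
	}); err != nil {
		return nil, err
	}
	return nodes, nil
}
```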
10 changes: 10 additions & 0 deletions pkg/apis/v1/nodeclaim.go
@@ -17,9 +17,12 @@ limitations under the License.
package v1

import (
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// NodeClaimSpec describes the desired state of the NodeClaim
@@ -108,6 +111,13 @@ type NodeClassReference struct {
Group string `json:"group"`
}

func (ncr *NodeClassReference) GroupKind() schema.GroupKind {
return schema.GroupKind{
Group: ncr.Group,
Kind: ncr.Kind,
}
}

// +kubebuilder:object:generate=false
type Provider = runtime.RawExtension

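The new GroupKind() accessor gives callers a direct way to compare a NodeClassRef against the NodeClasses a CloudProvider reports as supported, which is the pattern the rest of this commit relies on. A hedged sketch of such a check (the helper name is illustrative; the commit's actual helpers live in the utils packages shown further down):

```go
package example

import (
	"github.com/awslabs/operatorpkg/status"
	"github.com/samber/lo"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// isManagedNodeClaim is an illustrative helper (not part of this commit as shown):
// it reports whether the NodeClaim references a NodeClass group/kind that the
// running CloudProvider supports.
func isManagedNodeClaim(nc *v1.NodeClaim, cp cloudprovider.CloudProvider) bool {
	return lo.ContainsBy(cp.GetSupportedNodeClasses(), func(obj status.Object) bool {
		return obj.GetObjectKind().GroupVersionKind().GroupKind() == nc.Spec.NodeClassRef.GroupKind()
	})
}
```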
17 changes: 0 additions & 17 deletions pkg/apis/v1/nodepool.go
@@ -290,23 +290,6 @@ type NodePoolList struct {
Items []NodePool `json:"items"`
}

// OrderByWeight orders the NodePools in the NodePoolList by their priority weight in-place.
// This priority evaluates the following things in precedence order:
// 1. NodePools that have a larger weight are ordered first
// 2. If two NodePools have the same weight, then the NodePool with the name later in the alphabet will come first
func (nl *NodePoolList) OrderByWeight() {
sort.Slice(nl.Items, func(a, b int) bool {
weightA := lo.FromPtr(nl.Items[a].Spec.Weight)
weightB := lo.FromPtr(nl.Items[b].Spec.Weight)

if weightA == weightB {
// Order NodePools by name for a consistent ordering when sorting equal weight
return nl.Items[a].Name > nl.Items[b].Name
}
return weightA > weightB
})
}

// MustGetAllowedDisruptions calls GetAllowedDisruptionsByReason if the error is not nil. This reduces the
// amount of state that the disruption controller must reconcile, while allowing the GetAllowedDisruptionsByReason()
// to bubble up any errors in validation.
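OrderByWeight leaves the API package here (along with its tests below); the ordering itself presumably moves alongside the new managed-NodePool helpers. A sketch of the equivalent weight-then-name ordering as a free function over a slice of NodePools — the placement and signature are assumptions, but the logic mirrors the removed method:

```go
package example

import (
	"sort"

	"github.com/samber/lo"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// OrderByWeight sorts NodePools in place: higher weight first, and for equal
// weights, the name later in the alphabet first (matching the removed method).
func OrderByWeight(nodePools []*v1.NodePool) {
	sort.Slice(nodePools, func(a, b int) bool {
		weightA := lo.FromPtr(nodePools[a].Spec.Weight)
		weightB := lo.FromPtr(nodePools[b].Spec.Weight)
		if weightA == weightB {
			// Order by name for a consistent ordering when weights are equal
			return nodePools[a].Name > nodePools[b].Name
		}
		return weightA > weightB
	})
}
```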
44 changes: 0 additions & 44 deletions pkg/apis/v1/suite_test.go
@@ -55,48 +55,4 @@ var _ = AfterSuite(func() {
})

var _ = Describe("OrderByWeight", func() {
It("should order the NodePools by weight", func() {
// Generate 10 NodePools that have random weights, some might have the same weights
var nodePools []v1.NodePool
for i := 0; i < 10; i++ {
np := test.NodePool(v1.NodePool{
Spec: v1.NodePoolSpec{
Weight: lo.ToPtr[int32](int32(rand.Intn(100) + 1)), //nolint:gosec
},
})
nodePools = append(nodePools, *np)
}

nodePools = lo.Shuffle(nodePools)
nodePoolList := v1.NodePoolList{Items: nodePools}
nodePoolList.OrderByWeight()

lastWeight := 101 // This is above the allowed weight values
for _, np := range nodePoolList.Items {
Expect(lo.FromPtr(np.Spec.Weight)).To(BeNumerically("<=", lastWeight))
lastWeight = int(lo.FromPtr(np.Spec.Weight))
}
})
It("should order the NodePools by name when the weights are the same", func() {
// Generate 10 NodePools with the same weight
var nodePools []v1.NodePool
for i := 0; i < 10; i++ {
np := test.NodePool(v1.NodePool{
Spec: v1.NodePoolSpec{
Weight: lo.ToPtr[int32](10),
},
})
nodePools = append(nodePools, *np)
}

nodePools = lo.Shuffle(nodePools)
nodePoolList := v1.NodePoolList{Items: nodePools}
nodePoolList.OrderByWeight()

lastName := "zzzzzzzzzzzzzzzzzzzzzzzz" // large string value
for _, np := range nodePoolList.Items {
Expect(np.Name < lastName).To(BeTrue())
lastName = np.Name
}
})
})
18 changes: 9 additions & 9 deletions pkg/controllers/controllers.go
@@ -68,22 +68,22 @@ func NewControllers(
disruption.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster, disruptionQueue),
provisioning.NewPodController(kubeClient, p),
provisioning.NewNodeController(kubeClient, p),
nodepoolhash.NewController(kubeClient),
expiration.NewController(clock, kubeClient),
nodepoolhash.NewController(kubeClient, cloudProvider),
expiration.NewController(clock, kubeClient, cloudProvider),
informer.NewDaemonSetController(kubeClient, cluster),
informer.NewNodeController(kubeClient, cluster),
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cluster),
informer.NewNodeClaimController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cloudProvider, cluster),
informer.NewNodeClaimController(kubeClient, cloudProvider, cluster),
termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
metricspod.NewController(kubeClient),
metricsnodepool.NewController(kubeClient),
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cluster),
nodepoolvalidation.NewController(kubeClient),
podevents.NewController(clock, kubeClient),
nodeclaimconsistency.NewController(clock, kubeClient, recorder),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
podevents.NewController(clock, kubeClient, cloudProvider),
nodeclaimconsistency.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimlifecycle.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimgarbagecollection.NewController(clock, kubeClient, cloudProvider),
nodeclaimdisruption.NewController(clock, kubeClient, cloudProvider),
7 changes: 4 additions & 3 deletions pkg/controllers/disruption/controller.go
@@ -36,6 +36,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/utils/nodepool"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -284,13 +285,13 @@ func (c *Controller) logAbnormalRuns(ctx context.Context) {

// logInvalidBudgets will log if there are any invalid schedules detected
func (c *Controller) logInvalidBudgets(ctx context.Context) {
nodePoolList := &v1.NodePoolList{}
if err := c.kubeClient.List(ctx, nodePoolList); err != nil {
nodePools, err := nodepool.ListManagedNodePools(ctx, c.kubeClient, c.cloudProvider)
if err != nil {
log.FromContext(ctx).Error(err, "failed listing nodepools")
return
}
var buf bytes.Buffer
for _, np := range nodePoolList.Items {
for _, np := range nodePools {
// Use a dummy value of 100 since we only care if this errors.
for _, method := range c.methods {
if _, err := np.GetAllowedDisruptionsByReason(c.clock, 100, method.Reason()); err != nil {
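The direct kubeClient.List calls on NodePoolList are replaced throughout with nodepool.ListManagedNodePools, whose implementation is not part of this excerpt. A plausible sketch, assuming it lists all NodePools and keeps only those whose NodeClassRef group/kind matches a NodeClass supported by the CloudProvider:

```go
package example

import (
	"context"

	"github.com/awslabs/operatorpkg/status"
	"github.com/samber/lo"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// ListManagedNodePools (sketch): list every NodePool, then keep only those
// whose referenced NodeClass group/kind is supported by this CloudProvider.
func ListManagedNodePools(ctx context.Context, kubeClient client.Client, cp cloudprovider.CloudProvider) ([]*v1.NodePool, error) {
	nodePoolList := &v1.NodePoolList{}
	if err := kubeClient.List(ctx, nodePoolList); err != nil {
		return nil, err
	}
	supported := lo.Map(cp.GetSupportedNodeClasses(), func(obj status.Object, _ int) schema.GroupKind {
		return obj.GetObjectKind().GroupVersionKind().GroupKind()
	})
	return lo.FilterMap(nodePoolList.Items, func(np v1.NodePool, _ int) (*v1.NodePool, bool) {
		return &np, lo.Contains(supported, np.Spec.Template.Spec.NodeClassRef.GroupKind())
	}), nil
}
```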
20 changes: 10 additions & 10 deletions pkg/controllers/disruption/helpers.go
@@ -39,6 +39,7 @@ import (
"sigs.k8s.io/karpenter/pkg/metrics"
operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
"sigs.k8s.io/karpenter/pkg/utils/nodepool"
"sigs.k8s.io/karpenter/pkg/utils/pdb"
)

@@ -162,13 +163,13 @@ func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient clien
// BuildNodePoolMap builds a provName -> nodePool map and a provName -> instanceName -> instance type map
func BuildNodePoolMap(ctx context.Context, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) (map[string]*v1.NodePool, map[string]map[string]*cloudprovider.InstanceType, error) {
nodePoolMap := map[string]*v1.NodePool{}
nodePoolList := &v1.NodePoolList{}
if err := kubeClient.List(ctx, nodePoolList); err != nil {
nodePools, err := nodepool.ListManagedNodePools(ctx, kubeClient, cloudProvider)
if err != nil {
return nil, nil, fmt.Errorf("listing node pools, %w", err)
}

nodePoolToInstanceTypesMap := map[string]map[string]*cloudprovider.InstanceType{}
for i := range nodePoolList.Items {
np := &nodePoolList.Items[i]
for _, np := range nodePools {
nodePoolMap[np.Name] = np

nodePoolInstanceTypes, err := cloudProvider.GetInstanceTypes(ctx, np)
@@ -193,7 +194,7 @@ func BuildNodePoolMap(ctx context.Context, kubeClient client.Client, cloudProvid
// We calculate allowed disruptions by taking the max disruptions allowed by disruption reason and subtracting the number of nodes that are NotReady and already being deleted by that disruption reason.
//
//nolint:gocyclo
func BuildDisruptionBudgetMapping(ctx context.Context, cluster *state.Cluster, clk clock.Clock, kubeClient client.Client, recorder events.Recorder, reason v1.DisruptionReason) (map[string]int, error) {
func BuildDisruptionBudgetMapping(ctx context.Context, cluster *state.Cluster, clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder, reason v1.DisruptionReason) (map[string]int, error) {
disruptionBudgetMapping := map[string]int{}
numNodes := map[string]int{} // map[nodepool] -> node count in nodepool
disrupting := map[string]int{} // map[nodepool] -> nodes undergoing disruption
Expand Down Expand Up @@ -226,19 +227,18 @@ func BuildDisruptionBudgetMapping(ctx context.Context, cluster *state.Cluster, c
disrupting[nodePool]++
}
}
nodePoolList := &v1.NodePoolList{}
if err := kubeClient.List(ctx, nodePoolList); err != nil {
nodePools, err := nodepool.ListManagedNodePools(ctx, kubeClient, cloudProvider)
if err != nil {
return disruptionBudgetMapping, fmt.Errorf("listing node pools, %w", err)
}
for _, nodePool := range nodePoolList.Items {
for _, nodePool := range nodePools {
allowedDisruptions := nodePool.MustGetAllowedDisruptions(clk, numNodes[nodePool.Name], reason)
disruptionBudgetMapping[nodePool.Name] = lo.Max([]int{allowedDisruptions - disrupting[nodePool.Name], 0})

NodePoolAllowedDisruptions.Set(float64(allowedDisruptions), map[string]string{
metrics.NodePoolLabel: nodePool.Name, metrics.ReasonLabel: string(reason),
})
if allowedDisruptions == 0 {
recorder.Publish(disruptionevents.NodePoolBlockedForDisruptionReason(lo.ToPtr(nodePool), reason))
recorder.Publish(disruptionevents.NodePoolBlockedForDisruptionReason(nodePool, reason))
}
}
return disruptionBudgetMapping, nil
17 changes: 11 additions & 6 deletions pkg/controllers/metrics/nodepool/controller.go
@@ -27,14 +27,17 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
)

const (
@@ -72,15 +75,17 @@ var (
)

type Controller struct {
kubeClient client.Client
metricStore *metrics.Store
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
metricStore *metrics.Store
}

// NewController constructs a controller instance
func NewController(kubeClient client.Client) *Controller {
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
kubeClient: kubeClient,
metricStore: metrics.NewStore(),
kubeClient: kubeClient,
cloudProvider: cloudProvider,
metricStore: metrics.NewStore(),
}
}

@@ -133,6 +138,6 @@ func makeLabels(nodePool *v1.NodePool, resourceTypeName string) prometheus.Label
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("metrics.nodepool").
For(&v1.NodePool{}).
For(&v1.NodePool{}, builder.WithPredicates(nodepoolutils.IsMangedPredicates(c.cloudProvider))).
Complete(c)
}
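Controllers now register with a predicate so that reconciles are only triggered for NodePools backed by a supported NodeClass. The predicate itself is defined outside this excerpt; a minimal sketch using controller-runtime's predicate.NewPredicateFuncs (the function name is kept exactly as it appears in the diff):

```go
package example

import (
	"github.com/awslabs/operatorpkg/status"
	"github.com/samber/lo"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// IsMangedPredicates (sketch): filter watch events down to NodePools whose
// NodeClassRef group/kind is one of the CloudProvider's supported NodeClasses.
func IsMangedPredicates(cp cloudprovider.CloudProvider) predicate.Predicate {
	return predicate.NewPredicateFuncs(func(obj client.Object) bool {
		np, ok := obj.(*v1.NodePool)
		if !ok {
			return false
		}
		return lo.ContainsBy(cp.GetSupportedNodeClasses(), func(nc status.Object) bool {
			return nc.GetObjectKind().GroupVersionKind().GroupKind() == np.Spec.Template.Spec.NodeClassRef.GroupKind()
		})
	})
}
```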
16 changes: 14 additions & 2 deletions pkg/controllers/node/termination/controller.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
@@ -38,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -87,7 +89,17 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
if !controllerutil.ContainsFinalizer(node, v1.TerminationFinalizer) {
return reconcile.Result{}, nil
}
nodeClaims, err := nodeutils.GetNodeClaims(ctx, node, c.kubeClient)

// Only reconcile against nodes that are managed by this instance of Karpenter. The label is guaranteed to have
// been successfully propagated since it's applied by the same update as the termination finalizer.
if !lo.ContainsBy(c.cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool {
_, ok := node.Labels[nodeclaim.NodeClassLabelKeyFromGroupKind(nc.GetObjectKind().GroupVersionKind().GroupKind())]
return ok
}) {
return reconcile.Result{}, nil
}

nodeClaims, err := nodeutils.GetNodeClaims(ctx, node, c.kubeClient, nodeclaim.WithManagedNodeClaimFilter(c.cloudProvider))
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
}
@@ -143,7 +155,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
}
nodeClaims, err = nodeutils.GetNodeClaims(ctx, node, c.kubeClient)
nodeClaims, err = nodeutils.GetNodeClaims(ctx, node, c.kubeClient, nodeclaim.WithManagedNodeClaimFilter(c.cloudProvider))
if err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
}
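nodeclaim.NodeClassLabelKeyFromGroupKind, used above to decide whether a node is managed, is not shown in this excerpt. One plausible shape, assuming the label key is simply the group plus the lower-cased kind — the exact format is an assumption:

```go
package example

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// NodeClassLabelKeyFromGroupKind (sketch, exact format assumed): derive a node
// label key from a NodeClass group/kind, e.g.
// {Group: "karpenter.k8s.aws", Kind: "EC2NodeClass"} -> "karpenter.k8s.aws/ec2nodeclass".
func NodeClassLabelKeyFromGroupKind(gk schema.GroupKind) string {
	return fmt.Sprintf("%s/%s", gk.Group, strings.ToLower(gk.Kind))
}
```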
28 changes: 16 additions & 12 deletions pkg/controllers/nodeclaim/consistency/controller.go
@@ -28,24 +28,27 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)

type Controller struct {
clock clock.Clock
kubeClient client.Client
checks []Check
recorder events.Recorder
lastScanned *cache.Cache
clock clock.Clock
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
checks []Check
recorder events.Recorder
lastScanned *cache.Cache
}

type Issue string
Expand All @@ -59,12 +62,13 @@ type Check interface {
// scanPeriod is how often we inspect and report issues that are found.
const scanPeriod = 10 * time.Minute

func NewController(clk clock.Clock, kubeClient client.Client, recorder events.Recorder) *Controller {
func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder) *Controller {
return &Controller{
clock: clk,
kubeClient: kubeClient,
recorder: recorder,
lastScanned: cache.New(scanPeriod, 1*time.Minute),
clock: clk,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
recorder: recorder,
lastScanned: cache.New(scanPeriod, 1*time.Minute),
checks: []Check{
NewTermination(clk, kubeClient),
NewNodeShape(),
@@ -140,10 +144,10 @@ func (c *Controller) checkConsistency(ctx context.Context, nodeClaim *v1.NodeCla
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("nodeclaim.consistency").
For(&v1.NodeClaim{}).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutil.IsMangedPredicates(c.cloudProvider))).
Watches(
&corev1.Node{},
nodeclaimutil.NodeEventHandler(c.kubeClient),
nodeclaimutil.NodeEventHandler(c.kubeClient, nodeclaimutil.WithManagedNodeClaimFilter(c.cloudProvider)),
).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
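Both the NodeClaim listing helpers and the node event handlers now take nodeclaim.WithManagedNodeClaimFilter(cloudProvider). Its implementation is not included here; a hedged sketch of a functional list option that drops NodeClaims whose NodeClassRef isn't supported (the ListOptions type and option signature are assumptions for illustration):

```go
package example

import (
	"github.com/awslabs/operatorpkg/status"
	"github.com/samber/lo"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// ListOptions and the option signature below are assumptions for illustration;
// the real types live in pkg/utils/nodeclaim and are not shown in this diff.
type ListOptions struct {
	Filters []func(*v1.NodeClaim) bool
}

// WithManagedNodeClaimFilter (sketch): keep only NodeClaims whose NodeClassRef
// group/kind matches one of the CloudProvider's supported NodeClasses.
func WithManagedNodeClaimFilter(cp cloudprovider.CloudProvider) func(*ListOptions) {
	return func(o *ListOptions) {
		o.Filters = append(o.Filters, func(nc *v1.NodeClaim) bool {
			return lo.ContainsBy(cp.GetSupportedNodeClasses(), func(obj status.Object) bool {
				return obj.GetObjectKind().GroupVersionKind().GroupKind() == nc.Spec.NodeClassRef.GroupKind()
			})
		})
	}
}
```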
(Diffs for the remaining 20 changed files are not shown.)
