Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add consolidation policies for WhenCheaper, WhenUnderutilizedOrCheaper #1429

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ spec:
pattern: ^(([0-9]+(s|m|h))+)|(Never)$
type: string
consolidationPolicy:
default: WhenUnderutilized
default: WhenUnderutilizedOrCheaper
description: |-
ConsolidationPolicy describes which nodes Karpenter can disrupt through its consolidation
algorithm. This policy defaults to "WhenUnderutilized" if not specified
algorithm. This policy defaults to "WhenUnderutilizedOrCheaper" if not specified
enum:
- WhenEmpty
- WhenUnderutilized
- WhenCheaper
- WhenUnderutilizedOrCheaper
type: string
expireAfter:
default: 720h
Expand Down
6 changes: 4 additions & 2 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ spec:
pattern: ^(([0-9]+(s|m|h))+)|(Never)$
type: string
consolidationPolicy:
default: WhenUnderutilized
default: WhenUnderutilizedOrCheaper
description: |-
ConsolidationPolicy describes which nodes Karpenter can disrupt through its consolidation
algorithm. This policy defaults to "WhenUnderutilized" if not specified
algorithm. This policy defaults to "WhenUnderutilizedOrCheaper" if not specified
enum:
- WhenEmpty
- WhenUnderutilized
- WhenCheaper
- WhenUnderutilizedOrCheaper
type: string
expireAfter:
default: 720h
Expand Down
15 changes: 8 additions & 7 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ const (

// Karpenter specific annotations
const (
DoNotDisruptAnnotationKey = apis.Group + "/do-not-disrupt"
ProviderCompatabilityAnnotationKey = apis.CompatabilityGroup + "/provider"
ManagedByAnnotationKey = apis.Group + "/managed-by"
NodePoolHashAnnotationKey = apis.Group + "/nodepool-hash"
NodePoolHashVersionAnnotationKey = apis.Group + "/nodepool-hash-version"
KubeletCompatabilityAnnotationKey = apis.CompatabilityGroup + "/v1beta1-kubelet-conversion"
NodeClaimTerminationTimestampAnnotationKey = apis.Group + "/nodeclaim-termination-timestamp"
DoNotDisruptAnnotationKey = apis.Group + "/do-not-disrupt"
ProviderCompatabilityAnnotationKey = apis.CompatabilityGroup + "/provider"
ManagedByAnnotationKey = apis.Group + "/managed-by"
NodePoolHashAnnotationKey = apis.Group + "/nodepool-hash"
NodePoolHashVersionAnnotationKey = apis.Group + "/nodepool-hash-version"
KubeletCompatabilityAnnotationKey = apis.CompatabilityGroup + "/v1beta1-kubelet-conversion"
NodeClaimTerminationTimestampAnnotationKey = apis.Group + "/nodeclaim-termination-timestamp"
ConsolidationPolicyCompatabilityAnnotationKey = apis.CompatabilityGroup + "/v1beta1-consolidation-policy-conversion"
)

// Karpenter specific finalizers
Expand Down
12 changes: 7 additions & 5 deletions pkg/apis/v1/nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ type Disruption struct {
// +optional
ConsolidateAfter *NillableDuration `json:"consolidateAfter,omitempty"`
// ConsolidationPolicy describes which nodes Karpenter can disrupt through its consolidation
// algorithm. This policy defaults to "WhenUnderutilized" if not specified
// +kubebuilder:default:="WhenUnderutilized"
// +kubebuilder:validation:Enum:={WhenEmpty,WhenUnderutilized}
// algorithm. This policy defaults to "WhenUnderutilizedOrCheaper" if not specified
// +kubebuilder:default:="WhenUnderutilizedOrCheaper"
// +kubebuilder:validation:Enum:={WhenEmpty,WhenUnderutilized,WhenCheaper,WhenUnderutilizedOrCheaper}
// +optional
ConsolidationPolicy ConsolidationPolicy `json:"consolidationPolicy,omitempty"`
// ExpireAfter is the duration the controller will wait
Expand Down Expand Up @@ -137,8 +137,10 @@ type Budget struct {
type ConsolidationPolicy string

const (
ConsolidationPolicyWhenEmpty ConsolidationPolicy = "WhenEmpty"
ConsolidationPolicyWhenUnderutilized ConsolidationPolicy = "WhenUnderutilized"
ConsolidationPolicyWhenEmpty ConsolidationPolicy = "WhenEmpty"
ConsolidationPolicyWhenUnderutilized ConsolidationPolicy = "WhenUnderutilized"
ConsolidationPolicyWhenCheaper ConsolidationPolicy = "WhenCheaper"
ConsolidationPolicyWhenUnderutilizedOrCheaper ConsolidationPolicy = "WhenUnderutilizedOrCheaper"
)

// DisruptionReason defines valid reasons for disruption budgets.
Expand Down
44 changes: 34 additions & 10 deletions pkg/apis/v1/nodepool_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,30 @@ func (in *NodePool) ConvertTo(ctx context.Context, to apis.Convertible) error {
// Convert v1 status
v1beta1NP.Status.Resources = in.Status.Resources
v1beta1NP.Status.Conditions = in.Status.Conditions
return in.Spec.convertTo(ctx, &v1beta1NP.Spec, in.Annotations[KubeletCompatabilityAnnotationKey])
if v1beta1NP.Annotations == nil {
v1beta1NP.Annotations = map[string]string{}
}
return in.Spec.convertTo(ctx, &v1beta1NP.Spec, v1beta1NP.Annotations)
}

func (in *NodePoolSpec) convertTo(ctx context.Context, v1beta1np *v1beta1.NodePoolSpec, kubeletAnnotation string) error {
func (in *NodePoolSpec) convertTo(ctx context.Context, v1beta1np *v1beta1.NodePoolSpec, annotations map[string]string) error {
v1beta1np.Weight = in.Weight
v1beta1np.Limits = v1beta1.Limits(in.Limits)
in.Disruption.convertTo(&v1beta1np.Disruption)
return in.Template.convertTo(ctx, &v1beta1np.Template, kubeletAnnotation)
annotations[ConsolidationPolicyCompatabilityAnnotationKey] = string(in.Disruption.ConsolidationPolicy)
return in.Template.convertTo(ctx, &v1beta1np.Template, annotations[KubeletCompatabilityAnnotationKey])
}

func (in *Disruption) convertTo(v1beta1np *v1beta1.Disruption) {
v1beta1np.ConsolidateAfter = (*v1beta1.NillableDuration)(in.ConsolidateAfter)
v1beta1np.ConsolidationPolicy = v1beta1.ConsolidationPolicy(in.ConsolidationPolicy)
switch in.ConsolidationPolicy {
case ConsolidationPolicyWhenUnderutilizedOrCheaper:
fallthrough
case ConsolidationPolicyWhenCheaper:
v1beta1np.ConsolidationPolicy = v1beta1.ConsolidationPolicyWhenUnderutilized
default:
v1beta1np.ConsolidationPolicy = v1beta1.ConsolidationPolicy(in.ConsolidationPolicy)
}
v1beta1np.ExpireAfter = v1beta1.NillableDuration(in.ExpireAfter)
v1beta1np.Budgets = lo.Map(in.Budgets, func(v1budget Budget, _ int) v1beta1.Budget {
return v1beta1.Budget{
Expand Down Expand Up @@ -123,24 +134,37 @@ func (in *NodePool) ConvertFrom(ctx context.Context, v1beta1np apis.Convertible)
in.Status.Resources = v1beta1NP.Status.Resources
in.Status.Conditions = v1beta1NP.Status.Conditions

kubeletAnnotation, err := in.Spec.convertFrom(ctx, &v1beta1NP.Spec)
kubeletAnnotation, err := in.Spec.convertFrom(ctx, &v1beta1NP.Spec, in.Annotations[ConsolidationPolicyCompatabilityAnnotationKey])
if err != nil {
return err
}
in.Annotations = lo.Assign(in.Annotations, map[string]string{KubeletCompatabilityAnnotationKey: kubeletAnnotation})
in.Annotations = lo.Assign(in.Annotations, map[string]string{KubeletCompatabilityAnnotationKey: kubeletAnnotation, ConsolidationPolicyCompatabilityAnnotationKey: string(in.Spec.Disruption.ConsolidationPolicy)})
return nil
}

func (in *NodePoolSpec) convertFrom(ctx context.Context, v1beta1np *v1beta1.NodePoolSpec) (string, error) {
func (in *NodePoolSpec) convertFrom(ctx context.Context, v1beta1np *v1beta1.NodePoolSpec, consolidationPolicy string) (string, error) {
in.Weight = v1beta1np.Weight
in.Limits = Limits(v1beta1np.Limits)
in.Disruption.convertFrom(&v1beta1np.Disruption)
in.Disruption.convertFrom(&v1beta1np.Disruption, consolidationPolicy)
return in.Template.convertFrom(ctx, &v1beta1np.Template)
}

func (in *Disruption) convertFrom(v1beta1np *v1beta1.Disruption) {
func (in *Disruption) convertFrom(v1beta1np *v1beta1.Disruption, consolidationPolicy string) {
in.ConsolidateAfter = (*NillableDuration)(v1beta1np.ConsolidateAfter)
in.ConsolidationPolicy = ConsolidationPolicy(v1beta1np.ConsolidationPolicy)
if consolidationPolicy != "" {
switch consolidationPolicy {
case string(ConsolidationPolicyWhenUnderutilizedOrCheaper):
in.ConsolidationPolicy = ConsolidationPolicyWhenUnderutilizedOrCheaper
case string(ConsolidationPolicyWhenUnderutilized):
in.ConsolidationPolicy = ConsolidationPolicyWhenUnderutilized
case string(ConsolidationPolicyWhenCheaper):
in.ConsolidationPolicy = ConsolidationPolicyWhenCheaper
case string(ConsolidationPolicyWhenEmpty):
in.ConsolidationPolicy = ConsolidationPolicyWhenEmpty
}
} else {
in.ConsolidationPolicy = ConsolidationPolicy(v1beta1np.ConsolidationPolicy)
}
in.ExpireAfter = NillableDuration(v1beta1np.ExpireAfter)
in.Budgets = lo.Map(v1beta1np.Budgets, func(v1beta1budget v1beta1.Budget, _ int) Budget {
return Budget{
Expand Down
14 changes: 12 additions & 2 deletions pkg/apis/v1/nodepool_conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var _ = Describe("Convert V1 to V1beta1 NodePool API", func() {
It("should convert v1 nodepool metadata", func() {
v1nodepool.ObjectMeta = test.ObjectMeta()
Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed())
v1beta1nodepool.Annotations = map[string]string{KubeletCompatabilityAnnotationKey: "null"}
v1beta1nodepool.Annotations = map[string]string{KubeletCompatabilityAnnotationKey: "null", ConsolidationPolicyCompatabilityAnnotationKey: ""}
Expect(v1beta1nodepool.ObjectMeta).To(BeEquivalentTo(v1nodepool.ObjectMeta))
})
Context("NodePool Spec", func() {
Expand Down Expand Up @@ -262,6 +262,11 @@ var _ = Describe("Convert V1 to V1beta1 NodePool API", func() {
Expect(v1beta1nodepool.Spec.Disruption.Budgets[i].Reasons).To(BeEquivalentTo(expected))
}
})
It("should convert v1 nodepool consolidationPolicy", func() {
v1nodepool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenCheaper
Expect(v1nodepool.ConvertTo(ctx, v1beta1nodepool)).To(Succeed())
Expect(v1beta1nodepool.Spec.Disruption.ConsolidationPolicy).To(Equal(v1beta1.ConsolidationPolicyWhenUnderutilized))
})
})
})
})
Expand Down Expand Up @@ -320,7 +325,7 @@ var _ = Describe("Convert V1beta1 to V1 NodePool API", func() {
It("should convert v1beta1 nodepool metadata", func() {
v1beta1nodepool.ObjectMeta = test.ObjectMeta()
Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed())
v1beta1nodepool.Annotations = map[string]string{KubeletCompatabilityAnnotationKey: "null"}
v1beta1nodepool.Annotations = map[string]string{KubeletCompatabilityAnnotationKey: "null", ConsolidationPolicyCompatabilityAnnotationKey: ""}
Expect(v1nodepool.ObjectMeta).To(BeEquivalentTo(v1beta1nodepool.ObjectMeta))
})
Context("NodePool Spec", func() {
Expand Down Expand Up @@ -535,6 +540,11 @@ var _ = Describe("Convert V1beta1 to V1 NodePool API", func() {
Expect(v1nodepool.Spec.Disruption.Budgets[i].Reasons).To(BeEquivalentTo(expected))
}
})
It("should convert v1beta1 nodepool consolidationPolicy", func() {
v1beta1nodepool.Annotations = map[string]string{ConsolidationPolicyCompatabilityAnnotationKey: string(ConsolidationPolicyWhenCheaper)}
Expect(v1nodepool.ConvertFrom(ctx, v1beta1nodepool)).To(Succeed())
Expect(v1nodepool.Spec.Disruption.ConsolidationPolicy).To(Equal(ConsolidationPolicyWhenCheaper))
})
})
})
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ func (c *consolidation) markConsolidated() {

// ShouldDisrupt is a predicate used to filter candidates
func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool {
// If we don't have the "WhenUnderutilized" policy set, we should not do any of the consolidation methods, but
// If we have the "WhenEmpty" policy set, we should not do any of the consolidation methods, but
// we should also not fire an event here to users since this can be confusing when the field on the NodePool
// is named "consolidationPolicy"
if cn.nodePool.Spec.Disruption.ConsolidationPolicy != v1.ConsolidationPolicyWhenUnderutilized {
if cn.nodePool.Spec.Disruption.ConsolidationPolicy == v1.ConsolidationPolicyWhenEmpty {
return false
}
if cn.nodePool.Spec.Disruption.ConsolidateAfter != nil && cn.nodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
Expand Down
15 changes: 8 additions & 7 deletions pkg/controllers/disruption/consolidation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (

var _ = Describe("Consolidation", func() {
var nodePool *v1.NodePool

var nodeClaim, spotNodeClaim *v1.NodeClaim
var node, spotNode *corev1.Node
var labels = map[string]string{
Expand All @@ -59,7 +60,7 @@ var _ = Describe("Consolidation", func() {
nodePool = test.NodePool(v1.NodePool{
Spec: v1.NodePoolSpec{
Disruption: v1.Disruption{
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilized,
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilizedOrCheaper,
// Disrupt away!
Budgets: []v1.Budget{{
Nodes: "100%",
Expand Down Expand Up @@ -382,7 +383,7 @@ var _ = Describe("Consolidation", func() {
nps := test.NodePools(10, v1.NodePool{
Spec: v1.NodePoolSpec{
Disruption: v1.Disruption{
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilized,
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilizedOrCheaper,
Budgets: []v1.Budget{{
// 1/2 of 3 nodes == 1.5 nodes. This should round up to 2.
Nodes: "50%",
Expand Down Expand Up @@ -447,7 +448,7 @@ var _ = Describe("Consolidation", func() {
nps := test.NodePools(10, v1.NodePool{
Spec: v1.NodePoolSpec{
Disruption: v1.Disruption{
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilized,
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilizedOrCheaper,
Budgets: []v1.Budget{{
Nodes: "100%",
}},
Expand Down Expand Up @@ -511,7 +512,7 @@ var _ = Describe("Consolidation", func() {
nps := test.NodePools(10, v1.NodePool{
Spec: v1.NodePoolSpec{
Disruption: v1.Disruption{
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilized,
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilizedOrCheaper,
Budgets: []v1.Budget{{
Nodes: "0%",
}},
Expand Down Expand Up @@ -602,7 +603,7 @@ var _ = Describe("Consolidation", func() {
nps := test.NodePools(10, v1.NodePool{
Spec: v1.NodePoolSpec{
Disruption: v1.Disruption{
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilized,
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilizedOrCheaper,
Budgets: []v1.Budget{{
Nodes: "0%",
}},
Expand Down Expand Up @@ -693,7 +694,7 @@ var _ = Describe("Consolidation", func() {
nps := test.NodePools(10, v1.NodePool{
Spec: v1.NodePoolSpec{
Disruption: v1.Disruption{
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilized,
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilizedOrCheaper,
Budgets: []v1.Budget{{
Nodes: "0%",
}},
Expand Down Expand Up @@ -784,7 +785,7 @@ var _ = Describe("Consolidation", func() {
nps := test.NodePools(10, v1.NodePool{
Spec: v1.NodePoolSpec{
Disruption: v1.Disruption{
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilized,
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilizedOrCheaper,
Budgets: []v1.Budget{{
Nodes: "0%",
}},
Expand Down
9 changes: 9 additions & 0 deletions pkg/controllers/disruption/multinodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func NewMultiNodeConsolidation(consolidation consolidation) *MultiNodeConsolidat
return &MultiNodeConsolidation{consolidation: consolidation}
}

// ComputeCommand generates a disruption command given candidates
// nolint:gocyclo
func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]map[v1.DisruptionReason]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
if m.IsConsolidated() {
return Command{}, scheduling.Results{}, nil
Expand All @@ -60,6 +62,13 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
disruptableCandidates := make([]*Candidate, 0, len(candidates))
constrainedByBudgets := false
for _, candidate := range candidates {
switch candidate.nodePool.Spec.Disruption.ConsolidationPolicy {
case v1.ConsolidationPolicyWhenEmpty:
continue
case v1.ConsolidationPolicyWhenCheaper:
continue
}

// If there's disruptions allowed for the candidate's nodepool,
// add it to the list of candidates, and decrement the budget.
if disruptionBudgetMapping[candidate.nodePool.Name][v1.DisruptionReasonUnderutilized] == 0 {
Expand Down
7 changes: 7 additions & 0 deletions pkg/controllers/disruption/singlenodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
constrainedByBudgets := false
// walk the candidates in order, evaluating each one individually as a potential NodeClaim to terminate
for i, candidate := range candidates {
switch candidate.nodePool.Spec.Disruption.ConsolidationPolicy {
case v1.ConsolidationPolicyWhenEmpty:
continue
case v1.ConsolidationPolicyWhenUnderutilized:
continue
}

// If the disruption budget doesn't allow this candidate to be disrupted,
// continue to the next candidate. We don't need to decrement any budget
// counter since single node consolidation commands can only have one candidate.
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ var _ = Describe("Disruption Taints", func() {
Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptionNoScheduleTaint))
})
It("should add and remove taints from NodeClaims that fail to disrupt", func() {
nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenUnderutilized
pod := test.Pod(test.PodOptions{
ResourceRequirements: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
Expand Down Expand Up @@ -1759,7 +1758,7 @@ var _ = Describe("Metrics", func() {
nodePool = test.NodePool(v1.NodePool{
Spec: v1.NodePoolSpec{
Disruption: v1.Disruption{
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilized,
ConsolidationPolicy: v1.ConsolidationPolicyWhenUnderutilizedOrCheaper,
// Disrupt away!
Budgets: []v1.Budget{{
Nodes: "100%",
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (v *Validation) ValidateCandidates(ctx context.Context, candidates ...*Cand

// ShouldDisrupt is a predicate used to filter candidates
func (v *Validation) ShouldDisrupt(_ context.Context, c *Candidate) bool {
return c.nodePool.Spec.Disruption.ConsolidationPolicy == v1.ConsolidationPolicyWhenUnderutilized
return c.nodePool.Spec.Disruption.ConsolidationPolicy != v1.ConsolidationPolicyWhenEmpty
}

// ValidateCommand validates a command for a Method
Expand Down
3 changes: 3 additions & 0 deletions pkg/test/nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func NodePool(overrides ...v1.NodePool) *v1.NodePool {
if override.Spec.Template.Spec.Requirements == nil {
override.Spec.Template.Spec.Requirements = []v1.NodeSelectorRequirementWithMinValues{}
}
if override.Spec.Disruption.ConsolidationPolicy == "" {
override.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenUnderutilizedOrCheaper
}
if override.Status.Conditions == nil {
override.StatusConditions().SetTrue(v1.ConditionTypeValidationSucceeded)
override.StatusConditions().SetTrue(v1.ConditionTypeNodeClassReady)
Expand Down
Loading