From d22742d579885a226d144fc6824dc73a5720e5da Mon Sep 17 00:00:00 2001 From: Nick Tran <10810510+njtran@users.noreply.github.com> Date: Fri, 8 Nov 2024 11:42:44 -0800 Subject: [PATCH] feat: add instance type drift for instance types that are no longer discoverable (#1787) --- pkg/controllers/disruption/consolidation.go | 17 +++++ .../disruption/consolidation_test.go | 39 +++++++++++ pkg/controllers/disruption/helpers.go | 4 +- pkg/controllers/disruption/suite_test.go | 24 +++---- pkg/controllers/disruption/types.go | 10 +-- .../disruption/consolidation_test.go | 2 +- pkg/controllers/nodeclaim/disruption/drift.go | 39 ++++++++++- .../nodeclaim/disruption/drift_test.go | 68 ++++++++++++++++--- .../nodeclaim/disruption/suite_test.go | 39 ++++++++++- pkg/controllers/state/statenode.go | 13 +--- 10 files changed, 205 insertions(+), 50 deletions(-) diff --git a/pkg/controllers/disruption/consolidation.go b/pkg/controllers/disruption/consolidation.go index e3539be371..69ec01cad5 100644 --- a/pkg/controllers/disruption/consolidation.go +++ b/pkg/controllers/disruption/consolidation.go @@ -85,6 +85,23 @@ func (c *consolidation) markConsolidated() { // ShouldDisrupt is a predicate used to filter candidates func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool { + // We need the following to know what the price of the instance for price comparison. If one of these doesn't exist, we can't + // compute consolidation decisions for this candidate. + // 1. Instance Type + // 2. Capacity Type + // 3. Zone + if cn.instanceType == nil { + c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("Instance Type %q not found", cn.Labels()[corev1.LabelInstanceTypeStable]))...) + return false + } + if _, ok := cn.Labels()[v1.CapacityTypeLabelKey]; !ok { + c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("Node does not have label %q", v1.CapacityTypeLabelKey))...) + return false + } + if _, ok := cn.Labels()[corev1.LabelTopologyZone]; !ok { + c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("Node does not have label %q", corev1.LabelTopologyZone))...) + return false + } if cn.nodePool.Spec.Disruption.ConsolidateAfter.Duration == nil { c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", cn.nodePool.Name))...) return false diff --git a/pkg/controllers/disruption/consolidation_test.go b/pkg/controllers/disruption/consolidation_test.go index e67813657c..d342a6136f 100644 --- a/pkg/controllers/disruption/consolidation_test.go +++ b/pkg/controllers/disruption/consolidation_test.go @@ -137,6 +137,45 @@ var _ = Describe("Consolidation", func() { // and each of the consolidation mechanisms specifies that this event should be fired Expect(recorder.Calls("Unconsolidatable")).To(Equal(6)) }) + It("should fire an event when a candidate does not have a resolvable instance type", func() { + pod := test.Pod() + delete(nodeClaim.Labels, corev1.LabelInstanceTypeStable) + delete(node.Labels, corev1.LabelInstanceTypeStable) + + ExpectApplied(ctx, env.Client, pod, node, nodeClaim, nodePool) + ExpectManualBinding(ctx, env.Client, pod, node) + + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + ExpectSingletonReconciled(ctx, disruptionController) + // We get four calls since we only care about this since we don't emit for empty node consolidation + Expect(recorder.Calls("Unconsolidatable")).To(Equal(4)) + }) + It("should fire an event when a candidate does not have the capacity type label", func() { + pod := test.Pod() + delete(nodeClaim.Labels, v1.CapacityTypeLabelKey) + delete(node.Labels, v1.CapacityTypeLabelKey) + + ExpectApplied(ctx, env.Client, pod, node, nodeClaim, nodePool) + ExpectManualBinding(ctx, env.Client, pod, node) + + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + ExpectSingletonReconciled(ctx, disruptionController) + // We get four calls since we only care about this since we don't emit for empty node consolidation + Expect(recorder.Calls("Unconsolidatable")).To(Equal(4)) + }) + It("should fire an event when a candidate does not have the zone label", func() { + pod := test.Pod() + delete(nodeClaim.Labels, corev1.LabelTopologyZone) + delete(node.Labels, corev1.LabelTopologyZone) + + ExpectApplied(ctx, env.Client, pod, node, nodeClaim, nodePool) + ExpectManualBinding(ctx, env.Client, pod, node) + + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + ExpectSingletonReconciled(ctx, disruptionController) + // We get four calls since we only care about this since we don't emit for empty node consolidation + Expect(recorder.Calls("Unconsolidatable")).To(Equal(4)) + }) }) Context("Metrics", func() { It("should correctly report eligible nodes", func() { diff --git a/pkg/controllers/disruption/helpers.go b/pkg/controllers/disruption/helpers.go index 9af0db2fc6..84836639f0 100644 --- a/pkg/controllers/disruption/helpers.go +++ b/pkg/controllers/disruption/helpers.go @@ -141,7 +141,7 @@ func instanceTypesAreSubset(lhs []*cloudprovider.InstanceType, rhs []*cloudprovi // GetCandidates returns nodes that appear to be currently deprovisionable based off of their nodePool func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, recorder events.Recorder, clk clock.Clock, - cloudProvider cloudprovider.CloudProvider, shouldDeprovision CandidateFilter, disruptionClass string, queue *orchestration.Queue, + cloudProvider cloudprovider.CloudProvider, shouldDisrupt CandidateFilter, disruptionClass string, queue *orchestration.Queue, ) ([]*Candidate, error) { nodePoolMap, nodePoolToInstanceTypesMap, err := BuildNodePoolMap(ctx, kubeClient, cloudProvider) if err != nil { @@ -156,7 +156,7 @@ func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient clien return cn, e == nil }) // Filter only the valid candidates that we should disrupt - return lo.Filter(candidates, func(c *Candidate, _ int) bool { return shouldDeprovision(ctx, c) }), nil + return lo.Filter(candidates, func(c *Candidate, _ int) bool { return shouldDisrupt(ctx, c) }), nil } // BuildNodePoolMap builds a provName -> nodePool map and a provName -> instanceName -> instance type map diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index 40338a0a48..4b9eba9d3e 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -1676,7 +1676,7 @@ var _ = Describe("Candidate Filtering", func() { Expect(err.Error()).To(Equal(fmt.Sprintf("nodepool %q can't be resolved for state node", nodePool.Name))) Expect(recorder.DetectedEvent(fmt.Sprintf("Cannot disrupt Node: NodePool %q not found", nodePool.Name))).To(BeTrue()) }) - It("should not consider candidates that do not have the karpenter.sh/capacity-type label", func() { + It("should consider candidates that do not have the karpenter.sh/capacity-type label", func() { nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -1691,11 +1691,9 @@ var _ = Describe("Candidate Filtering", func() { Expect(cluster.Nodes()).To(HaveLen(1)) _, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal(`state node doesn't have required label "karpenter.sh/capacity-type"`)) - Expect(recorder.DetectedEvent(`Cannot disrupt Node: state node doesn't have required label "karpenter.sh/capacity-type"`)).To(BeTrue()) + Expect(err).ToNot(HaveOccurred()) }) - It("should not consider candidates that do not have the topology.kubernetes.io/zone label", func() { + It("should consider candidates that do not have the topology.kubernetes.io/zone label", func() { nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -1710,11 +1708,9 @@ var _ = Describe("Candidate Filtering", func() { Expect(cluster.Nodes()).To(HaveLen(1)) _, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal(`state node doesn't have required label "topology.kubernetes.io/zone"`)) - Expect(recorder.DetectedEvent(`Cannot disrupt Node: state node doesn't have required label "topology.kubernetes.io/zone"`)).To(BeTrue()) + Expect(err).ToNot(HaveOccurred()) }) - It("should not consider candidates that do not have the node.kubernetes.io/instance-type label", func() { + It("should consider candidates that do not have the node.kubernetes.io/instance-type label", func() { nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -1729,11 +1725,9 @@ var _ = Describe("Candidate Filtering", func() { Expect(cluster.Nodes()).To(HaveLen(1)) _, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal(`state node doesn't have required label "node.kubernetes.io/instance-type"`)) - Expect(recorder.DetectedEvent(`Cannot disrupt Node: state node doesn't have required label "node.kubernetes.io/instance-type"`)).To(BeTrue()) + Expect(err).ToNot((HaveOccurred())) }) - It("should not consider candidates that have an instance type that cannot be resolved", func() { + It("should consider candidates that have an instance type that cannot be resolved", func() { nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -1752,9 +1746,7 @@ var _ = Describe("Candidate Filtering", func() { Expect(cluster.Nodes()).To(HaveLen(1)) _, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal(fmt.Sprintf("instance type %q can't be resolved", mostExpensiveInstance.Name))) - Expect(recorder.DetectedEvent(fmt.Sprintf("Cannot disrupt Node: Instance Type %q not found", mostExpensiveInstance.Name))).To(BeTrue()) + Expect(err).ToNot(HaveOccurred()) }) It("should not consider candidates that are actively being processed in the queue", func() { nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{ diff --git a/pkg/controllers/disruption/types.go b/pkg/controllers/disruption/types.go index a2b53b86dc..b30f8d7aac 100644 --- a/pkg/controllers/disruption/types.go +++ b/pkg/controllers/disruption/types.go @@ -90,12 +90,8 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events recorder.Publish(disruptionevents.Blocked(node.Node, node.NodeClaim, fmt.Sprintf("NodePool %q not found", nodePoolName))...) return nil, fmt.Errorf("nodepool %q can't be resolved for state node", nodePoolName) } + // We only care if instanceType in non-empty consolidation to do price-comparison. instanceType := instanceTypeMap[node.Labels()[corev1.LabelInstanceTypeStable]] - // skip any candidates that we can't determine the instance of - if instanceType == nil { - recorder.Publish(disruptionevents.Blocked(node.Node, node.NodeClaim, fmt.Sprintf("Instance Type %q not found", node.Labels()[corev1.LabelInstanceTypeStable]))...) - return nil, fmt.Errorf("instance type %q can't be resolved", node.Labels()[corev1.LabelInstanceTypeStable]) - } if pods, err = node.ValidatePodsDisruptable(ctx, kubeClient, pdbs); err != nil { // if the disruption class is not eventual or the nodepool has no TerminationGracePeriod, block disruption of pods // if the error is anything but a PodBlockEvictionError, also block disruption of pods @@ -149,8 +145,8 @@ func (c Command) String() string { fmt.Fprint(&buf, ", ") } fmt.Fprintf(&buf, "%s", old.Name()) - fmt.Fprintf(&buf, "/%s", old.instanceType.Name) - fmt.Fprintf(&buf, "/%s", old.capacityType) + fmt.Fprintf(&buf, "/%s", old.Labels()[corev1.LabelInstanceTypeStable]) + fmt.Fprintf(&buf, "/%s", old.Labels()[v1.CapacityTypeLabelKey]) } if len(c.replacements) == 0 { return buf.String() diff --git a/pkg/controllers/nodeclaim/disruption/consolidation_test.go b/pkg/controllers/nodeclaim/disruption/consolidation_test.go index 62d76ad573..821874b11e 100644 --- a/pkg/controllers/nodeclaim/disruption/consolidation_test.go +++ b/pkg/controllers/nodeclaim/disruption/consolidation_test.go @@ -40,7 +40,7 @@ var _ = Describe("Underutilized", func() { ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ v1.NodePoolLabelKey: nodePool.Name, - corev1.LabelInstanceTypeStable: "default-instance-type", // need the instance type for the cluster state update + corev1.LabelInstanceTypeStable: it.Name, }, }, }) diff --git a/pkg/controllers/nodeclaim/disruption/drift.go b/pkg/controllers/nodeclaim/disruption/drift.go index 4030aa2cb3..879433b19a 100644 --- a/pkg/controllers/nodeclaim/disruption/drift.go +++ b/pkg/controllers/nodeclaim/disruption/drift.go @@ -25,14 +25,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + corev1 "k8s.io/api/core/v1" + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/scheduling" ) const ( - NodePoolDrifted cloudprovider.DriftReason = "NodePoolDrifted" - RequirementsDrifted cloudprovider.DriftReason = "RequirementsDrifted" + NodePoolDrifted cloudprovider.DriftReason = "NodePoolDrifted" + RequirementsDrifted cloudprovider.DriftReason = "RequirementsDrifted" + InstanceTypeNotFound cloudprovider.DriftReason = "InstanceTypeNotFound" ) // Drift is a nodeclaim sub-controller that adds or removes status conditions on drifted nodeclaims @@ -81,6 +84,15 @@ func (d *Drift) isDrifted(ctx context.Context, nodePool *v1.NodePool, nodeClaim }); reason != "" { return reason, nil } + // Include instance type checking separate from the other two to reduce the amount of times we grab the instance types. + its, err := d.cloudProvider.GetInstanceTypes(ctx, nodePool) + if err != nil { + return "", err + } + if reason := instanceTypeNotFound(its, nodeClaim); reason != "" { + return reason, nil + } + // Then check if it's drifted from the cloud provider side. driftedReason, err := d.cloudProvider.IsDrifted(ctx, nodeClaim) if err != nil { return "", err @@ -88,6 +100,29 @@ func (d *Drift) isDrifted(ctx context.Context, nodePool *v1.NodePool, nodeClaim return driftedReason, nil } +// InstanceType Offerings should return the full list of allowed instance types, even if they're temporarily +// unavailable. If we can't find the instance type that the NodeClaim is running with, or if we don't find +// a compatible offering for that given instance type (zone and capacity type being the only added in requirements), +// then we consider it drifted, since we don't have the data for this instance type anymore. +// Note this is different than RequirementDrift, where we only compare if the NodePool is compatible with the NodeClaim +// based on the requirements constructed from the NodeClaim Labels. +// InstanceTypeNotFoundDrift is computed by looking directly at the list of instance types and comparing the NodeClaim +// to the instance types and its offerings. +// 1. The NodeClaim doesn't have the instance type label +// 2. The NodeClaim has an instance type that doesn't exist in the cloudprovider instance types +// 3. There are no offerings that match the requirements +func instanceTypeNotFound(its []*cloudprovider.InstanceType, nodeClaim *v1.NodeClaim) cloudprovider.DriftReason { + it, ok := lo.Find(its, func(it *cloudprovider.InstanceType) bool { + return it.Name == nodeClaim.Labels[corev1.LabelInstanceTypeStable] + }) + // Offerings should in most cases only have zone and capacity type. This likely shouldn't differ + // across cloud providers. + if !ok || !it.Offerings.HasCompatible(scheduling.NewLabelRequirements(nodeClaim.Labels)) { + return InstanceTypeNotFound + } + return "" +} + // Eligible fields for drift are described in the docs // https://karpenter.sh/docs/concepts/deprovisioning/#drift func areStaticFieldsDrifted(nodePool *v1.NodePool, nodeClaim *v1.NodeClaim) cloudprovider.DriftReason { diff --git a/pkg/controllers/nodeclaim/disruption/drift_test.go b/pkg/controllers/nodeclaim/disruption/drift_test.go index 65fc57db5b..e33f940466 100644 --- a/pkg/controllers/nodeclaim/disruption/drift_test.go +++ b/pkg/controllers/nodeclaim/disruption/drift_test.go @@ -27,8 +27,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/disruption" "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash" + "sigs.k8s.io/karpenter/pkg/scheduling" "sigs.k8s.io/karpenter/pkg/test" . "sigs.k8s.io/karpenter/pkg/test/expectations" ) @@ -43,7 +45,9 @@ var _ = Describe("Drift", func() { ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ v1.NodePoolLabelKey: nodePool.Name, - corev1.LabelInstanceTypeStable: test.RandomName(), + corev1.LabelInstanceTypeStable: it.Name, + corev1.LabelTopologyZone: "test-zone-1a", + v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, }, Annotations: map[string]string{ v1.NodePoolHashAnnotationKey: nodePool.Hash(), @@ -61,6 +65,47 @@ var _ = Describe("Drift", func() { nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue()) }) + It("should detect stale instance type drift if the instance type label doesn't exist", func() { + delete(nodeClaim.Labels, corev1.LabelInstanceTypeStable) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue()) + }) + It("should detect stale instance type drift if the instance type doesn't exist", func() { + cp.InstanceTypes = nil + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue()) + }) + It("should detect stale instance type drift if the instance type offerings doesn't exist", func() { + cp.InstanceTypes = lo.Map(cp.InstanceTypes, func(it *cloudprovider.InstanceType, _ int) *cloudprovider.InstanceType { + it.Offerings = cloudprovider.Offerings{} + return it + }) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue()) + }) + It("should detect stale instance type drift if the instance type offerings aren't compatible with the nodeclaim", func() { + cp.InstanceTypes = lo.Map(cp.InstanceTypes, func(it *cloudprovider.InstanceType, _ int) *cloudprovider.InstanceType { + if it.Name == nodeClaim.Labels[corev1.LabelInstanceTypeStable] { + newLabels := lo.Keys(nodeClaim.Labels) + it.Requirements = scheduling.NewLabelRequirements(map[string]string{newLabels[0]: test.RandomName()}) + } + return it + }) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue()) + }) It("should detect static drift before cloud provider drift", func() { cp.Drifted = "drifted" nodePool.Annotations = lo.Assign(nodePool.Annotations, map[string]string{ @@ -141,6 +186,7 @@ var _ = Describe("Drift", func() { DescribeTable("", func(oldNodePoolReq []v1.NodeSelectorRequirementWithMinValues, newNodePoolReq []v1.NodeSelectorRequirementWithMinValues, labels map[string]string, drifted bool) { cp.Drifted = "" + nodePool.Spec.Template.Spec.Requirements = oldNodePoolReq nodeClaim.Labels = lo.Assign(nodeClaim.Labels, labels) @@ -257,18 +303,18 @@ var _ = Describe("Drift", func() { true, ), Entry( - "should not return drifted if a nodeClaim is grater then node requirement", + "should not return drifted if a nodeClaim is greater then node requirement", []v1.NodeSelectorRequirementWithMinValues{ {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: v1.CapacityTypeLabelKey, Operator: corev1.NodeSelectorOpIn, Values: []string{v1.CapacityTypeOnDemand}}}, - {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: corev1.LabelInstanceTypeStable, Operator: corev1.NodeSelectorOpGt, Values: []string{"2"}}}, + {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: "test-label", Operator: corev1.NodeSelectorOpGt, Values: []string{"2"}}}, }, []v1.NodeSelectorRequirementWithMinValues{ {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: v1.CapacityTypeLabelKey, Operator: corev1.NodeSelectorOpIn, Values: []string{v1.CapacityTypeOnDemand}}}, - {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: corev1.LabelInstanceTypeStable, Operator: corev1.NodeSelectorOpGt, Values: []string{"10"}}}, + {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: "test-label", Operator: corev1.NodeSelectorOpGt, Values: []string{"10"}}}, }, map[string]string{ - v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, - corev1.LabelInstanceTypeStable: "5", + v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, + "test-label": "5", }, true, ), @@ -276,15 +322,15 @@ var _ = Describe("Drift", func() { "should not return drifted if a nodeClaim is less then node requirement", []v1.NodeSelectorRequirementWithMinValues{ {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: v1.CapacityTypeLabelKey, Operator: corev1.NodeSelectorOpIn, Values: []string{v1.CapacityTypeOnDemand}}}, - {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: corev1.LabelInstanceTypeStable, Operator: corev1.NodeSelectorOpLt, Values: []string{"5"}}}, + {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: "test-label", Operator: corev1.NodeSelectorOpLt, Values: []string{"5"}}}, }, []v1.NodeSelectorRequirementWithMinValues{ {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: v1.CapacityTypeLabelKey, Operator: corev1.NodeSelectorOpIn, Values: []string{v1.CapacityTypeOnDemand}}}, - {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: corev1.LabelInstanceTypeStable, Operator: corev1.NodeSelectorOpLt, Values: []string{"1"}}}, + {NodeSelectorRequirement: corev1.NodeSelectorRequirement{Key: "test-label", Operator: corev1.NodeSelectorOpLt, Values: []string{"1"}}}, }, map[string]string{ - v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, - corev1.LabelInstanceTypeStable: "2", + v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, + "test-label": "2", }, true, ), @@ -303,7 +349,7 @@ var _ = Describe("Drift", func() { ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ v1.NodePoolLabelKey: nodePool.Name, - corev1.LabelInstanceTypeStable: test.RandomName(), + corev1.LabelInstanceTypeStable: it.Name, v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelOSStable: string(corev1.Windows), }, diff --git a/pkg/controllers/nodeclaim/disruption/suite_test.go b/pkg/controllers/nodeclaim/disruption/suite_test.go index 29d361a76a..0e4fe6d19e 100644 --- a/pkg/controllers/nodeclaim/disruption/suite_test.go +++ b/pkg/controllers/nodeclaim/disruption/suite_test.go @@ -24,16 +24,20 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/karpenter/pkg/apis" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" nodeclaimdisruption "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/disruption" "sigs.k8s.io/karpenter/pkg/operator/options" + "sigs.k8s.io/karpenter/pkg/scheduling" "sigs.k8s.io/karpenter/pkg/test" . "sigs.k8s.io/karpenter/pkg/test/expectations" "sigs.k8s.io/karpenter/pkg/test/v1alpha1" @@ -45,6 +49,7 @@ var nodeClaimDisruptionController *nodeclaimdisruption.Controller var env *test.Environment var fakeClock *clock.FakeClock var cp *fake.CloudProvider +var it *cloudprovider.InstanceType func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) @@ -69,6 +74,35 @@ var _ = AfterSuite(func() { }) var _ = BeforeEach(func() { + resources := corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("100Gi"), + } + it = fake.NewInstanceType(fake.InstanceTypeOptions{ + Name: test.RandomName(), + Architecture: "arch", + Resources: resources, + OperatingSystems: sets.New(string(corev1.Linux)), + Offerings: []cloudprovider.Offering{ + { + Requirements: scheduling.NewLabelRequirements(map[string]string{ + v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, + corev1.LabelTopologyZone: "test-zone-1a", + }), + Price: fake.PriceFromResources(resources), + Available: true, + }, + { + Requirements: scheduling.NewLabelRequirements(map[string]string{ + v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, + corev1.LabelTopologyZone: "test-zone-1a", + }), + Price: fake.PriceFromResources(resources), + Available: true, + }, + }, + }) + cp.InstanceTypes = append(cp.InstanceTypes, it) ctx = options.ToContext(ctx, test.Options()) fakeClock.SetTime(time.Now()) }) @@ -87,7 +121,10 @@ var _ = Describe("Disruption", func() { nodePool = test.NodePool() nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1.NodePoolLabelKey: nodePool.Name}, + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: it.Name, + }, }, }) // set the lastPodEvent to 5 minutes in the past diff --git a/pkg/controllers/state/statenode.go b/pkg/controllers/state/statenode.go index 7d86eee50c..08c8b51694 100644 --- a/pkg/controllers/state/statenode.go +++ b/pkg/controllers/state/statenode.go @@ -193,16 +193,9 @@ func (in *StateNode) ValidateNodeDisruptable(ctx context.Context, kubeClient cli if in.Annotations()[v1.DoNotDisruptAnnotationKey] == "true" { return fmt.Errorf("disruption is blocked through the %q annotation", v1.DoNotDisruptAnnotationKey) } - // check whether the node has all the labels we need - for _, label := range []string{ - v1.CapacityTypeLabelKey, - corev1.LabelTopologyZone, - corev1.LabelInstanceTypeStable, - v1.NodePoolLabelKey, - } { - if _, ok := in.Labels()[label]; !ok { - return fmt.Errorf("state node doesn't have required label %q", label) - } + // check whether the node has the NodePool label + if _, ok := in.Labels()[v1.NodePoolLabelKey]; !ok { + return fmt.Errorf("state node doesn't have required label %q", v1.NodePoolLabelKey) } return nil }