diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index f7caafc463..cb1f9b12e1 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -98,7 +98,7 @@ func NewControllers( // The cloud provider must define status conditions for the node repair controller to use to detect unhealthy nodes if len(cloudProvider.RepairPolicies()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair { - controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock)) + controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock, recorder)) } return controllers diff --git a/pkg/controllers/node/health/controller.go b/pkg/controllers/node/health/controller.go index dd4258b0fd..0d26266e99 100644 --- a/pkg/controllers/node/health/controller.go +++ b/pkg/controllers/node/health/controller.go @@ -18,13 +18,18 @@ package health import ( "context" + "fmt" "time" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -32,22 +37,27 @@ import ( v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/operator/injection" nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" ) +var allowedUnhealthyPercent = intstr.FromString("20%") + // Controller for the resource type Controller struct { clock clock.Clock + recorder events.Recorder kubeClient client.Client cloudProvider cloudprovider.CloudProvider } // NewController constructs a controller instance -func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, clock clock.Clock) *Controller { +func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, clock clock.Clock, recorder events.Recorder) *Controller { return &Controller{ clock: clock, + recorder: recorder, kubeClient: kubeClient, cloudProvider: cloudProvider, } @@ -56,7 +66,7 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr func (c *Controller) Register(_ context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). Named("node.health"). - For(&corev1.Node{}). + For(&corev1.Node{}, builder.WithPredicates(nodeutils.IsManagedPredicateFuncs(c.cloudProvider))). Complete(reconcile.AsReconciler(m.GetClient(), c)) } @@ -70,6 +80,29 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil return reconcile.Result{}, nodeutils.IgnoreNodeClaimNotFoundError(err) } + // If a nodeclaim does has a nodepool label, validate the nodeclaims inside the nodepool are healthy (i.e bellow the allowed threshold) + // In the case of standalone nodeclaim, validate the nodes inside the cluster are healthy before proceeding + // to repair the nodes + nodePoolName, found := nodeClaim.Labels[v1.NodePoolLabelKey] + if found { + nodePoolHealthy, err := c.isNodePoolHealthy(ctx, nodePoolName) + if err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) + } + if !nodePoolHealthy { + return reconcile.Result{}, c.publishNodePoolHealthEvent(ctx, node, nodeClaim, nodePoolName) + } + } else { + clusterHealthy, err := c.isClusterHealthy(ctx) + if err != nil { + return reconcile.Result{}, err + } + if !clusterHealthy { + c.recorder.Publish(NodeRepairBlockedUnmanagedNodeClaim(node, nodeClaim, fmt.Sprintf("more then %s nodes are unhealthy in the cluster", allowedUnhealthyPercent.String()))...) + return reconcile.Result{}, nil + } + } + unhealthyNodeCondition, policyTerminationDuration := c.findUnhealthyConditions(node) if unhealthyNodeCondition == nil { return reconcile.Result{}, nil @@ -130,3 +163,48 @@ func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeCla return nil } + +// isNodePoolHealthy checks if the number of unhealthy nodes managed by the given NodePool exceeds the health threshold. +// defined by the cloud provider +// Up to 20% of Nodes may be unhealthy before the NodePool becomes unhealthy (or the nearest whole number, rounding up). +// For example, given a NodePool with three nodes, one may be unhealthy without rendering the NodePool unhealthy, even though that's 33% of the total nodes. +// This is analogous to how minAvailable and maxUnavailable work for PodDisruptionBudgets: https://kubernetes.io/docs/tasks/run-application/configure-pdb/#rounding-logic-when-specifying-percentages. +func (c *Controller) isNodePoolHealthy(ctx context.Context, nodePoolName string) (bool, error) { + nodeList := &corev1.NodeList{} + if err := c.kubeClient.List(ctx, nodeList, client.MatchingLabels(map[string]string{v1.NodePoolLabelKey: nodePoolName})); err != nil { + return false, err + } + + return c.isHealthyForNodes(nodeList.Items), nil +} + +func (c *Controller) isClusterHealthy(ctx context.Context) (bool, error) { + nodeList := &corev1.NodeList{} + if err := c.kubeClient.List(ctx, nodeList); err != nil { + return false, err + } + + return c.isHealthyForNodes(nodeList.Items), nil +} + +func (c *Controller) isHealthyForNodes(nodes []corev1.Node) bool { + unhealthyNodeCount := lo.CountBy(nodes, func(node corev1.Node) bool { + _, found := lo.Find(c.cloudProvider.RepairPolicies(), func(policy cloudprovider.RepairPolicy) bool { + nodeCondition := nodeutils.GetCondition(lo.ToPtr(node), policy.ConditionType) + return nodeCondition.Status == policy.ConditionStatus + }) + return found + }) + + threshold := lo.Must(intstr.GetScaledValueFromIntOrPercent(lo.ToPtr(allowedUnhealthyPercent), len(nodes), true)) + return unhealthyNodeCount <= threshold +} + +func (c *Controller) publishNodePoolHealthEvent(ctx context.Context, node *corev1.Node, nodeClaim *v1.NodeClaim, npName string) error { + np := &v1.NodePool{} + if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: npName}, np); err != nil { + return client.IgnoreNotFound(err) + } + c.recorder.Publish(NodeRepairBlocked(node, nodeClaim, np, fmt.Sprintf("more then %s nodes are unhealthy in the nodepool", allowedUnhealthyPercent.String()))...) + return nil +} diff --git a/pkg/controllers/node/health/events.go b/pkg/controllers/node/health/events.go new file mode 100644 index 0000000000..8dfddaf7ca --- /dev/null +++ b/pkg/controllers/node/health/events.go @@ -0,0 +1,76 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package health + +import ( + "time" + + corev1 "k8s.io/api/core/v1" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/events" +) + +func NodeRepairBlocked(node *corev1.Node, nodeClaim *v1.NodeClaim, nodePool *v1.NodePool, reason string) []events.Event { + return []events.Event{ + { + InvolvedObject: node, + Type: corev1.EventTypeWarning, + Reason: "NodeRepairBlocked", + Message: reason, + DedupeValues: []string{string(node.UID)}, + DedupeTimeout: time.Minute * 15, + }, + { + InvolvedObject: node, + Type: corev1.EventTypeWarning, + Reason: "NodeRepairBlocked", + Message: reason, + DedupeValues: []string{string(nodeClaim.UID)}, + DedupeTimeout: time.Minute * 15, + }, + { + InvolvedObject: node, + Type: corev1.EventTypeWarning, + Reason: "NodeRepairBlocked", + Message: reason, + DedupeValues: []string{string(nodePool.UID)}, + DedupeTimeout: time.Minute * 15, + }, + } +} + +func NodeRepairBlockedUnmanagedNodeClaim(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) []events.Event { + return []events.Event{ + { + InvolvedObject: node, + Type: corev1.EventTypeWarning, + Reason: "NodeRepairBlocked", + Message: reason, + DedupeValues: []string{string(node.UID)}, + DedupeTimeout: time.Minute * 15, + }, + { + InvolvedObject: node, + Type: corev1.EventTypeWarning, + Reason: "NodeRepairBlocked", + Message: reason, + DedupeValues: []string{string(nodeClaim.UID)}, + DedupeTimeout: time.Minute * 15, + }, + } +} diff --git a/pkg/controllers/node/health/suite_test.go b/pkg/controllers/node/health/suite_test.go index ee8d46b01d..77c0823c5d 100644 --- a/pkg/controllers/node/health/suite_test.go +++ b/pkg/controllers/node/health/suite_test.go @@ -66,7 +66,7 @@ var _ = BeforeSuite(func() { cloudProvider = fake.NewCloudProvider() recorder = test.NewEventRecorder() queue = terminator.NewTestingQueue(env.Client, recorder) - healthController = health.NewController(env.Client, cloudProvider, fakeClock) + healthController = health.NewController(env.Client, cloudProvider, fakeClock, recorder) }) var _ = AfterSuite(func() { @@ -271,6 +271,60 @@ var _ = Describe("Node Health", func() { nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil()) }) + It("should ignore unhealthy nodes if more then 20% of the nodes are unhealthy", func() { + ExpectApplied(ctx, env.Client, nodePool) + nodeClaims := []*v1.NodeClaim{} + nodes := []*corev1.Node{} + for i := range 10 { + nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}}) + if i < 3 { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, + }) + } + node.Labels[v1.NodePoolLabelKey] = nodePool.Name + nodeClaim.Labels[v1.NodePoolLabelKey] = nodePool.Name + nodeClaims = append(nodeClaims, nodeClaim) + nodes = append(nodes, node) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + } + + fakeClock.Step(60 * time.Minute) + + // Determine to delete unhealthy nodes + for i := range 4 { + ExpectObjectReconciled(ctx, env.Client, healthController, nodes[i]) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaims[i]) + Expect(nodeClaim.DeletionTimestamp).To(BeNil()) + } + }) + It("should consider round up when there is a low number of nodes for a nodepool", func() { + nodeClaims := []*v1.NodeClaim{} + nodes := []*corev1.Node{} + for i := range 3 { + nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}}) + if i == 0 { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, + }) + } + node.Labels[v1.NodePoolLabelKey] = nodePool.Name + nodeClaim.Labels[v1.NodePoolLabelKey] = nodePool.Name + nodeClaims = append(nodeClaims, nodeClaim) + nodes = append(nodes, node) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + } + + fakeClock.Step(60 * time.Minute) + // Determine to delete unhealthy nodes + ExpectObjectReconciled(ctx, env.Client, healthController, nodes[0]) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaims[0]) + Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil()) + }) }) Context("Metrics", func() { It("should fire a karpenter_nodeclaims_disrupted_total metric when unhealthy", func() {