chore: Limit Node Repair based by Nodepool (#1831)
engedaam authored Nov 26, 2024
1 parent c8b7b66 commit 6efbae4
Showing 4 changed files with 212 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
@@ -98,7 +98,7 @@ func NewControllers(

// The cloud provider must define status conditions for the node repair controller to use to detect unhealthy nodes
if len(cloudProvider.RepairPolicies()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair {
- controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock))
+ controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock, recorder))
}

return controllers
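(Aside, not part of the diff: the guard above only registers the health controller when the cloud provider reports repair policies and the NodeRepair feature gate is on. A hedged sketch of what RepairPolicies() might return, assuming the cloudprovider.RepairPolicy fields consumed by findUnhealthyConditions further down; the "BadNode" condition mirrors the fake provider in the test suite.)

package provider

import (
	"time"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// repairPolicies sketches a provider-defined policy: repair nodes whose
// "BadNode" condition has been False for at least 30 minutes.
func repairPolicies() []cloudprovider.RepairPolicy {
	return []cloudprovider.RepairPolicy{{
		ConditionType:      "BadNode",
		ConditionStatus:    corev1.ConditionFalse,
		TolerationDuration: 30 * time.Minute,
	}}
}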
82 changes: 80 additions & 2 deletions pkg/controllers/node/health/controller.go
@@ -18,36 +18,46 @@ package health

import (
"context"
"fmt"
"time"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"

"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
)

var allowedUnhealthyPercent = intstr.FromString("20%")

// Controller for the resource
type Controller struct {
clock clock.Clock
recorder events.Recorder
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
}

// NewController constructs a controller instance
- func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, clock clock.Clock) *Controller {
+ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, clock clock.Clock, recorder events.Recorder) *Controller {
return &Controller{
clock: clock,
recorder: recorder,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
}
@@ -56,7 +66,7 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("node.health").
- For(&corev1.Node{}).
+ For(&corev1.Node{}, builder.WithPredicates(nodeutils.IsManagedPredicateFuncs(c.cloudProvider))).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}

@@ -70,6 +80,29 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil
return reconcile.Result{}, nodeutils.IgnoreNodeClaimNotFoundError(err)
}

// If the nodeclaim has a nodepool label, validate that the nodeclaims inside the nodepool are healthy (i.e. below the allowed threshold)
// before repairing the node. For a standalone nodeclaim, validate instead that the nodes across the cluster are healthy.
nodePoolName, found := nodeClaim.Labels[v1.NodePoolLabelKey]
if found {
nodePoolHealthy, err := c.isNodePoolHealthy(ctx, nodePoolName)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if !nodePoolHealthy {
return reconcile.Result{}, c.publishNodePoolHealthEvent(ctx, node, nodeClaim, nodePoolName)
}
} else {
clusterHealthy, err := c.isClusterHealthy(ctx)
if err != nil {
return reconcile.Result{}, err
}
if !clusterHealthy {
c.recorder.Publish(NodeRepairBlockedUnmanagedNodeClaim(node, nodeClaim, fmt.Sprintf("more than %s of nodes are unhealthy in the cluster", allowedUnhealthyPercent.String()))...)
return reconcile.Result{}, nil
}
}

unhealthyNodeCondition, policyTerminationDuration := c.findUnhealthyConditions(node)
if unhealthyNodeCondition == nil {
return reconcile.Result{}, nil
@@ -130,3 +163,48 @@ func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeCla

return nil
}

// isNodePoolHealthy checks whether the number of unhealthy nodes managed by the given NodePool exceeds the allowed
// threshold, judging health by the status conditions from the cloud provider's repair policies.
// Up to 20% of Nodes (rounded up to the nearest whole number) may be unhealthy before the NodePool becomes unhealthy.
// For example, given a NodePool with three nodes, one may be unhealthy without rendering the NodePool unhealthy, even though that's 33% of the total nodes.
// This is analogous to how minAvailable and maxUnavailable work for PodDisruptionBudgets: https://kubernetes.io/docs/tasks/run-application/configure-pdb/#rounding-logic-when-specifying-percentages.
func (c *Controller) isNodePoolHealthy(ctx context.Context, nodePoolName string) (bool, error) {
nodeList := &corev1.NodeList{}
if err := c.kubeClient.List(ctx, nodeList, client.MatchingLabels(map[string]string{v1.NodePoolLabelKey: nodePoolName})); err != nil {
return false, err
}

return c.isHealthyForNodes(nodeList.Items), nil
}

func (c *Controller) isClusterHealthy(ctx context.Context) (bool, error) {
nodeList := &corev1.NodeList{}
if err := c.kubeClient.List(ctx, nodeList); err != nil {
return false, err
}

return c.isHealthyForNodes(nodeList.Items), nil
}

func (c *Controller) isHealthyForNodes(nodes []corev1.Node) bool {
unhealthyNodeCount := lo.CountBy(nodes, func(node corev1.Node) bool {
_, found := lo.Find(c.cloudProvider.RepairPolicies(), func(policy cloudprovider.RepairPolicy) bool {
nodeCondition := nodeutils.GetCondition(lo.ToPtr(node), policy.ConditionType)
return nodeCondition.Status == policy.ConditionStatus
})
return found
})

threshold := lo.Must(intstr.GetScaledValueFromIntOrPercent(lo.ToPtr(allowedUnhealthyPercent), len(nodes), true))
return unhealthyNodeCount <= threshold
}

func (c *Controller) publishNodePoolHealthEvent(ctx context.Context, node *corev1.Node, nodeClaim *v1.NodeClaim, npName string) error {
np := &v1.NodePool{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: npName}, np); err != nil {
return client.IgnoreNotFound(err)
}
c.recorder.Publish(NodeRepairBlocked(node, nodeClaim, np, fmt.Sprintf("more than %s of nodes are unhealthy in the nodepool", allowedUnhealthyPercent.String()))...)
return nil
}
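(Aside, not part of the diff: the threshold above comes from intstr.GetScaledValueFromIntOrPercent with roundUp=true, which produces the PDB-style rounding described in the isNodePoolHealthy comment. A minimal runnable sketch; the node totals are arbitrary, and only the intstr call mirrors the controller.)

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	allowed := intstr.FromString("20%")
	for _, total := range []int{3, 10, 25} {
		// roundUp=true: 20% of 3 is 0.6, which rounds up to 1, so a
		// three-node NodePool tolerates one unhealthy node (33%).
		threshold, _ := intstr.GetScaledValueFromIntOrPercent(&allowed, total, true)
		fmt.Printf("total=%d allowed unhealthy=%d\n", total, threshold)
	}
}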
76 changes: 76 additions & 0 deletions pkg/controllers/node/health/events.go
@@ -0,0 +1,76 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package health

import (
"time"

corev1 "k8s.io/api/core/v1"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/events"
)

func NodeRepairBlocked(node *corev1.Node, nodeClaim *v1.NodeClaim, nodePool *v1.NodePool, reason string) []events.Event {
return []events.Event{
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(node.UID)},
DedupeTimeout: time.Minute * 15,
},
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(nodeClaim.UID)},
DedupeTimeout: time.Minute * 15,
},
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(nodePool.UID)},
DedupeTimeout: time.Minute * 15,
},
}
}

func NodeRepairBlockedUnmanagedNodeClaim(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) []events.Event {
return []events.Event{
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(node.UID)},
DedupeTimeout: time.Minute * 15,
},
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(nodeClaim.UID)},
DedupeTimeout: time.Minute * 15,
},
}
}
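(Aside, not part of the diff: a rough usage sketch of NodeRepairBlocked, assuming the fake recorder and object fixtures from karpenter's test package, the same helpers suite_test.go uses below.)

package health_test

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/controllers/node/health"
	"sigs.k8s.io/karpenter/pkg/test"
)

func ExampleNodeRepairBlocked() {
	recorder := test.NewEventRecorder()
	nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{})
	nodePool := test.NodePool()

	// One blocked repair fans out to three Warning events; each dedupes on a
	// different UID (node, nodeclaim, nodepool), so repeats are suppressed per
	// object for 15 minutes.
	recorder.Publish(health.NodeRepairBlocked(node, nodeClaim, nodePool, "more than 20% of nodes are unhealthy in the nodepool")...)
}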
56 changes: 55 additions & 1 deletion pkg/controllers/node/health/suite_test.go
@@ -66,7 +66,7 @@ var _ = BeforeSuite(func() {
cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewTestingQueue(env.Client, recorder)
- healthController = health.NewController(env.Client, cloudProvider, fakeClock)
+ healthController = health.NewController(env.Client, cloudProvider, fakeClock, recorder)
})

var _ = AfterSuite(func() {
@@ -271,6 +271,60 @@ var _ = Describe("Node Health", func() {
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil())
})
It("should ignore unhealthy nodes if more then 20% of the nodes are unhealthy", func() {
ExpectApplied(ctx, env.Client, nodePool)
nodeClaims := []*v1.NodeClaim{}
nodes := []*corev1.Node{}
for i := range 10 {
nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}})
if i < 3 {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
}
node.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaim.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaims = append(nodeClaims, nodeClaim)
nodes = append(nodes, node)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
}

fakeClock.Step(60 * time.Minute)

// No repair should be initiated: 3 of 10 nodes (30%) are unhealthy, which exceeds the 20% threshold
for i := range 4 {
ExpectObjectReconciled(ctx, env.Client, healthController, nodes[i])
nodeClaim = ExpectExists(ctx, env.Client, nodeClaims[i])
Expect(nodeClaim.DeletionTimestamp).To(BeNil())
}
})
It("should consider round up when there is a low number of nodes for a nodepool", func() {
nodeClaims := []*v1.NodeClaim{}
nodes := []*corev1.Node{}
for i := range 3 {
nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}})
if i == 0 {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
}
node.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaim.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaims = append(nodeClaims, nodeClaim)
nodes = append(nodes, node)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
}

fakeClock.Step(60 * time.Minute)
// Repair should proceed: 1 unhealthy node of 3 stays within the threshold once 20% of 3 is rounded up to 1
ExpectObjectReconciled(ctx, env.Client, healthController, nodes[0])
nodeClaim = ExpectExists(ctx, env.Client, nodeClaims[0])
Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil())
})
})
Context("Metrics", func() {
It("should fire a karpenter_nodeclaims_disrupted_total metric when unhealthy", func() {
