Limit node repair based on nodepool
engedaam committed Nov 25, 2024
1 parent c221747 commit bc5346f
Showing 3 changed files with 140 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/v1/nodepool_status.go
@@ -27,6 +27,8 @@ const (
ConditionTypeValidationSucceeded = "ValidationSucceeded"
// ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready
ConditionTypeNodeClassReady = "NodeClassReady"
// ConditionTypeUnhealthy = "Unhealthy" condition indicates that the nodepool has more than 20% of its nodes in an unhealthy state
ConditionTypeUnhealthy = "Unhealthy"
)

// NodePoolStatus defines the observed state of NodePool
83 changes: 83 additions & 0 deletions pkg/controllers/node/health/controller.go
@@ -18,10 +18,15 @@ package health

import (
"context"
"fmt"
"time"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"

"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -37,6 +42,8 @@ import (
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
)

var allowedUnhealthyPercent = intstr.FromString("20%")

// Controller for the resource
type Controller struct {
clock clock.Clock
@@ -70,6 +77,26 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil
return reconcile.Result{}, nodeutils.IgnoreNodeClaimNotFoundError(err)
}

nodePoolName, found := nodeClaim.Labels[v1.NodePoolLabelKey]
if found {
nodePoolHealthy, err := c.isNodePoolHealthy(ctx, nodePoolName)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if !nodePoolHealthy {
return reconcile.Result{RequeueAfter: time.Minute}, nil
}
} else {
clusterHealthy, err := c.isClusterHealthy(ctx)
if err != nil {
return reconcile.Result{}, err
}
if !clusterHealthy {
log.FromContext(ctx).V(1).Info(fmt.Sprintf("more than %s of nodes are unhealthy", allowedUnhealthyPercent.String()))
return reconcile.Result{RequeueAfter: time.Minute}, nil
}
}

unhealthyNodeCondition, policyTerminationDuration := c.findUnhealthyConditions(node)
if unhealthyNodeCondition == nil {
return reconcile.Result{}, nil
@@ -130,3 +157,59 @@ func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeCla

return nil
}

// isNodePoolHealthy checks whether the number of unhealthy nodes managed by the given NodePool exceeds the health
// threshold defined by the cloud provider.
// Up to 20% of nodes may be unhealthy before the NodePool itself is considered unhealthy (the threshold is rounded
// up to the nearest whole number).
// For example, given a NodePool with three nodes, one may be unhealthy without rendering the NodePool unhealthy,
// even though that is 33% of the total nodes.
// This is analogous to how minAvailable and maxUnavailable work for PodDisruptionBudgets: https://kubernetes.io/docs/tasks/run-application/configure-pdb/#rounding-logic-when-specifying-percentages.
func (c *Controller) isNodePoolHealthy(ctx context.Context, nodePoolName string) (bool, error) {
nodeList := &corev1.NodeList{}
if err := c.kubeClient.List(ctx, nodeList, client.MatchingLabels(map[string]string{v1.NodePoolLabelKey: nodePoolName})); err != nil {
return false, err
}
nodePool := &v1.NodePool{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return false, err
}
stored := nodePool.DeepCopy()

healthy := c.validateHealthyCloudProviderHealthCondition(nodeList.Items)
if !healthy {
nodePool.StatusConditions().SetTrueWithReason(v1.ConditionTypeUnhealthy, "Unhealthy", fmt.Sprintf("more than %s of nodes are unhealthy", allowedUnhealthyPercent.String()))
} else {
nodePool.StatusConditions().Clear(v1.ConditionTypeUnhealthy)

Check failure on line 181 in pkg/controllers/node/health/controller.go (GitHub Actions / presubmit, Go 1.25.x through 1.31.x): Error return value of `(github.com/awslabs/operatorpkg/status.ConditionSet).Clear` is not checked (errcheck)
}

if !equality.Semantic.DeepEqual(stored, nodePool) {
if err := c.kubeClient.Patch(ctx, nodePool, client.MergeFrom(stored)); err != nil {
return false, err
}
}

return healthy, nil
}

func (c *Controller) isClusterHealthy(ctx context.Context) (bool, error) {
nodeList := &corev1.NodeList{}
if err := c.kubeClient.List(ctx, nodeList); err != nil {
return false, err
}

return c.validateHealthyCloudProviderHealthCondition(nodeList.Items), nil
}

func (c *Controller) validateHealthyCloudProviderHealthCondition(nodes []corev1.Node) bool {
for _, policy := range c.cloudProvider.RepairPolicies() {
unhealthyNodeCount := lo.CountBy(nodes, func(node corev1.Node) bool {
nodeCondition := nodeutils.GetCondition(lo.ToPtr(node), policy.ConditionType)
return nodeCondition.Status == policy.ConditionStatus
})

threshold := lo.Must(intstr.GetScaledValueFromIntOrPercent(lo.ToPtr(allowedUnhealthyPercent), len(nodes), true))
if unhealthyNodeCount > threshold {
return false
}
}
return true
}
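
To make the rounding concrete, here is a minimal standalone sketch (not part of the commit; the node counts are illustrative) that exercises the same intstr helper the controller uses:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	allowedUnhealthyPercent := intstr.FromString("20%")
	for _, totalNodes := range []int{3, 10} {
		// roundUp=true mirrors the controller: threshold = ceil(20% of totalNodes)
		threshold, err := intstr.GetScaledValueFromIntOrPercent(&allowedUnhealthyPercent, totalNodes, true)
		if err != nil {
			panic(err)
		}
		fmt.Printf("nodes=%d unhealthy-node threshold=%d\n", totalNodes, threshold)
	}
}

With three nodes the threshold rounds up to 1, so a single unhealthy node does not make the NodePool unhealthy; with ten nodes the threshold is 2, so three unhealthy nodes do. This matches the two tests added in suite_test.go below.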
55 changes: 55 additions & 0 deletions pkg/controllers/node/health/suite_test.go
@@ -274,6 +274,61 @@ var _ = Describe("Node Health", func() {
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil())
})
It("should ignore unhealthy nodes if more then 20% of the nodes are unhealthy", func() {
ExpectApplied(ctx, env.Client, nodePool)
nodeClaims := []*v1.NodeClaim{}
nodes := []*corev1.Node{}
for i := range 10 {
nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}})
if i < 3 {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
}
node.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaim.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaims = append(nodeClaims, nodeClaim)
nodes = append(nodes, node)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
}

fakeClock.Step(60 * time.Minute)

// Expect a requeue instead of deletion, since the nodepool exceeds the unhealthy threshold
for i := range 4 {
res := ExpectObjectReconciled(ctx, env.Client, healthController, nodes[i])
nodeClaim = ExpectExists(ctx, env.Client, nodeClaims[i])
Expect(nodeClaim.DeletionTimestamp).To(BeNil())
Expect(res.RequeueAfter).To(BeNumerically("~", time.Minute*1, time.Second))
}
})
It("should consider round up when there is a low number of nodes for a nodepool", func() {
nodeClaims := []*v1.NodeClaim{}
nodes := []*corev1.Node{}
for i := range 3 {
nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}})
if i == 0 {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
}
node.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaim.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaims = append(nodeClaims, nodeClaim)
nodes = append(nodes, node)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
}

fakeClock.Step(60 * time.Minute)
// The unhealthy node should be deleted: one unhealthy node out of three is within the rounded-up threshold
ExpectObjectReconciled(ctx, env.Client, healthController, nodes[0])
nodeClaim = ExpectExists(ctx, env.Client, nodeClaims[0])
Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil())
})
})
Context("Metrics", func() {
It("should fire a karpenter_nodeclaims_disrupted_total metric when unhealthy", func() {
