chore: Limit Node Repair based on Nodepool #1831

Merged: 2 commits, Nov 26, 2024
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
@@ -98,7 +98,7 @@ func NewControllers(

// The cloud provider must define status conditions for the node repair controller to use to detect unhealthy nodes
if len(cloudProvider.RepairPolicies()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair {
- controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock))
+ controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock, recorder))
}

return controllers
82 changes: 80 additions & 2 deletions pkg/controllers/node/health/controller.go
@@ -18,36 +18,46 @@ package health

import (
"context"
"fmt"
"time"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"

"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
)

var allowedUnhealthyPercent = intstr.FromString("20%")

// Controller for the resource
type Controller struct {
clock clock.Clock
recorder events.Recorder
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
}

// NewController constructs a controller instance
- func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, clock clock.Clock) *Controller {
+ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, clock clock.Clock, recorder events.Recorder) *Controller {
return &Controller{
clock: clock,
recorder: recorder,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
}
@@ -56,7 +66,7 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("node.health").
- For(&corev1.Node{}).
+ For(&corev1.Node{}, builder.WithPredicates(nodeutils.IsManagedPredicateFuncs(c.cloudProvider))).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}

@@ -70,6 +80,29 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil
return reconcile.Result{}, nodeutils.IgnoreNodeClaimNotFoundError(err)
}

// If the nodeclaim has a nodepool label, validate that the nodes inside the nodepool are healthy (i.e. below the allowed threshold)
// In the case of a standalone nodeclaim, validate that the nodes inside the cluster are healthy before proceeding
// to repair the nodes
nodePoolName, found := nodeClaim.Labels[v1.NodePoolLabelKey]
if found {
nodePoolHealthy, err := c.isNodePoolHealthy(ctx, nodePoolName)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if !nodePoolHealthy {
return reconcile.Result{}, c.publishNodePoolHealthEvent(ctx, node, nodeClaim, nodePoolName)
}
} else {
clusterHealthy, err := c.isClusterHealthy(ctx)
if err != nil {
return reconcile.Result{}, err
}
if !clusterHealthy {
c.recorder.Publish(NodeRepairBlockedUnmanagedNodeClaim(node, nodeClaim, fmt.Sprintf("more than %s of nodes are unhealthy in the cluster", allowedUnhealthyPercent.String()))...)
return reconcile.Result{}, nil
}
}

unhealthyNodeCondition, policyTerminationDuration := c.findUnhealthyConditions(node)
if unhealthyNodeCondition == nil {
return reconcile.Result{}, nil
@@ -130,3 +163,48 @@ func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeCla

return nil
}

// isNodePoolHealthy checks whether the number of unhealthy nodes managed by the given NodePool exceeds the allowed health threshold,
// using the unhealthy conditions defined by the cloud provider's repair policies.
// Up to 20% of Nodes (rounded up to the nearest whole number) may be unhealthy before the NodePool is considered unhealthy.
// For example, given a NodePool with three nodes, one may be unhealthy without rendering the NodePool unhealthy, even though that is 33% of the total nodes.
// This is analogous to how minAvailable and maxUnavailable work for PodDisruptionBudgets: https://kubernetes.io/docs/tasks/run-application/configure-pdb/#rounding-logic-when-specifying-percentages.
func (c *Controller) isNodePoolHealthy(ctx context.Context, nodePoolName string) (bool, error) {
nodeList := &corev1.NodeList{}
if err := c.kubeClient.List(ctx, nodeList, client.MatchingLabels(map[string]string{v1.NodePoolLabelKey: nodePoolName})); err != nil {
return false, err
}

return c.isHealthyForNodes(nodeList.Items), nil
}

func (c *Controller) isClusterHealthy(ctx context.Context) (bool, error) {
nodeList := &corev1.NodeList{}
if err := c.kubeClient.List(ctx, nodeList); err != nil {
return false, err
}

return c.isHealthyForNodes(nodeList.Items), nil
}

func (c *Controller) isHealthyForNodes(nodes []corev1.Node) bool {
unhealthyNodeCount := lo.CountBy(nodes, func(node corev1.Node) bool {
_, found := lo.Find(c.cloudProvider.RepairPolicies(), func(policy cloudprovider.RepairPolicy) bool {
nodeCondition := nodeutils.GetCondition(lo.ToPtr(node), policy.ConditionType)
return nodeCondition.Status == policy.ConditionStatus
})
return found
})

threshold := lo.Must(intstr.GetScaledValueFromIntOrPercent(lo.ToPtr(allowedUnhealthyPercent), len(nodes), true))
return unhealthyNodeCount <= threshold
}

func (c *Controller) publishNodePoolHealthEvent(ctx context.Context, node *corev1.Node, nodeClaim *v1.NodeClaim, npName string) error {
np := &v1.NodePool{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: npName}, np); err != nil {
return client.IgnoreNotFound(err)
}
c.recorder.Publish(NodeRepairBlocked(node, nodeClaim, np, fmt.Sprintf("more than %s of nodes are unhealthy in the nodepool", allowedUnhealthyPercent.String()))...)
return nil
}
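
Note: the 20% allowance above is scaled against the total node count and rounded up, which is what makes a single unhealthy node tolerable in a three-node NodePool. Below is a minimal standalone sketch (not part of the diff) of that rounding behaviour, using the same intstr.GetScaledValueFromIntOrPercent helper that isHealthyForNodes calls:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	allowed := intstr.FromString("20%")
	for _, total := range []int{3, 10} {
		// roundUp=true mirrors the controller: 20% of 3 rounds up to 1, 20% of 10 is exactly 2.
		threshold, err := intstr.GetScaledValueFromIntOrPercent(&allowed, total, true)
		if err != nil {
			panic(err)
		}
		fmt.Printf("nodes=%d -> up to %d unhealthy nodes tolerated\n", total, threshold)
	}
}

With these values, 1 unhealthy node out of 3 stays within the threshold, while 3 unhealthy nodes out of 10 exceed the threshold of 2 and block repair, matching the two test cases added in suite_test.go below.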
76 changes: 76 additions & 0 deletions pkg/controllers/node/health/events.go
@@ -0,0 +1,76 @@
/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package health

import (
"time"

corev1 "k8s.io/api/core/v1"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/events"
)

func NodeRepairBlocked(node *corev1.Node, nodeClaim *v1.NodeClaim, nodePool *v1.NodePool, reason string) []events.Event {
return []events.Event{
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(node.UID)},
DedupeTimeout: time.Minute * 15,
},
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(nodeClaim.UID)},
DedupeTimeout: time.Minute * 15,
},
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(nodePool.UID)},
DedupeTimeout: time.Minute * 15,
},
}
}

func NodeRepairBlockedUnmanagedNodeClaim(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) []events.Event {
return []events.Event{
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(node.UID)},
DedupeTimeout: time.Minute * 15,
},
{
InvolvedObject: node,
Type: corev1.EventTypeWarning,
Reason: "NodeRepairBlocked",
Message: reason,
DedupeValues: []string{string(nodeClaim.UID)},
DedupeTimeout: time.Minute * 15,
},
}
}
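
These NodeRepairBlocked events only fire when repair is held back; the conditions that mark a node unhealthy in the first place come from the cloud provider's repair policies (gated in controllers.go above). As a hedged sketch, not part of this PR and with the TolerationDuration field name assumed from how the controller consumes the policy, a cloud provider exposing the "BadNode" condition used in the tests below might look like this:

package example

import (
	"time"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// RepairPolicies reports which node conditions the health controller treats as unhealthy.
// Nodes matching a policy are repaired once the toleration elapses, subject to the
// 20% NodePool/cluster threshold introduced in this PR.
func RepairPolicies() []cloudprovider.RepairPolicy {
	return []cloudprovider.RepairPolicy{
		{
			ConditionType:      "BadNode",
			ConditionStatus:    corev1.ConditionFalse,
			TolerationDuration: 30 * time.Minute,
		},
	}
}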
56 changes: 55 additions & 1 deletion pkg/controllers/node/health/suite_test.go
@@ -66,7 +66,7 @@ var _ = BeforeSuite(func() {
cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewTestingQueue(env.Client, recorder)
- healthController = health.NewController(env.Client, cloudProvider, fakeClock)
+ healthController = health.NewController(env.Client, cloudProvider, fakeClock, recorder)
})

var _ = AfterSuite(func() {
@@ -271,6 +271,60 @@ var _ = Describe("Node Health", func() {
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil())
})
It("should ignore unhealthy nodes if more then 20% of the nodes are unhealthy", func() {
ExpectApplied(ctx, env.Client, nodePool)
nodeClaims := []*v1.NodeClaim{}
nodes := []*corev1.Node{}
for i := range 10 {
nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}})
if i < 3 {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
}
node.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaim.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaims = append(nodeClaims, nodeClaim)
nodes = append(nodes, node)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
}

fakeClock.Step(60 * time.Minute)

// Repair is blocked: 3 of 10 nodes are unhealthy, which exceeds the 20% threshold (2 nodes)
for i := range 4 {
ExpectObjectReconciled(ctx, env.Client, healthController, nodes[i])
nodeClaim = ExpectExists(ctx, env.Client, nodeClaims[i])
Expect(nodeClaim.DeletionTimestamp).To(BeNil())
}
})
It("should consider round up when there is a low number of nodes for a nodepool", func() {
nodeClaims := []*v1.NodeClaim{}
nodes := []*corev1.Node{}
for i := range 3 {
nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}})
if i == 0 {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "BadNode",
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
}
node.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaim.Labels[v1.NodePoolLabelKey] = nodePool.Name
nodeClaims = append(nodeClaims, nodeClaim)
nodes = append(nodes, node)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
}

fakeClock.Step(60 * time.Minute)
// The single unhealthy node is repaired: the 20% threshold rounds up to 1 of 3 nodes
ExpectObjectReconciled(ctx, env.Client, healthController, nodes[0])
nodeClaim = ExpectExists(ctx, env.Client, nodeClaims[0])
Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil())
})
})
Context("Metrics", func() {
It("should fire a karpenter_nodeclaims_disrupted_total metric when unhealthy", func() {