most of the ring buffer approach
rschalo committed Dec 19, 2024
1 parent 0f64abd commit dadbb2e
Showing 11 changed files with 129 additions and 47 deletions.
5 changes: 1 addition & 4 deletions pkg/apis/v1/nodepool_status.go
@@ -28,17 +28,14 @@ const (
// ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready
ConditionTypeNodeClassReady = "NodeClassReady"
// TODO
ConditionTypeDegraded = "Degraded"
ConditionTypeStable = "Stable"
)

// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
// Resources is the list of resources that have been provisioned.
// +optional
Resources v1.ResourceList `json:"resources,omitempty"`
// FailedLaunches tracks the number of times a nodepool failed before being marked degraded
// +optional
FailedLaunches int `json:"failedlaunches,omitempty"`
// Conditions contains signals for health and readiness
// +optional
Conditions []status.Condition `json:"conditions,omitempty"`
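
With FailedLaunches dropped from NodePoolStatus and the Degraded condition replaced by Stable, anything that previously read the counter would key off the condition instead. A rough sketch of such a consumer, assuming the operatorpkg condition helpers used throughout this diff (the Get accessor and nil check are assumptions, not shown in this commit):

package example

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// nodePoolIsStable is a hypothetical helper: it reads the new Stable condition
// rather than the removed Status.FailedLaunches counter. Get and IsTrue are
// assumed from the operatorpkg condition helpers used elsewhere in this commit.
func nodePoolIsStable(nodePool *v1.NodePool) bool {
	cond := nodePool.StatusConditions().Get(v1.ConditionTypeStable)
	return cond != nil && cond.IsTrue()
}
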
4 changes: 2 additions & 2 deletions pkg/controllers/controllers.go
@@ -91,10 +91,10 @@ func NewControllers(
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
nodepooldegraded.NewController(kubeClient, cloudProvider),
nodepooldegraded.NewController(kubeClient, cloudProvider, cluster),
podevents.NewController(clock, kubeClient, cloudProvider),
nodeclaimconsistency.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimlifecycle.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimlifecycle.NewController(clock, kubeClient, cloudProvider, recorder, cluster),
nodeclaimgarbagecollection.NewController(clock, kubeClient, cloudProvider),
nodeclaimdisruption.NewController(clock, kubeClient, cloudProvider),
nodeclaimhydration.NewController(kubeClient, cloudProvider),
4 changes: 4 additions & 0 deletions pkg/controllers/metrics/pod/controller.go
@@ -47,6 +47,7 @@ const (
ownerSelfLink = "owner"
podHostName = "node"
podNodePool = "nodepool"
podNodePoolDegraded = "nodepool_degraded"
podHostZone = "zone"
podHostArchitecture = "arch"
podHostCapacityType = "capacity_type"
@@ -106,7 +107,9 @@ var (
Name: "unbound_time_seconds",
Help: "The time from pod creation until the pod is bound.",
},
// []string{podName, podNamespace, podNodePoolDegraded},
[]string{podName, podNamespace},
// podNodePoolDegraded == false
)
// Stage: alpha
PodProvisioningBoundDurationSeconds = opmetrics.NewPrometheusHistogram(
@@ -129,6 +132,7 @@ var (
Name: "provisioning_unbound_time_seconds",
Help: "The time from when Karpenter first thinks the pod can schedule until it binds. Note: this calculated from a point in memory, not by the pod creation timestamp.",
},
// []string{podName, podNamespace, podNodePoolDegraded},
[]string{podName, podNamespace},
)
// Stage: alpha
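
The commented-out podNodePoolDegraded label in both histograms hints at a follow-up where unbound-time metrics would be partitioned by whether the pod's NodePool is currently degraded. As a plain prometheus/client_golang illustration of that partitioning (hypothetical: the commit itself uses the opmetrics wrapper, and the full metric name is inferred from the karpenter/pods namespace and subsystem conventions):

package example

import "github.com/prometheus/client_golang/prometheus"

// podUnboundSeconds sketches the unbound-time histogram with the extra
// nodepool_degraded dimension hinted at by the commented-out label above.
var podUnboundSeconds = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "karpenter_pods_unbound_time_seconds",
		Help: "The time from pod creation until the pod is bound.",
	},
	[]string{"name", "namespace", "nodepool_degraded"},
)

func init() {
	prometheus.MustRegister(podUnboundSeconds)
}
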
5 changes: 4 additions & 1 deletion pkg/controllers/nodeclaim/garbagecollection/suite_test.go
@@ -36,6 +36,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
nodeclaimgarbagecollection "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/garbagecollection"
nodeclaimlifcycle "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/test"
@@ -48,6 +49,7 @@ var ctx context.Context
var nodeClaimController *nodeclaimlifcycle.Controller
var garbageCollectionController *nodeclaimgarbagecollection.Controller
var env *test.Environment
var cluster *state.Cluster
var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider

@@ -62,8 +64,9 @@ var _ = BeforeSuite(func() {
env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeProviderIDFieldIndexer(ctx)))
ctx = options.ToContext(ctx, test.Options())
cloudProvider = fake.NewCloudProvider()
cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
garbageCollectionController = nodeclaimgarbagecollection.NewController(fakeClock, env.Client, cloudProvider)
nodeClaimController = nodeclaimlifcycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}))
nodeClaimController = nodeclaimlifcycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}), cluster)
})

var _ = AfterSuite(func() {
11 changes: 7 additions & 4 deletions pkg/controllers/nodeclaim/lifecycle/controller.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
"sigs.k8s.io/karpenter/pkg/controllers/state"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
@@ -64,23 +65,25 @@ type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
recorder events.Recorder
cluster *state.Cluster

launch *Launch
registration *Registration
initialization *Initialization
liveness *Liveness
}

func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder) *Controller {
func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder, cluster *state.Cluster) *Controller {
return &Controller{
kubeClient: kubeClient,
cloudProvider: cloudProvider,
recorder: recorder,
cluster: cluster,

launch: &Launch{kubeClient: kubeClient, cloudProvider: cloudProvider, cache: cache.New(time.Minute, time.Second*10), recorder: recorder},
registration: &Registration{kubeClient: kubeClient},
initialization: &Initialization{kubeClient: kubeClient},
liveness: &Liveness{clock: clk, kubeClient: kubeClient, cloudProvider: cloudProvider},
initialization: &Initialization{kubeClient: kubeClient, cluster: cluster},
liveness: &Liveness{clock: clk, kubeClient: kubeClient, cloudProvider: cloudProvider, cluster: cluster},
}
}

@@ -89,7 +92,7 @@ func (c *Controller) Register(_ context.Context, m manager.Manager) error {
Named(c.Name()).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutils.IsManagedPredicateFuncs(c.cloudProvider))).
Watches(&corev1.Node{}, nodeclaimutils.NodeEventHandler(c.kubeClient, c.cloudProvider)).
Watches(&v1.NodePool{}, nodeclaimutils.NodePoolEventHandler(c.kubeClient, c.cloudProvider)).
// Watches(&v1.NodePool{}, nodeclaimutils.NodePoolEventHandler(c.kubeClient, c.cloudProvider)).
WithOptions(controller.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
// back off until last attempt occurs ~90 seconds before nodeclaim expiration
12 changes: 12 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/initialization.go
@@ -23,12 +23,14 @@ import (
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/scheduling"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
@@ -37,6 +39,7 @@ import (

type Initialization struct {
kubeClient client.Client
cluster *state.Cluster
}

// Reconcile checks for initialization based on if:
@@ -83,6 +86,15 @@ func (i *Initialization) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim)
return reconcile.Result{}, err
}
}
nodePoolName, ok := nodeClaim.Labels[v1.NodePoolLabelKey]
if !ok {
return reconcile.Result{}, nil
}
nodePool := &v1.NodePool{}
if err := i.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
i.cluster.NodePoolLaunchesFor(string(nodePool.UID)).Push(1)
log.FromContext(ctx).WithValues("allocatable", node.Status.Allocatable).Info("initialized nodeclaim")
nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeInitialized)
return reconcile.Result{}, nil
32 changes: 4 additions & 28 deletions pkg/controllers/nodeclaim/lifecycle/liveness.go
@@ -20,8 +20,6 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -30,18 +28,20 @@ import (

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/metrics"
)

type Liveness struct {
clock clock.Clock
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
cluster *state.Cluster
}

// registrationTTL is a heuristic time that we expect the node to register within
// If we don't see the node within this time, then we should delete the NodeClaim and try again
const registrationTTL = time.Millisecond * 15
const registrationTTL = 15 * time.Minute

// nolint:gocyclo
func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
@@ -57,48 +57,24 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
// if we ever succeed registration, reset failures, old nodeclaim can incorrectly reset this
if registered.IsTrue() {
nodePool.Status.FailedLaunches = 0
if err := l.kubeClient.Status().Update(ctx, nodePool); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
return reconcile.Result{}, nil
}
// If the Registered statusCondition hasn't gone True during the TTL since we first updated it, we should terminate the NodeClaim
// NOTE: ttl has to be stored and checked in the same place since l.clock can advance after the check causing a race
// If the nodepool is degraded, requeue for the remaining TTL.
if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 {
return reconcile.Result{RequeueAfter: ttl}, nil
}
// Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node
// Here we delete the nodeclaim if the node failed to register, we want to retry against the nodeClaim's nodeClass/nodePool 3x.
// store against a nodepool since nodeclass is not available? nodeclass ref on nodepool, nodepool is 1:1 with nodeclass anyway
stored := nodePool.DeepCopy()
nodePool.Status.FailedLaunches += 1
log.FromContext(ctx).V(1).WithValues("failures", nodePool.Status.FailedLaunches).Info("failed launches so far")
if !equality.Semantic.DeepEqual(stored, nodePool) {
if err := l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
log.FromContext(ctx).V(1).WithValues("failures", nodePool.Status.FailedLaunches).Info("somehow passing")

if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
l.cluster.NodePoolLaunchesFor(string(nodePool.UID)).Push(-1)
log.FromContext(ctx).V(1).WithValues("ttl", registrationTTL).Info("terminating due to registration ttl")
metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
metrics.ReasonLabel: "liveness",
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
})

return reconcile.Result{}, nil
}
5 changes: 4 additions & 1 deletion pkg/controllers/nodeclaim/lifecycle/suite_test.go
@@ -36,6 +36,7 @@ import (
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
nodeclaimlifecycle "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/test"
@@ -47,6 +48,7 @@ import (
var ctx context.Context
var nodeClaimController *nodeclaimlifecycle.Controller
var env *test.Environment
var cluster *state.Cluster
var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider

@@ -74,7 +76,8 @@ var _ = BeforeSuite(func() {
ctx = options.ToContext(ctx, test.Options())

cloudProvider = fake.NewCloudProvider()
nodeClaimController = nodeclaimlifecycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}))
cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
nodeClaimController = nodeclaimlifecycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}), cluster)
})

var _ = AfterSuite(func() {
24 changes: 17 additions & 7 deletions pkg/controllers/nodepool/degraded/controller.go
@@ -26,35 +26,44 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
)

type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
cluster *state.Cluster
}

func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster) *Controller {
return &Controller{
kubeClient: kubeClient,
cloudProvider: cloudProvider,
cluster: cluster,
}
}

func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "nodepool.degraded")
stored := nodePool.DeepCopy()
if nodePool.Status.FailedLaunches >= 3 {
nodePool.StatusConditions().SetTrueWithReason(v1.ConditionTypeDegraded, "NodeRegistrationFailures",
"Node registration failing for nodepool, verify cluster networking is configured correctly")
} else {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeDegraded, "NodeLaunchSuccess", "")
nodePool.StatusConditions().SetUnknown(v1.ConditionTypeStable)
score, scored := c.cluster.NodePoolLaunchesFor(string(nodePool.UID)).Evaluate()
if !scored {
// no-op for an evaluation that doesn't exist
return reconcile.Result{RequeueAfter: 1 * time.Minute}, nil
}
if score < 0 {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeStable, "unhealthy", "")
} else if score > 0 {
nodePool.StatusConditions().SetTrue(v1.ConditionTypeStable)
}
if !equality.Semantic.DeepEqual(stored, nodePool) {
if err := c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
@@ -64,7 +73,8 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
return reconcile.Result{RequeueAfter: 15 * time.Second}, nil
log.FromContext(ctx).WithValues("score: ", score).Info("end of rec loop")
return reconcile.Result{RequeueAfter: 1 * time.Minute}, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
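
Taken together with the lifecycle changes above, the intended flow appears to be: initialization pushes +1 into a NodePool's ring buffer when a node initializes, liveness pushes -1 when the registration TTL expires, and this controller periodically evaluates the buffer to set the Stable condition. A condensed, illustrative sketch of that interplay (not part of the commit; NodePoolLaunchesFor is added to cluster state in the next file):

package example

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/controllers/state"
)

// recordAndEvaluateLaunch condenses the three call sites added in this commit
// into one function for illustration only.
func recordAndEvaluateLaunch(cluster *state.Cluster, nodePool *v1.NodePool, launchSucceeded bool) {
	buf := cluster.NodePoolLaunchesFor(string(nodePool.UID)) // lazily allocates a 10-entry buffer
	if launchSucceeded {
		buf.Push(1) // lifecycle/initialization.go: node initialized
	} else {
		buf.Push(-1) // lifecycle/liveness.go: registration TTL expired, NodeClaim deleted
	}
	if score, scored := buf.Evaluate(); scored {
		switch {
		case score < 0:
			nodePool.StatusConditions().SetFalse(v1.ConditionTypeStable, "unhealthy", "")
		case score > 0:
			nodePool.StatusConditions().SetTrue(v1.ConditionTypeStable)
		}
	}
}
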
12 changes: 12 additions & 0 deletions pkg/controllers/state/cluster.go
@@ -42,6 +42,7 @@ import (
"sigs.k8s.io/karpenter/pkg/scheduling"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
podutils "sigs.k8s.io/karpenter/pkg/utils/pod"
"sigs.k8s.io/karpenter/pkg/utils/ring"
)

// Cluster maintains cluster state that is often needed but expensive to compute.
@@ -55,6 +56,7 @@ type Cluster struct {
nodeNameToProviderID map[string]string // node name -> provider id
nodeClaimNameToProviderID map[string]string // node claim name -> provider id
daemonSetPods sync.Map // daemonSet -> existing pod
nodePoolLaunches map[string]*ring.Buffer // nodePoolUID -> recent launch results

podAcks sync.Map // pod namespaced name -> time when Karpenter first saw the pod as pending
podsSchedulingAttempted sync.Map // pod namespaced name -> time when Karpenter tried to schedule a pod
@@ -81,6 +83,7 @@ func NewCluster(clk clock.Clock, client client.Client, cloudProvider cloudprovid
daemonSetPods: sync.Map{},
nodeNameToProviderID: map[string]string{},
nodeClaimNameToProviderID: map[string]string{},
nodePoolLaunches: map[string]*ring.Buffer{},
podAcks: sync.Map{},
podsSchedulableTimes: sync.Map{},
podsSchedulingAttempted: sync.Map{},
@@ -676,3 +679,12 @@ func (c *Cluster) triggerConsolidationOnChange(old, new *StateNode) {
return
}
}

func (c *Cluster) NodePoolLaunchesFor(np string) *ring.Buffer {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.nodePoolLaunches[np]; !ok {
c.nodePoolLaunches[np] = ring.New(10)
}
return c.nodePoolLaunches[np]
}
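
The commit summary reports 11 changed files but only 10 render above; the missing file is presumably the new pkg/utils/ring package referenced by the import and call sites in this commit. From those call sites — ring.New(10), Push(1), Push(-1), and Evaluate() returning a score plus a has-data flag — a minimal fixed-size buffer could look like the following sketch. This is a reconstruction under those assumptions, not the committed implementation; the real locking, field names, and Evaluate semantics may differ.

// Package ring: a hypothetical sketch of the fixed-size buffer implied by the
// call sites in this diff (ring.New, Buffer.Push, Buffer.Evaluate).
package ring

import "sync"

type Buffer struct {
	mu      sync.Mutex
	entries []int // most recent launch results, +1 success / -1 failure
	next    int   // index of the slot that will be overwritten next
	count   int   // number of slots written so far, capped at len(entries)
}

// New returns a Buffer that retains the last size results.
func New(size int) *Buffer {
	return &Buffer{entries: make([]int, size)}
}

// Push records a launch result, overwriting the oldest entry once full.
func (b *Buffer) Push(v int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.entries[b.next] = v
	b.next = (b.next + 1) % len(b.entries)
	if b.count < len(b.entries) {
		b.count++
	}
}

// Evaluate sums the recorded results. The bool is false until at least one
// result has been pushed, matching the "no-op for an evaluation that doesn't
// exist" branch in the degraded controller.
func (b *Buffer) Evaluate() (int, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.count == 0 {
		return 0, false
	}
	score := 0
	for i := 0; i < b.count; i++ {
		score += b.entries[i]
	}
	return score, true
}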
