Skip to content

Commit

Permalink
Fix idle duration timeout for pod/node batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Jul 10, 2024
1 parent bcd33e9 commit 2e30a05
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 12 deletions.
35 changes: 29 additions & 6 deletions pkg/controllers/provisioning/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,61 @@ package provisioning

import (
"context"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"

"sigs.k8s.io/karpenter/pkg/operator/options"
)

// Batcher separates a stream of Trigger() calls into windowed slices. The
// window is dynamic and will be extended if additional items are added up to a
// maximum batch duration.
type Batcher struct {
type Batcher[T comparable] struct {
trigger chan struct{}

mu sync.RWMutex
triggeredOnElems sets.Set[T]
}

// NewBatcher is a constructor for the Batcher
func NewBatcher() *Batcher {
return &Batcher{
trigger: make(chan struct{}, 1),
func NewBatcher[T comparable]() *Batcher[T] {
return &Batcher[T]{
trigger: make(chan struct{}, 1),
triggeredOnElems: sets.New[T](),
}
}

// Trigger causes the batcher to start a batching window, or extend the current batching window if it hasn't reached the
// maximum length.
func (b *Batcher) Trigger() {
func (b *Batcher[T]) Trigger(triggeredOn T) {
// Don't trigger if we've already triggered for this element
b.mu.RLock()
if b.triggeredOnElems.Has(triggeredOn) {
b.mu.RUnlock()
return
}
b.mu.RUnlock()
// The trigger is idempotently armed. This statement never blocks
select {
case b.trigger <- struct{}{}:
default:
}
b.mu.Lock()
b.triggeredOnElems.Insert(triggeredOn)
b.mu.Unlock()
}

// Wait starts a batching window and continues waiting as long as it continues receiving triggers within
// the idleDuration, up to the maxDuration
func (b *Batcher) Wait(ctx context.Context) bool {
func (b *Batcher[T]) Wait(ctx context.Context) bool {
// Ensure that we always reset our tracked elements at the end of a Wait() statement
defer func() {
// Clearing mutates the set, so the write lock is required. It must be released with
// Unlock(): calling RUnlock() on a write-locked RWMutex is a fatal runtime error and
// would leave the write lock held, blocking every later Trigger() and Wait().
b.mu.Lock()
defer b.mu.Unlock()
b.triggeredOnElems.Clear()
}()
select {
case <-b.trigger:
// start the batching window after the first item is received
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *PodController) Reconcile(ctx context.Context, p *v1.Pod) (reconcile.Res
if !pod.IsProvisionable(p) {
return reconcile.Result{}, nil
}
c.provisioner.Trigger()
c.provisioner.Trigger(p.UID)
// Continue to requeue until the pod is no longer provisionable. Pods may
// not be scheduled as expected if new pods are created while nodes are
// coming online. Even if a provisioning loop is successful, the pod may
Expand Down Expand Up @@ -96,7 +96,7 @@ func (c *NodeController) Reconcile(ctx context.Context, n *v1.Node) (reconcile.R
if !lo.Contains(n.Spec.Taints, v1beta1.DisruptionNoScheduleTaint) {
return reconcile.Result{}, nil
}
c.provisioner.Trigger()
c.provisioner.Trigger(n.UID)
// Continue to requeue until the node is no longer provisionable. Pods may
// not be scheduled as expected if new pods are created while nodes are
// coming online. Even if a provisioning loop is successful, the pod may
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func WithReason(reason string) func(*LaunchOptions) {
type Provisioner struct {
cloudProvider cloudprovider.CloudProvider
kubeClient client.Client
batcher *Batcher
batcher *Batcher[types.UID]
volumeTopology *scheduler.VolumeTopology
cluster *state.Cluster
recorder events.Recorder
Expand All @@ -89,7 +89,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster,
) *Provisioner {
p := &Provisioner{
batcher: NewBatcher(),
batcher: NewBatcher[types.UID](),
cloudProvider: cloudProvider,
kubeClient: kubeClient,
volumeTopology: scheduler.NewVolumeTopology(kubeClient),
Expand All @@ -100,8 +100,8 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
return p
}

func (p *Provisioner) Trigger() {
p.batcher.Trigger()
func (p *Provisioner) Trigger(uid types.UID) {
p.batcher.Trigger(uid)
}

func (p *Provisioner) Register(_ context.Context, m manager.Manager) error {
Expand Down

0 comments on commit 2e30a05

Please sign in to comment.