diff --git a/pkg/controllers/provisioning/batcher.go b/pkg/controllers/provisioning/batcher.go
index 9d6cbcc792..056ef4aabf 100644
--- a/pkg/controllers/provisioning/batcher.go
+++ b/pkg/controllers/provisioning/batcher.go
@@ -18,38 +18,61 @@ package provisioning
 
 import (
 	"context"
+	"sync"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
+
 	"sigs.k8s.io/karpenter/pkg/operator/options"
 )
 
 // Batcher separates a stream of Trigger() calls into windowed slices. The
 // window is dynamic and will be extended if additional items are added up to a
 // maximum batch duration.
-type Batcher struct {
+type Batcher[T comparable] struct {
 	trigger chan struct{}
+
+	mu               sync.RWMutex
+	triggeredOnElems sets.Set[T]
 }
 
 // NewBatcher is a constructor for the Batcher
-func NewBatcher() *Batcher {
-	return &Batcher{
-		trigger: make(chan struct{}, 1),
+func NewBatcher[T comparable]() *Batcher[T] {
+	return &Batcher[T]{
+		trigger:          make(chan struct{}, 1),
+		triggeredOnElems: sets.New[T](),
 	}
 }
 
 // Trigger causes the batcher to start a batching window, or extend the current batching window if it hasn't reached the
 // maximum length.
-func (b *Batcher) Trigger() {
+func (b *Batcher[T]) Trigger(triggeredOn T) {
+	// Don't trigger if we've already triggered for this element
+	b.mu.RLock()
+	if b.triggeredOnElems.Has(triggeredOn) {
+		b.mu.RUnlock()
+		return
+	}
+	b.mu.RUnlock()
 	// The trigger is idempotently armed. This statement never blocks
 	select {
 	case b.trigger <- struct{}{}:
 	default:
 	}
+	b.mu.Lock()
+	b.triggeredOnElems.Insert(triggeredOn)
+	b.mu.Unlock()
 }
 
 // Wait starts a batching window and continues waiting as long as it continues receiving triggers within
 // the idleDuration, up to the maxDuration
-func (b *Batcher) Wait(ctx context.Context) bool {
+func (b *Batcher[T]) Wait(ctx context.Context) bool {
+	// Ensure that we always reset our tracked elements at the end of a Wait() statement
+	defer func() {
+		b.mu.Lock()
+		b.triggeredOnElems.Clear()
+		b.mu.Unlock()
+	}()
 	select {
 	case <-b.trigger:
 		// start the batching window after the first item is received
diff --git a/pkg/controllers/provisioning/controller.go b/pkg/controllers/provisioning/controller.go
index eb71009f8e..9e2a6706ec 100644
--- a/pkg/controllers/provisioning/controller.go
+++ b/pkg/controllers/provisioning/controller.go
@@ -55,7 +55,7 @@ func (c *PodController) Reconcile(ctx context.Context, p *v1.Pod) (reconcile.Result, error) {
 	if !pod.IsProvisionable(p) {
 		return reconcile.Result{}, nil
 	}
-	c.provisioner.Trigger()
+	c.provisioner.Trigger(p.UID)
 	// Continue to requeue until the pod is no longer provisionable. Pods may
 	// not be scheduled as expected if new pods are created while nodes are
 	// coming online. Even if a provisioning loop is successful, the pod may
@@ -96,7 +96,7 @@ func (c *NodeController) Reconcile(ctx context.Context, n *v1.Node) (reconcile.Result, error) {
 	if !lo.Contains(n.Spec.Taints, v1beta1.DisruptionNoScheduleTaint) {
 		return reconcile.Result{}, nil
 	}
-	c.provisioner.Trigger()
+	c.provisioner.Trigger(n.UID)
 	// Continue to requeue until the node is no longer provisionable. Pods may
 	// not be scheduled as expected if new pods are created while nodes are
 	// coming online. Even if a provisioning loop is successful, the pod may
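The batcher.go change does two things: it makes Batcher generic over any comparable key, and it suppresses repeat triggers from the same element within a single batching window, so a hot-looping reconcile of one object can no longer keep extending the window. Below is a minimal, dependency-free sketch of that deduplication pattern; `dedupBatcher`, its map-backed set, and the `bool` return are illustrative assumptions, not part of the diff (the real code uses apimachinery's `sets.Set[T]` and returns nothing):

```go
package main

import (
	"fmt"
	"sync"
)

// dedupBatcher sketches the dedup-on-trigger pattern from the diff,
// with a plain map standing in for sets.Set[T].
type dedupBatcher[T comparable] struct {
	trigger chan struct{}

	mu   sync.RWMutex
	seen map[T]struct{}
}

func newDedupBatcher[T comparable]() *dedupBatcher[T] {
	return &dedupBatcher[T]{
		trigger: make(chan struct{}, 1), // capacity 1 makes arming idempotent
		seen:    make(map[T]struct{}),
	}
}

// Trigger arms the batcher unless this element has already triggered it
// during the current window. The bool return is added here purely so the
// example below can show which calls were deduplicated.
func (b *dedupBatcher[T]) Trigger(elem T) bool {
	b.mu.RLock()
	_, dup := b.seen[elem]
	b.mu.RUnlock()
	if dup {
		return false // duplicate element: window is not extended
	}
	select {
	case b.trigger <- struct{}{}: // never blocks: channel is buffered
	default: // already armed by another element
	}
	b.mu.Lock()
	b.seen[elem] = struct{}{}
	b.mu.Unlock()
	return true
}

func main() {
	b := newDedupBatcher[string]()
	fmt.Println(b.Trigger("pod-a")) // true: first trigger arms the channel
	fmt.Println(b.Trigger("pod-a")) // false: deduplicated within the window
	fmt.Println(b.Trigger("pod-b")) // true: distinct element still counts
}
```

The buffered channel of capacity 1 is what keeps arming idempotent, exactly as in the pre-existing code; the new set only decides whether a given element is allowed to reach that channel at all.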
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index fda2d8761d..770c1110dc 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -78,7 +78,7 @@ func WithReason(reason string) func(*LaunchOptions) {
 type Provisioner struct {
 	cloudProvider  cloudprovider.CloudProvider
 	kubeClient     client.Client
-	batcher        *Batcher
+	batcher        *Batcher[types.UID]
 	volumeTopology *scheduler.VolumeTopology
 	cluster        *state.Cluster
 	recorder       events.Recorder
@@ -89,7 +89,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
 	cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster,
 ) *Provisioner {
 	p := &Provisioner{
-		batcher:        NewBatcher(),
+		batcher:        NewBatcher[types.UID](),
 		cloudProvider:  cloudProvider,
 		kubeClient:     kubeClient,
 		volumeTopology: scheduler.NewVolumeTopology(kubeClient),
@@ -100,8 +100,8 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
 	return p
 }
 
-func (p *Provisioner) Trigger() {
-	p.batcher.Trigger()
+func (p *Provisioner) Trigger(uid types.UID) {
+	p.batcher.Trigger(uid)
 }
 
 func (p *Provisioner) Register(_ context.Context, m manager.Manager) error {
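On the provisioner side, the batcher is instantiated as `Batcher[types.UID]`, so each pod or node UID can extend a given batching window at most once; the deferred `Clear()` in `Wait()` then resets that bookkeeping when the window closes, letting the same UID open the next window. The `Wait()` body is truncated in this diff, so the following is only a loose, self-contained sketch of how that reset interacts with a dynamic window; `windowBatcher` and the `idle`/`max` parameters are assumptions (the real `Wait()` takes a context and reads its durations from karpenter's `options`), and `Trigger` here folds the diff's RLock/Lock pair into one critical section for brevity:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type windowBatcher[T comparable] struct {
	trigger chan struct{}

	mu   sync.Mutex
	seen map[T]struct{}
}

func newWindowBatcher[T comparable]() *windowBatcher[T] {
	return &windowBatcher[T]{
		trigger: make(chan struct{}, 1),
		seen:    make(map[T]struct{}),
	}
}

// Trigger arms the batcher for elements not yet seen in this window.
func (b *windowBatcher[T]) Trigger(elem T) {
	b.mu.Lock()
	if _, dup := b.seen[elem]; dup {
		b.mu.Unlock()
		return // already counted in this window
	}
	b.seen[elem] = struct{}{}
	b.mu.Unlock()
	select {
	case b.trigger <- struct{}{}:
	default:
	}
}

// Wait opens a window on the first trigger and keeps extending it while
// triggers keep arriving within idle, up to max. The deferred reset mirrors
// the diff: once a window closes, previously seen elements trigger again.
func (b *windowBatcher[T]) Wait(idle, max time.Duration) bool {
	defer func() {
		b.mu.Lock()
		b.seen = make(map[T]struct{})
		b.mu.Unlock()
	}()
	select {
	case <-b.trigger: // first element opens the window
	case <-time.After(max):
		return false // nothing arrived at all
	}
	timeout := time.After(max)
	for {
		select {
		case <-b.trigger: // a new distinct element extends the window
		case <-time.After(idle):
			return true // window went idle: batch is ready
		case <-timeout:
			return true // hit the maximum batch duration
		}
	}
}

func main() {
	b := newWindowBatcher[string]()
	b.Trigger("node-1")
	fmt.Println(b.Wait(10*time.Millisecond, 100*time.Millisecond)) // true
	b.Trigger("node-1")                                            // seen set was cleared, so this arms a new window
	fmt.Println(b.Wait(10*time.Millisecond, 100*time.Millisecond)) // true
}
```

Keying the deduplication on `types.UID` rather than name/namespace means a deleted-and-recreated object counts as a fresh trigger, which matches how the pod and node controllers above pass `p.UID` and `n.UID` straight through `Provisioner.Trigger`.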