diff --git a/pkg/controllers/provisioning/batcher.go b/pkg/controllers/provisioning/batcher.go
index 9d6cbcc792..d76d17649b 100644
--- a/pkg/controllers/provisioning/batcher.go
+++ b/pkg/controllers/provisioning/batcher.go
@@ -20,38 +20,42 @@ import (
 	"context"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
+
 	"sigs.k8s.io/karpenter/pkg/operator/options"
 )
 
 // Batcher separates a stream of Trigger() calls into windowed slices. The
 // window is dynamic and will be extended if additional items are added up to a
 // maximum batch duration.
-type Batcher struct {
-	trigger chan struct{}
+type Batcher[T comparable] struct {
+	trigger chan T
 }
 
 // NewBatcher is a constructor for the Batcher
-func NewBatcher() *Batcher {
-	return &Batcher{
-		trigger: make(chan struct{}, 1),
+func NewBatcher[T comparable]() *Batcher[T] {
+	return &Batcher[T]{
+		trigger: make(chan T, 1),
 	}
 }
 
 // Trigger causes the batcher to start a batching window, or extend the current batching window if it hasn't reached the
 // maximum length.
-func (b *Batcher) Trigger() {
+func (b *Batcher[T]) Trigger(triggeredOn T) {
 	// The trigger is idempotently armed. This statement never blocks
 	select {
-	case b.trigger <- struct{}{}:
+	case b.trigger <- triggeredOn:
 	default:
 	}
 }
 
 // Wait starts a batching window and continues waiting as long as it continues receiving triggers within
 // the idleDuration, up to the maxDuration
-func (b *Batcher) Wait(ctx context.Context) bool {
+func (b *Batcher[T]) Wait(ctx context.Context) bool {
+	triggeredOnElems := sets.New[T]()
 	select {
-	case <-b.trigger:
+	case triggeredOn := <-b.trigger:
+		triggeredOnElems.Insert(triggeredOn)
 		// start the batching window after the first item is received
 	case <-time.After(1 * time.Second):
 		// If no pods, bail to the outer controller framework to refresh the context
@@ -61,12 +65,16 @@ func (b *Batcher) Wait(ctx context.Context) bool {
 	idle := time.NewTimer(options.FromContext(ctx).BatchIdleDuration)
 	for {
 		select {
-		case <-b.trigger:
-			// correct way to reset an active timer per docs
-			if !idle.Stop() {
-				<-idle.C
+		case triggeredOn := <-b.trigger:
+			// Ensure we don't re-trigger when we just get the same item triggering us multiple times
+			if !triggeredOnElems.Has(triggeredOn) {
+				// correct way to reset an active timer per docs
+				if !idle.Stop() {
+					<-idle.C
+				}
+				idle.Reset(options.FromContext(ctx).BatchIdleDuration)
 			}
-			idle.Reset(options.FromContext(ctx).BatchIdleDuration)
+			triggeredOnElems.Insert(triggeredOn)
 		case <-timeout.C:
 			return true
 		case <-idle.C:
diff --git a/pkg/controllers/provisioning/controller.go b/pkg/controllers/provisioning/controller.go
index e3b6bc6f64..e66547ecf8 100644
--- a/pkg/controllers/provisioning/controller.go
+++ b/pkg/controllers/provisioning/controller.go
@@ -60,7 +60,7 @@ func (c *PodController) Reconcile(ctx context.Context, p *v1.Pod) (reconcile.Res
 	if !pod.IsProvisionable(p) {
 		return reconcile.Result{}, nil
 	}
-	c.provisioner.Trigger()
+	c.provisioner.Trigger(p.UID)
 	// Continue to requeue until the pod is no longer provisionable. Pods may
 	// not be scheduled as expected if new pods are created while nodes are
 	// coming online. Even if a provisioning loop is successful, the pod may
@@ -104,7 +104,7 @@ func (c *NodeController) Reconcile(ctx context.Context, n *v1.Node) (reconcile.R
 	if !lo.Contains(n.Spec.Taints, v1beta1.DisruptionNoScheduleTaint) {
 		return reconcile.Result{}, nil
 	}
-	c.provisioner.Trigger()
+	c.provisioner.Trigger(n.UID)
 	// Continue to requeue until the node is no longer provisionable. Pods may
 	// not be scheduled as expected if new pods are created while nodes are
 	// coming online. Even if a provisioning loop is successful, the pod may
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 5844c748c7..7377f67e92 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -75,7 +75,7 @@ func WithReason(reason string) func(LaunchOptions) LaunchOptions {
 type Provisioner struct {
 	cloudProvider  cloudprovider.CloudProvider
 	kubeClient     client.Client
-	batcher        *Batcher
+	batcher        *Batcher[types.UID]
 	volumeTopology *scheduler.VolumeTopology
 	cluster        *state.Cluster
 	recorder       events.Recorder
@@ -86,7 +86,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
 	cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster,
 ) *Provisioner {
 	p := &Provisioner{
-		batcher:        NewBatcher(),
+		batcher:        NewBatcher[types.UID](),
 		cloudProvider:  cloudProvider,
 		kubeClient:     kubeClient,
 		volumeTopology: scheduler.NewVolumeTopology(kubeClient),
@@ -97,8 +97,8 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
 	return p
 }
 
-func (p *Provisioner) Trigger() {
-	p.batcher.Trigger()
+func (p *Provisioner) Trigger(uid types.UID) {
+	p.batcher.Trigger(uid)
 }
 
 func (p *Provisioner) Register(_ context.Context, mgr manager.Manager) error {
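The behavioral effect of this change: triggers are now keyed by the UID of the pod or node that caused them, and a repeated trigger from the same object no longer resets the idle timer, so one hot reconciler cannot extend the batching window indefinitely. Below is a minimal, self-contained sketch of that technique (not the Karpenter code itself): the batcher type, the fixed idle/max durations, and the main() driver are illustrative stand-ins for the real Batcher[types.UID], options.FromContext(ctx).BatchIdleDuration / BatchMaxDuration, and the provisioner loop.

package main

import (
	"context"
	"fmt"
	"time"
)

// batcher is a simplified stand-in for the generic Batcher[T comparable] in this
// diff: triggers are deduplicated per batching window, so only the first trigger
// from a given element extends the idle timer.
type batcher[T comparable] struct {
	trigger chan T
}

func newBatcher[T comparable]() *batcher[T] {
	return &batcher[T]{trigger: make(chan T, 1)}
}

// Trigger idempotently arms the batcher; it never blocks.
func (b *batcher[T]) Trigger(triggeredOn T) {
	select {
	case b.trigger <- triggeredOn:
	default:
	}
}

// Wait opens a batching window on the first trigger and keeps it open only while
// *new* elements keep arriving within idleDur, up to maxDur. The durations are
// illustrative parameters, not the real BatchIdleDuration/BatchMaxDuration options.
func (b *batcher[T]) Wait(ctx context.Context, idleDur, maxDur time.Duration) bool {
	seen := map[T]struct{}{}
	select {
	case first := <-b.trigger:
		// start the batching window after the first item is received
		seen[first] = struct{}{}
	case <-time.After(1 * time.Second):
		return false // nothing to batch
	}
	timeout := time.NewTimer(maxDur)
	idle := time.NewTimer(idleDur)
	for {
		select {
		case t := <-b.trigger:
			if _, ok := seen[t]; !ok {
				// Only an element we haven't seen this window resets the idle timer.
				if !idle.Stop() {
					<-idle.C
				}
				idle.Reset(idleDur)
			}
			seen[t] = struct{}{}
		case <-timeout.C:
			return true
		case <-idle.C:
			return true
		case <-ctx.Done():
			return false
		}
	}
}

func main() {
	b := newBatcher[string]()
	go func() {
		// The same element re-triggering repeatedly does not keep the window open.
		for i := 0; i < 10; i++ {
			b.Trigger("pod-a")
			time.Sleep(20 * time.Millisecond)
		}
	}()
	start := time.Now()
	b.Wait(context.Background(), 100*time.Millisecond, 2*time.Second)
	fmt.Printf("window closed after %v\n", time.Since(start).Round(10*time.Millisecond))
}

Running the sketch, the window closes roughly one idle period after the first trigger even though "pod-a" keeps re-triggering, which mirrors why the diff inserts into triggeredOnElems and checks Has() before resetting the idle timer.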