Skip to content

Commit

Permalink
Fix idle duration timeout for pod/node batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed May 16, 2024
1 parent f26918e commit 075decf
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 20 deletions.
36 changes: 22 additions & 14 deletions pkg/controllers/provisioning/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,42 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/util/sets"

"sigs.k8s.io/karpenter/pkg/operator/options"
)

// Batcher separates a stream of Trigger() calls into windowed slices. The
// window is dynamic: each trigger for an item not yet seen in the current
// window extends it, up to a maximum batch duration.
type Batcher struct {
trigger chan struct{}
type Batcher[T comparable] struct {
trigger chan T
}

// NewBatcher is a constructor for the Batcher
func NewBatcher() *Batcher {
return &Batcher{
trigger: make(chan struct{}, 1),
func NewBatcher[T comparable]() *Batcher[T] {
return &Batcher[T]{
trigger: make(chan T, 1),
}
}

// Trigger causes the batcher to start a batching window, or extend the current batching window if it hasn't reached the
// maximum length.
func (b *Batcher) Trigger() {
func (b *Batcher[T]) Trigger(triggeredOn T) {
// The trigger is idempotently armed. This statement never blocks
select {
case b.trigger <- struct{}{}:
case b.trigger <- triggeredOn:
default:
}
}

// Wait starts a batching window once the first trigger is received and keeps waiting as long as triggers for
// items not yet seen in this window keep arriving within the idleDuration, up to the maxDuration
func (b *Batcher) Wait(ctx context.Context) bool {
func (b *Batcher[T]) Wait(ctx context.Context) bool {
triggeredOnElems := sets.New[T]()
select {
case <-b.trigger:
case triggeredOn := <-b.trigger:
triggeredOnElems.Insert(triggeredOn)
// start the batching window after the first item is received
case <-time.After(1 * time.Second):
// If no trigger arrives within one second, bail to the outer controller framework to refresh the context
Expand All @@ -61,12 +65,16 @@ func (b *Batcher) Wait(ctx context.Context) bool {
idle := time.NewTimer(options.FromContext(ctx).BatchIdleDuration)
for {
select {
case <-b.trigger:
// correct way to reset an active timer per docs
if !idle.Stop() {
<-idle.C
case triggeredOn := <-b.trigger:
// Only extend the idle window for an item we haven't already seen in this batch, so repeated triggers for the same item can't keep the window open
if !triggeredOnElems.Has(triggeredOn) {
// correct way to reset an active timer per docs
if !idle.Stop() {
<-idle.C
}
idle.Reset(options.FromContext(ctx).BatchIdleDuration)
}
idle.Reset(options.FromContext(ctx).BatchIdleDuration)
triggeredOnElems.Insert(triggeredOn)
case <-timeout.C:
return true
case <-idle.C:
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *PodController) Reconcile(ctx context.Context, p *v1.Pod) (reconcile.Res
if !pod.IsProvisionable(p) {
return reconcile.Result{}, nil
}
c.provisioner.Trigger()
c.provisioner.Trigger(p.UID)
// Continue to requeue until the pod is no longer provisionable. Pods may
// not be scheduled as expected if new pods are created while nodes are
// coming online. Even if a provisioning loop is successful, the pod may
Expand Down Expand Up @@ -104,7 +104,7 @@ func (c *NodeController) Reconcile(ctx context.Context, n *v1.Node) (reconcile.R
if !lo.Contains(n.Spec.Taints, v1beta1.DisruptionNoScheduleTaint) {
return reconcile.Result{}, nil
}
c.provisioner.Trigger()
c.provisioner.Trigger(n.UID)
// Continue to requeue until the node is no longer provisionable. Pods may
// not be scheduled as expected if new pods are created while nodes are
// coming online. Even if a provisioning loop is successful, the pod may
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func WithReason(reason string) func(LaunchOptions) LaunchOptions {
type Provisioner struct {
cloudProvider cloudprovider.CloudProvider
kubeClient client.Client
batcher *Batcher
batcher *Batcher[types.UID]
volumeTopology *scheduler.VolumeTopology
cluster *state.Cluster
recorder events.Recorder
Expand All @@ -86,7 +86,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster,
) *Provisioner {
p := &Provisioner{
batcher: NewBatcher(),
batcher: NewBatcher[types.UID](),
cloudProvider: cloudProvider,
kubeClient: kubeClient,
volumeTopology: scheduler.NewVolumeTopology(kubeClient),
Expand All @@ -97,8 +97,8 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
return p
}

func (p *Provisioner) Trigger() {
p.batcher.Trigger()
func (p *Provisioner) Trigger(uid types.UID) {
p.batcher.Trigger(uid)
}

func (p *Provisioner) Register(_ context.Context, mgr manager.Manager) error {
Expand Down

0 comments on commit 075decf

Please sign in to comment.