Skip to content

Commit

Permalink
Use clock interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Dec 3, 2024
1 parent 90017c9 commit abd0869
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
17 changes: 10 additions & 7 deletions pkg/controllers/provisioning/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/clock"

"sigs.k8s.io/karpenter/pkg/operator/options"
)
Expand All @@ -31,15 +32,17 @@ import (
// maximum batch duration.
type Batcher[T comparable] struct {
trigger chan struct{}
clk clock.Clock

mu sync.RWMutex
triggeredOnElems sets.Set[T]
}

// NewBatcher is a constructor for the Batcher
func NewBatcher[T comparable]() *Batcher[T] {
func NewBatcher[T comparable](clk clock.Clock) *Batcher[T] {
return &Batcher[T]{
trigger: make(chan struct{}, 1),
clk: clk,
triggeredOnElems: sets.New[T](),
}
}
Expand Down Expand Up @@ -76,23 +79,23 @@ func (b *Batcher[T]) Wait(ctx context.Context) bool {
select {
case <-b.trigger:
// start the batching window after the first item is received
case <-time.After(1 * time.Second):
case <-b.clk.After(1 * time.Second):
// If no pods, bail to the outer controller framework to refresh the context
return false
}
timeout := time.NewTimer(options.FromContext(ctx).BatchMaxDuration)
idle := time.NewTimer(options.FromContext(ctx).BatchIdleDuration)
timeout := b.clk.NewTimer(options.FromContext(ctx).BatchMaxDuration)
idle := b.clk.NewTimer(options.FromContext(ctx).BatchIdleDuration)
for {
select {
case <-b.trigger:
// correct way to reset an active timer per docs
if !idle.Stop() {
<-idle.C
<-idle.C()
}
idle.Reset(options.FromContext(ctx).BatchIdleDuration)
case <-timeout.C:
case <-timeout.C():
return true
case <-idle.C:
case <-idle.C():
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
clock clock.Clock,
) *Provisioner {
p := &Provisioner{
batcher: NewBatcher[types.UID](),
batcher: NewBatcher[types.UID](clock),
cloudProvider: cloudProvider,
kubeClient: kubeClient,
volumeTopology: scheduler.NewVolumeTopology(kubeClient),
Expand Down

0 comments on commit abd0869

Please sign in to comment.