Skip to content

Commit

Permalink
Use clock interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Aug 28, 2024
1 parent 66a3444 commit fd14a96
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewControllers(
) []controller.Controller {

cluster := state.NewCluster(clock, kubeClient)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster)
p := provisioning.NewProvisioner(clock, kubeClient, recorder, cloudProvider, cluster)
evictionQueue := terminator.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/orchestration/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
prov = provisioning.NewProvisioner(fakeClock, env.Client, recorder, cloudProvider, cluster)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
prov = provisioning.NewProvisioner(fakeClock, env.Client, recorder, cloudProvider, cluster)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue)
})
Expand Down
17 changes: 10 additions & 7 deletions pkg/controllers/provisioning/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/clock"

"sigs.k8s.io/karpenter/pkg/operator/options"
)
Expand All @@ -31,15 +32,17 @@ import (
// maximum batch duration.
type Batcher[T comparable] struct {
trigger chan struct{}
clk clock.Clock

mu sync.RWMutex
triggeredOnElems sets.Set[T]
}

// NewBatcher is a constructor for the Batcher
func NewBatcher[T comparable]() *Batcher[T] {
func NewBatcher[T comparable](clk clock.Clock) *Batcher[T] {
return &Batcher[T]{
trigger: make(chan struct{}, 1),
clk: clk,
triggeredOnElems: sets.New[T](),
}
}
Expand Down Expand Up @@ -76,23 +79,23 @@ func (b *Batcher[T]) Wait(ctx context.Context) bool {
select {
case <-b.trigger:
// start the batching window after the first item is received
case <-time.After(1 * time.Second):
case <-b.clk.After(1 * time.Second):
// If no pods, bail to the outer controller framework to refresh the context
return false
}
timeout := time.NewTimer(options.FromContext(ctx).BatchMaxDuration)
idle := time.NewTimer(options.FromContext(ctx).BatchIdleDuration)
timeout := b.clk.NewTimer(options.FromContext(ctx).BatchMaxDuration)
idle := b.clk.NewTimer(options.FromContext(ctx).BatchIdleDuration)
for {
select {
case <-b.trigger:
// correct way to reset an active timer per docs
if !idle.Stop() {
<-idle.C
<-idle.C()
}
idle.Reset(options.FromContext(ctx).BatchIdleDuration)
case <-timeout.C:
case <-timeout.C():
return true
case <-idle.C:
case <-idle.C():
return true
}
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"k8s.io/utils/clock"

"github.com/awslabs/operatorpkg/singleton"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -85,11 +86,9 @@ type Provisioner struct {
cm *pretty.ChangeMonitor
}

func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster,
) *Provisioner {
func NewProvisioner(clk clock.Clock, kubeClient client.Client, recorder events.Recorder, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster) *Provisioner {
p := &Provisioner{
batcher: NewBatcher[types.UID](),
batcher: NewBatcher[types.UID](clk),
cloudProvider: cloudProvider,
kubeClient: kubeClient,
volumeTopology: scheduler.NewVolumeTopology(kubeClient),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
podStateController = informer.NewPodController(env.Client, cluster)
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
prov = provisioning.NewProvisioner(fakeClock, env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
})

var _ = AfterSuite(func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
cluster = state.NewCluster(fakeClock, env.Client)
nodeController = informer.NewNodeController(env.Client, cluster)
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
prov = provisioning.NewProvisioner(fakeClock, env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
daemonsetController = informer.NewDaemonSetController(env.Client, cluster)
instanceTypes, _ := cloudProvider.GetInstanceTypes(ctx, nil)
instanceTypeMap = map[string]*cloudprovider.InstanceType{}
Expand Down

0 comments on commit fd14a96

Please sign in to comment.