From 40ab1d27cda93b3afc0d5dc1a46a75d4ba748a2a Mon Sep 17 00:00:00 2001 From: Jonathan Innis Date: Mon, 29 Jul 2024 23:00:51 -0700 Subject: [PATCH] Use clock interface --- pkg/controllers/controllers.go | 2 +- .../disruption/orchestration/suite_test.go | 2 +- pkg/controllers/disruption/suite_test.go | 2 +- pkg/controllers/provisioning/batcher.go | 17 ++++++++++------- pkg/controllers/provisioning/provisioner.go | 7 +++---- .../provisioning/scheduling/suite_test.go | 2 +- pkg/controllers/provisioning/suite_test.go | 2 +- 7 files changed, 18 insertions(+), 16 deletions(-) diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index ebe118585c..2cd376b90c 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -60,7 +60,7 @@ func NewControllers( ) []controller.Controller { cluster := state.NewCluster(clock, kubeClient) - p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster) + p := provisioning.NewProvisioner(clock, kubeClient, recorder, cloudProvider, cluster) evictionQueue := terminator.NewQueue(kubeClient, recorder) disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p) diff --git a/pkg/controllers/disruption/orchestration/suite_test.go b/pkg/controllers/disruption/orchestration/suite_test.go index fcbd12e7a9..1dd620a4cc 100644 --- a/pkg/controllers/disruption/orchestration/suite_test.go +++ b/pkg/controllers/disruption/orchestration/suite_test.go @@ -80,7 +80,7 @@ var _ = BeforeSuite(func() { nodeStateController = informer.NewNodeController(env.Client, cluster) nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster) recorder = test.NewEventRecorder() - prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster) + prov = provisioning.NewProvisioner(fakeClock, env.Client, recorder, cloudProvider, cluster) queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov) }) diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index 759968d997..86d52b9658 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -90,7 +90,7 @@ var _ = BeforeSuite(func() { nodeStateController = informer.NewNodeController(env.Client, cluster) nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster) recorder = test.NewEventRecorder() - prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster) + prov = provisioning.NewProvisioner(fakeClock, env.Client, recorder, cloudProvider, cluster) queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov) disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue) }) diff --git a/pkg/controllers/provisioning/batcher.go b/pkg/controllers/provisioning/batcher.go index 056ef4aabf..d87211649f 100644 --- a/pkg/controllers/provisioning/batcher.go +++ b/pkg/controllers/provisioning/batcher.go @@ -22,6 +22,7 @@ import ( "time" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/clock" "sigs.k8s.io/karpenter/pkg/operator/options" ) @@ -31,15 +32,17 @@ import ( // maximum batch duration. type Batcher[T comparable] struct { trigger chan struct{} + clk clock.Clock mu sync.RWMutex triggeredOnElems sets.Set[T] } // NewBatcher is a constructor for the Batcher -func NewBatcher[T comparable]() *Batcher[T] { +func NewBatcher[T comparable](clk clock.Clock) *Batcher[T] { return &Batcher[T]{ trigger: make(chan struct{}, 1), + clk: clk, triggeredOnElems: sets.New[T](), } } @@ -76,23 +79,23 @@ func (b *Batcher[T]) Wait(ctx context.Context) bool { select { case <-b.trigger: // start the batching window after the first item is received - case <-time.After(1 * time.Second): + case <-b.clk.After(1 * time.Second): // If no pods, bail to the outer controller framework to refresh the context return false } - timeout := time.NewTimer(options.FromContext(ctx).BatchMaxDuration) - idle := time.NewTimer(options.FromContext(ctx).BatchIdleDuration) + timeout := b.clk.NewTimer(options.FromContext(ctx).BatchMaxDuration) + idle := b.clk.NewTimer(options.FromContext(ctx).BatchIdleDuration) for { select { case <-b.trigger: // correct way to reset an active timer per docs if !idle.Stop() { - <-idle.C + <-idle.C() } idle.Reset(options.FromContext(ctx).BatchIdleDuration) - case <-timeout.C: + case <-timeout.C(): return true - case <-idle.C: + case <-idle.C(): return true } } diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index d5d7f34735..e60fa83df4 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -25,6 +25,7 @@ import ( "github.com/awslabs/operatorpkg/option" "github.com/awslabs/operatorpkg/status" + "k8s.io/utils/clock" "github.com/awslabs/operatorpkg/singleton" "github.com/prometheus/client_golang/prometheus" @@ -85,11 +86,9 @@ type Provisioner struct { cm *pretty.ChangeMonitor } -func NewProvisioner(kubeClient client.Client, recorder events.Recorder, - cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster, -) *Provisioner { +func NewProvisioner(clk clock.Clock, kubeClient client.Client, recorder events.Recorder, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster) *Provisioner { p := &Provisioner{ - batcher: NewBatcher[types.UID](), + batcher: NewBatcher[types.UID](clk), cloudProvider: cloudProvider, kubeClient: kubeClient, volumeTopology: scheduler.NewVolumeTopology(kubeClient), diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 3c9e72912b..427e5c5c2d 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -98,7 +98,7 @@ var _ = BeforeSuite(func() { nodeStateController = informer.NewNodeController(env.Client, cluster) nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster) podStateController = informer.NewPodController(env.Client, cluster) - prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) + prov = provisioning.NewProvisioner(fakeClock, env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) }) var _ = AfterSuite(func() { diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index ffd3f07483..c93f0d909c 100644 --- a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -77,7 +77,7 @@ var _ = BeforeSuite(func() { fakeClock = clock.NewFakeClock(time.Now()) cluster = state.NewCluster(fakeClock, env.Client) nodeController = informer.NewNodeController(env.Client, cluster) - prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) + prov = provisioning.NewProvisioner(fakeClock, env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) daemonsetController = informer.NewDaemonSetController(env.Client, cluster) instanceTypes, _ := cloudProvider.GetInstanceTypes(ctx, nil) instanceTypeMap = map[string]*cloudprovider.InstanceType{}