diff --git a/Makefile b/Makefile
index 704a608480..52b0ad2be5 100644
--- a/Makefile
+++ b/Makefile
@@ -79,7 +79,7 @@ deflake: ## Run randomized, racing tests until the test fails to catch flakes
 	ginkgo \
 		--race \
 		--focus="${FOCUS}" \
-		--timeout=10m \
+		--timeout=20m \
 		--randomize-all \
 		--until-it-fails \
 		-v \
diff --git a/pkg/controllers/disruption/consolidation_test.go b/pkg/controllers/disruption/consolidation_test.go
index 5777967c1c..07a7795f55 100644
--- a/pkg/controllers/disruption/consolidation_test.go
+++ b/pkg/controllers/disruption/consolidation_test.go
@@ -108,7 +108,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -205,7 +205,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			wg := sync.WaitGroup{}
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -254,7 +254,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -280,7 +280,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -353,7 +353,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -398,7 +398,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			// Reconcile 5 times, enqueuing 3 commands total.
 			for i := 0; i < 5; i++ {
 				ExpectSingletonReconciled(ctx, disruptionController)
@@ -462,7 +462,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -528,7 +528,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -901,7 +901,7 @@ var _ = Describe("Consolidation", func() {
 			// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -1197,7 +1197,7 @@ var _ = Describe("Consolidation", func() {
 			// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -1322,7 +1322,7 @@ var _ = Describe("Consolidation", func() {
 			// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -1447,7 +1447,7 @@ var _ = Describe("Consolidation", func() {
 			// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -1615,7 +1615,7 @@ var _ = Describe("Consolidation", func() {
 			// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -1767,7 +1767,7 @@ var _ = Describe("Consolidation", func() {
 			// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -1841,7 +1841,7 @@ var _ = Describe("Consolidation", func() {
 			// consolidation won't delete the old node until the new node is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -1936,7 +1936,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2027,7 +2027,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2285,7 +2285,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2332,7 +2332,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2386,7 +2386,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2448,7 +2448,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2500,7 +2500,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2548,7 +2548,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2694,7 +2694,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2884,7 +2884,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{consolidatableNode}, []*v1.NodeClaim{consolidatableNodeClaim})
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2940,7 +2940,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -3052,7 +3052,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -3575,7 +3575,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -3639,7 +3639,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -3712,7 +3712,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -4046,7 +4046,7 @@ var _ = Describe("Consolidation", func() {
 			fakeClock.SetTime(time.Now())
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -4158,7 +4158,7 @@ var _ = Describe("Consolidation", func() {
 			// consolidation won't delete the old node until the new node is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -4290,7 +4290,7 @@ var _ = Describe("Consolidation", func() {
 			// Run the processing loop in parallel in the background with environment context
 			var wg sync.WaitGroup
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			go func() {
 				defer GinkgoRecover()
 				ExpectSingletonReconciled(ctx, disruptionController)
diff --git a/pkg/controllers/disruption/emptiness_test.go b/pkg/controllers/disruption/emptiness_test.go
index e534e087e5..8d06380f84 100644
--- a/pkg/controllers/disruption/emptiness_test.go
+++ b/pkg/controllers/disruption/emptiness_test.go
@@ -103,7 +103,7 @@ var _ = Describe("Emptiness", func() {
 			fakeClock.Step(10 * time.Minute)
 			wg := sync.WaitGroup{}
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -147,7 +147,7 @@ var _ = Describe("Emptiness", func() {
 			// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -235,7 +235,7 @@ var _ = Describe("Emptiness", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
 			wg := sync.WaitGroup{}
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -303,7 +303,7 @@ var _ = Describe("Emptiness", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
 			wg := sync.WaitGroup{}
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -372,7 +372,7 @@ var _ = Describe("Emptiness", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
 			wg := sync.WaitGroup{}
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -398,7 +398,7 @@ var _ = Describe("Emptiness", func() {
 			fakeClock.Step(10 * time.Minute)
 			wg := sync.WaitGroup{}
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -479,7 +479,7 @@ var _ = Describe("Emptiness", func() {
 			fakeClock.Step(10 * time.Minute)
 			wg := sync.WaitGroup{}
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -594,7 +594,7 @@ var _ = Describe("Emptiness", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -659,7 +659,7 @@ var _ = Describe("Emptiness", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go
index 8deea114de..88245eb349 100644
--- a/pkg/controllers/disruption/suite_test.go
+++ b/pkg/controllers/disruption/suite_test.go
@@ -439,7 +439,7 @@ var _ = Describe("Simulate Scheduling", func() {
 			// disruption won't delete the old node until the new node is ready
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -564,7 +564,7 @@ var _ = Describe("Disruption Taints", func() {
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				ExpectToWait(&wg)
+				ExpectToWait(fakeClock, &wg)
 				ExpectSingletonReconciled(ctx, disruptionController)
 			}()
@@ -1889,7 +1889,7 @@ var _ = Describe("Metrics", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -1931,7 +1931,7 @@ var _ = Describe("Metrics", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -1993,7 +1993,7 @@ var _ = Describe("Metrics", func() {
 			fakeClock.Step(10 * time.Minute)
 			var wg sync.WaitGroup
-			ExpectToWait(&wg)
+			ExpectToWait(fakeClock, &wg)
 			ExpectSingletonReconciled(ctx, disruptionController)
 			wg.Wait()
@@ -2030,24 +2030,6 @@ func fromInt(i int) *intstr.IntOrString {
 	return &v
 }
 
-// This continually polls the wait group to see if there
-// is a timer waiting, incrementing the clock if not.
-func ExpectToWait(wg *sync.WaitGroup) {
-	wg.Add(1)
-	Expect(fakeClock.HasWaiters()).To(BeFalse())
-	go func() {
-		defer GinkgoRecover()
-		defer wg.Done()
-		Eventually(func() bool { return fakeClock.HasWaiters() }).
-			// Caution: if another go routine takes longer than this timeout to
-			// wait on the clock, we will deadlock until the test suite timeout
-			WithTimeout(10 * time.Second).
-			WithPolling(10 * time.Millisecond).
-			Should(BeTrue())
-		fakeClock.Step(45 * time.Second)
-	}()
-}
-
 // ExpectTaintedNodeCount will assert the number of nodes and tainted nodes in the cluster and return the tainted nodes.
 func ExpectTaintedNodeCount(ctx context.Context, c client.Client, numTainted int) []*corev1.Node {
 	GinkgoHelper()
diff --git a/pkg/controllers/provisioning/batcher.go b/pkg/controllers/provisioning/batcher.go
index 9d6cbcc792..e0fee7c9c0 100644
--- a/pkg/controllers/provisioning/batcher.go
+++ b/pkg/controllers/provisioning/batcher.go
@@ -18,58 +18,92 @@ package provisioning
 
 import (
 	"context"
+	"sync"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/utils/clock"
+
 	"sigs.k8s.io/karpenter/pkg/operator/options"
 )
 
 // Batcher separates a stream of Trigger() calls into windowed slices. The
 // window is dynamic and will be extended if additional items are added up to a
 // maximum batch duration.
-type Batcher struct {
+type Batcher[T comparable] struct {
 	trigger chan struct{}
+	clk     clock.Clock
+
+	mu    sync.RWMutex
+	elems sets.Set[T]
 }
 
 // NewBatcher is a constructor for the Batcher
-func NewBatcher() *Batcher {
-	return &Batcher{
+func NewBatcher[T comparable](clk clock.Clock) *Batcher[T] {
+	return &Batcher[T]{
 		trigger: make(chan struct{}, 1),
+		clk:     clk,
+		elems:   sets.New[T](),
 	}
 }
 
 // Trigger causes the batcher to start a batching window, or extend the current batching window if it hasn't reached the
 // maximum length.
-func (b *Batcher) Trigger() {
+func (b *Batcher[T]) Trigger(elem T) {
+	// Don't trigger if we've already triggered for this element
+	b.mu.RLock()
+	if b.elems.Has(elem) {
+		b.mu.RUnlock()
+		return
+	}
+	b.mu.RUnlock()
 	// The trigger is idempotently armed. This statement never blocks
 	select {
 	case b.trigger <- struct{}{}:
 	default:
 	}
+	b.mu.Lock()
+	b.elems.Insert(elem)
+	b.mu.Unlock()
 }
 
 // Wait starts a batching window and continues waiting as long as it continues receiving triggers within
 // the idleDuration, up to the maxDuration
-func (b *Batcher) Wait(ctx context.Context) bool {
+func (b *Batcher[T]) Wait(ctx context.Context) bool {
+	// Ensure that we always reset our tracked elements at the end of a Wait() statement
+	defer func() {
+		b.mu.Lock()
+		b.elems.Clear()
+		b.mu.Unlock()
+	}()
+
+	timeout := b.clk.NewTimer(time.Second)
 	select {
 	case <-b.trigger:
 		// start the batching window after the first item is received
-	case <-time.After(1 * time.Second):
+		timeout.Stop()
+	case <-timeout.C():
 		// If no pods, bail to the outer controller framework to refresh the context
 		return false
 	}
-	timeout := time.NewTimer(options.FromContext(ctx).BatchMaxDuration)
-	idle := time.NewTimer(options.FromContext(ctx).BatchIdleDuration)
+	timeout = b.clk.NewTimer(options.FromContext(ctx).BatchMaxDuration)
+	idle := b.clk.NewTimer(options.FromContext(ctx).BatchIdleDuration)
+	defer func() {
+		timeout.Stop()
+		idle.Stop()
+	}()
+
 	for {
 		select {
 		case <-b.trigger:
 			// correct way to reset an active timer per docs
 			if !idle.Stop() {
-				<-idle.C
+				<-idle.C()
 			}
 			idle.Reset(options.FromContext(ctx).BatchIdleDuration)
-		case <-timeout.C:
+		case <-timeout.C():
 			return true
-		case <-idle.C:
+		case <-idle.C():
 			return true
 		}
 	}
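The hunk above makes the batcher generic over a `comparable` key and gives it an injected `clock.Clock`, so triggering twice for the same element inside one window no longer re-arms the idle timer. Below is a minimal, standalone sketch of just that dedup behaviour — illustrative only, not the karpenter `provisioning` package; the `toyBatcher` name and its boolean return value are invented for the example (the real `Trigger` returns nothing):

```go
package main

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
)

// toyBatcher mirrors the dedup logic added to Batcher[T].Trigger above:
// a 1-buffered trigger channel plus a set of elements seen in the current window.
type toyBatcher[T comparable] struct {
	trigger chan struct{}
	mu      sync.RWMutex
	elems   sets.Set[T]
}

func newToyBatcher[T comparable]() *toyBatcher[T] {
	return &toyBatcher[T]{trigger: make(chan struct{}, 1), elems: sets.New[T]()}
}

// Trigger reports whether the element actually armed/extended the window.
func (b *toyBatcher[T]) Trigger(elem T) bool {
	b.mu.RLock()
	seen := b.elems.Has(elem)
	b.mu.RUnlock()
	if seen {
		return false // duplicate within the window: nothing happens
	}
	select {
	case b.trigger <- struct{}{}: // idempotently arm the window
	default:
	}
	b.mu.Lock()
	b.elems.Insert(elem)
	b.mu.Unlock()
	return true
}

func main() {
	b := newToyBatcher[types.UID]()
	fmt.Println(b.Trigger(types.UID("pod-a"))) // true: first sighting opens the window
	fmt.Println(b.Trigger(types.UID("pod-a"))) // false: the same UID is ignored
	fmt.Println(b.Trigger(types.UID("pod-b"))) // true: a new UID still counts
}
```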
diff --git a/pkg/controllers/provisioning/controller.go b/pkg/controllers/provisioning/controller.go
index f54ce7a254..0b6827d9ee 100644
--- a/pkg/controllers/provisioning/controller.go
+++ b/pkg/controllers/provisioning/controller.go
@@ -57,7 +57,7 @@ func (c *PodController) Reconcile(ctx context.Context, p *corev1.Pod) (reconcile
 	if !pod.IsProvisionable(p) {
 		return reconcile.Result{}, nil
 	}
-	c.provisioner.Trigger()
+	c.provisioner.Trigger(p.UID)
 	// ACK the pending pod when first observed so that total time spent pending due to Karpenter is tracked.
 	c.cluster.AckPods(p)
 	// Continue to requeue until the pod is no longer provisionable. Pods may
@@ -102,7 +102,7 @@ func (c *NodeController) Reconcile(ctx context.Context, n *corev1.Node) (reconci
 	}) {
 		return reconcile.Result{}, nil
 	}
-	c.provisioner.Trigger()
+	c.provisioner.Trigger(n.UID)
 	// Continue to requeue until the node is no longer provisionable. Pods may
 	// not be scheduled as expected if new pods are created while nodes are
 	// coming online. Even if a provisioning loop is successful, the pod may
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 30c8de6e8b..83e59b18cb 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -74,7 +74,7 @@ func WithReason(reason string) func(*LaunchOptions) {
 type Provisioner struct {
 	cloudProvider  cloudprovider.CloudProvider
 	kubeClient     client.Client
-	batcher        *Batcher
+	batcher        *Batcher[types.UID]
 	volumeTopology *scheduler.VolumeTopology
 	cluster        *state.Cluster
 	recorder       events.Recorder
@@ -87,7 +87,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
 	clock clock.Clock,
 ) *Provisioner {
 	p := &Provisioner{
-		batcher:        NewBatcher(),
+		batcher:        NewBatcher[types.UID](clock),
 		cloudProvider:  cloudProvider,
 		kubeClient:     kubeClient,
 		volumeTopology: scheduler.NewVolumeTopology(kubeClient),
@@ -99,8 +99,8 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
 	return p
 }
 
-func (p *Provisioner) Trigger() {
-	p.batcher.Trigger()
+func (p *Provisioner) Trigger(uid types.UID) {
+	p.batcher.Trigger(uid)
 }
 
 func (p *Provisioner) Register(_ context.Context, m manager.Manager) error {
diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go
index d718e41403..56f1a21dd8 100644
--- a/pkg/controllers/provisioning/suite_test.go
+++ b/pkg/controllers/provisioning/suite_test.go
@@ -19,6 +19,7 @@ package provisioning_test
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -90,6 +91,12 @@ var _ = BeforeSuite(func() {
 var _ = BeforeEach(func() {
 	ctx = options.ToContext(ctx, test.Options())
 	cloudProvider.Reset()
+
+	// ensure any waiters on our clock are allowed to proceed before resetting our clock time
+	for fakeClock.HasWaiters() {
+		fakeClock.Step(1 * time.Minute)
+	}
+	fakeClock.SetTime(time.Now())
 })
 
 var _ = AfterSuite(func() {
@@ -104,6 +111,99 @@ var _ = AfterEach(func() {
 })
 
 var _ = Describe("Provisioning", func() {
+	Context("Batcher", func() {
+		It("should provision single pod if no other pod is received within the batch idle duration", func() {
+			pod := test.UnschedulablePod()
+			ExpectApplied(ctx, env.Client, test.NodePool(), pod)
+			prov.Trigger(pod.UID)
+
+			wg := sync.WaitGroup{}
+			ExpectToWait(fakeClock, &wg)
+			result := ExpectSingletonReconciled(ctx, prov)
+			Expect(result.RequeueAfter).ToNot(BeNil())
+		})
+		It("should not extend the timeout if we receive the same pod within the batch idle duration", func() {
+			ctx = options.ToContext(ctx, test.Options(test.OptionsFields{
+				BatchMaxDuration:  lo.ToPtr(10 * time.Second),
+				BatchIdleDuration: lo.ToPtr(5 * time.Second),
+			}))
+			pod := test.UnschedulablePod()
+			ExpectApplied(ctx, env.Client, test.NodePool(), pod)
+
+			wg := sync.WaitGroup{}
+			wg.Add(1)
+			Expect(fakeClock.HasWaiters()).To(BeFalse())
+			go func() {
+				defer GinkgoRecover()
+				defer wg.Done()
+
+				// Have a waiter on the first trigger and trigger the batcher
+				Eventually(func() bool { return fakeClock.HasWaiters() }, time.Second).Should(BeTrue())
+				prov.Trigger(pod.UID)
+
+				time.Sleep(time.Second) // give the process time to make it to the next batching section
+
+				// Fall-through to the second batching section
+				Eventually(func() bool { return fakeClock.HasWaiters() }, time.Second).Should(BeTrue())
+
+				// Step the clock by 3 seconds which is within the batch idle duration of 5s and then add the same pod again.
+				fakeClock.Step(3 * time.Second)
+				// We expect to have waiters on the fakeClock since this is still within the batch idle duration of 5s.
+				Eventually(func() bool { return fakeClock.HasWaiters() }, time.Second).Should(BeTrue())
+				prov.Trigger(pod.UID)
+				// Step the clock again by 3s to just cross the batch idle duration. We should be able to get out of the
+				// provisioning loop because the same pod will not cause the idle duration to reset.
+				fakeClock.Step(3 * time.Second)
+				Eventually(func() bool { return fakeClock.HasWaiters() }, time.Second).Should(BeFalse())
+			}()
+			ExpectSingletonReconciled(ctx, prov)
+			wg.Wait()
+		})
+		It("should extend the timeout if we receive a new pod within the batch idle duration", func() {
+			ctx = options.ToContext(ctx, test.Options(test.OptionsFields{
+				BatchMaxDuration:  lo.ToPtr(10 * time.Second),
+				BatchIdleDuration: lo.ToPtr(5 * time.Second),
+			}))
+			pod := test.UnschedulablePod()
+			ExpectApplied(ctx, env.Client, test.NodePool(), pod)
+
+			pod2 := test.UnschedulablePod()
+			ExpectApplied(ctx, env.Client, pod2)
+
+			wg := sync.WaitGroup{}
+			wg.Add(1)
+			Expect(fakeClock.HasWaiters()).To(BeFalse())
+			go func() {
+				defer GinkgoRecover()
+				defer wg.Done()
+
+				// Have a waiter on the first trigger and trigger the batcher
+				Eventually(func() bool { return fakeClock.HasWaiters() }, time.Second).Should(BeTrue())
+				prov.Trigger(pod.UID)
+
+				time.Sleep(time.Second) // give the process time to make it to the next batching section
+
+				// Fall-through to the second batching section
+				Eventually(func() bool { return fakeClock.HasWaiters() }, time.Second).Should(BeTrue())
+
+				// Step the clock by 3 seconds which is within the batch idle duration of 5s and then add a new pod
+				fakeClock.Step(3 * time.Second)
+				// We expect to have waiters on the fakeClock since this is still within the batch idle duration of 5s.
+				Eventually(func() bool { return fakeClock.HasWaiters() }, time.Second).Should(BeTrue())
+				prov.Trigger(pod2.UID)
+				// Step the clock by 5s as we expect provisioning to not happen until another 5s because the
+				// batch idle duration was reset due to a new pod being added.
+				fakeClock.Step(5 * time.Second)
+				Consistently(func() bool { return fakeClock.HasWaiters() }, time.Second).Should(BeTrue())
+				// Stepping the clock again by 3s. We should be able to get out of the
+				// provisioning loop at this point
+				fakeClock.Step(3 * time.Second)
+				Eventually(func() bool { return fakeClock.HasWaiters() }, time.Second).Should(BeFalse())
+			}()
+			ExpectSingletonReconciled(ctx, prov)
+			wg.Wait()
+		})
+	})
 	It("should provision nodes", func() {
 		ExpectApplied(ctx, env.Client, test.NodePool())
 		pod := test.UnschedulablePod()
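These specs can drive the batching window deterministically because `Wait()` now builds its timers from the injected `clock.Clock` instead of `time.After`/`time.NewTimer`. A self-contained sketch of that fake-clock pattern, outside any karpenter package (the file layout and names here are invented for illustration):

```go
package main

import (
	"fmt"
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	fakeClock := clocktesting.NewFakeClock(time.Now())

	done := make(chan struct{})
	go func() {
		// Stand-in for Batcher.Wait: block on a timer built from the injected clock.
		t := fakeClock.NewTimer(5 * time.Second)
		<-t.C()
		close(done)
	}()

	// Test side: wait until the goroutine is parked on the fake clock...
	for !fakeClock.HasWaiters() {
		time.Sleep(time.Millisecond)
	}
	// ...then advance virtual time past the timer instead of sleeping for real.
	fakeClock.Step(10 * time.Second)
	<-done
	fmt.Println("timer fired via fakeClock.Step, without a real 5s wait")
}
```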
diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go
index 590699e18d..060ec78b43 100644
--- a/pkg/test/expectations/expectations.go
+++ b/pkg/test/expectations/expectations.go
@@ -26,6 +26,8 @@ import (
 	"sync"
 	"time"
 
+	"k8s.io/utils/clock/testing"
+
 	opmetrics "github.com/awslabs/operatorpkg/metrics"
 	"github.com/awslabs/operatorpkg/singleton"
 	"github.com/awslabs/operatorpkg/status"
@@ -128,6 +130,24 @@ func ExpectNotScheduled(ctx context.Context, c client.Client, pod *corev1.Pod) *
 	return p
 }
 
+// ExpectToWait continually polls the fake clock until a timer or ticker is waiting on it,
+// then steps the clock forward so that the waiter can proceed.
+func ExpectToWait(fakeClock *testing.FakeClock, wg *sync.WaitGroup) {
+	wg.Add(1)
+	Expect(fakeClock.HasWaiters()).To(BeFalse())
+	go func() {
+		defer GinkgoRecover()
+		defer wg.Done()
+		Eventually(func() bool { return fakeClock.HasWaiters() }).
+			// Caution: if another go routine takes longer than this timeout to
+			// wait on the clock, we will deadlock until the test suite timeout
+			WithTimeout(10 * time.Second).
+			WithPolling(10 * time.Millisecond).
+			Should(BeTrue())
+		fakeClock.Step(45 * time.Second)
+	}()
+}
+
 func ExpectApplied(ctx context.Context, c client.Client, objects ...client.Object) {
 	GinkgoHelper()
 	for _, object := range objects {
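For a suite outside this diff, the relocated helper is consumed the same way the disruption tests above consume it: arm `ExpectToWait` with the suite's fake clock, run the work that parks on the clock, then `wg.Wait()`. A minimal self-contained sketch, assuming the usual dot-imports — the `example_test` package, spec text, and 15-second stand-in timer are invented for illustration:

```go
package example_test

import (
	"sync"
	"testing"
	"time"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	clocktesting "k8s.io/utils/clock/testing"

	. "sigs.k8s.io/karpenter/pkg/test/expectations"
)

var fakeClock = clocktesting.NewFakeClock(time.Now())

func TestExample(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Example")
}

var _ = Describe("ExpectToWait", func() {
	It("steps the shared clock once something waits on it", func() {
		var wg sync.WaitGroup
		ExpectToWait(fakeClock, &wg) // arms a goroutine: poll HasWaiters(), then Step(45s)

		// Stand-in for a reconcile that parks on the clock (e.g. a validation timer).
		done := make(chan struct{})
		go func() {
			defer GinkgoRecover()
			<-fakeClock.NewTimer(15 * time.Second).C()
			close(done)
		}()

		Eventually(done).WithTimeout(5 * time.Second).Should(BeClosed())
		wg.Wait()
	})
})
```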