diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 2b69b26380..b68daf82aa 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -57,7 +57,7 @@ func NewControllers( ) []controller.Controller { cluster := state.NewCluster(clock, kubeClient) - p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster) + p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock) evictionQueue := terminator.NewQueue(kubeClient, recorder) disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p) diff --git a/pkg/controllers/disruption/orchestration/suite_test.go b/pkg/controllers/disruption/orchestration/suite_test.go index 2b4538ffe0..dcb5054420 100644 --- a/pkg/controllers/disruption/orchestration/suite_test.go +++ b/pkg/controllers/disruption/orchestration/suite_test.go @@ -85,7 +85,7 @@ var _ = BeforeSuite(func() { nodeStateController = informer.NewNodeController(env.Client, cluster) nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster) recorder = test.NewEventRecorder() - prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster) + prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock) queue = NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov) }) diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index a09aa8872b..40338a0a48 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -96,7 +96,7 @@ var _ = BeforeSuite(func() { nodeStateController = informer.NewNodeController(env.Client, cluster) nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster) recorder = test.NewEventRecorder() - prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster) + prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock) queue = NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov) disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue) }) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index a51b04a4fd..344cb83971 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -78,10 +79,12 @@ type Provisioner struct { cluster *state.Cluster recorder events.Recorder cm *pretty.ChangeMonitor + clock clock.Clock } func NewProvisioner(kubeClient client.Client, recorder events.Recorder, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster, + clock clock.Clock, ) *Provisioner { p := &Provisioner{ batcher: NewBatcher(), @@ -91,6 +94,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder, cluster: cluster, recorder: recorder, cm: pretty.NewChangeMonitor(), + clock: clock, } return p } @@ -298,7 +302,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat if err != nil { return nil, fmt.Errorf("getting daemon pods, %w", err) } - return scheduler.NewScheduler(p.kubeClient, lo.ToSlicePtr(nodePoolList.Items), p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder), nil + return scheduler.NewScheduler(p.kubeClient, lo.ToSlicePtr(nodePoolList.Items), p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock), nil } func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 95e0059c24..dcc867190a 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "sort" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" @@ -29,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/klog/v2" + "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -46,7 +48,7 @@ import ( func NewScheduler(kubeClient client.Client, nodePools []*v1.NodePool, cluster *state.Cluster, stateNodes []*state.StateNode, topology *Topology, instanceTypes map[string][]*cloudprovider.InstanceType, daemonSetPods []*corev1.Pod, - recorder events.Recorder) *Scheduler { + recorder events.Recorder, clock clock.Clock) *Scheduler { // if any of the nodePools add a taint with a prefer no schedule effect, we add a toleration for the taint // during preference relaxation @@ -73,6 +75,7 @@ func NewScheduler(kubeClient client.Client, nodePools []*v1.NodePool, remainingResources: lo.SliceToMap(nodePools, func(np *v1.NodePool) (string, corev1.ResourceList) { return np.Name, corev1.ResourceList(np.Spec.Limits) }), + clock: clock, } s.calculateExistingNodeClaims(stateNodes, daemonSetPods) return s @@ -91,6 +94,7 @@ type Scheduler struct { cluster *state.Cluster recorder events.Recorder kubeClient client.Client + clock clock.Clock } // Results contains the results of the scheduling operation @@ -205,10 +209,19 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results { errors := map[*corev1.Pod]error{} QueueDepth.DeletePartialMatch(prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx)}) // Reset the metric for the controller, so we don't keep old ids around q := NewQueue(pods...) + + startTime := s.clock.Now() + lastLogTime := s.clock.Now() + batchSize := len(q.pods) for { QueueDepth.With( prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)}, ).Set(float64(len(q.pods))) + + if s.clock.Since(lastLogTime) > time.Minute { + log.FromContext(ctx).WithValues("pods-scheduled", batchSize-len(q.pods), "pods-remaining", len(q.pods), "duration", s.clock.Since(startTime).Truncate(time.Second), "scheduling-id", string(s.id)).Info("computing pod scheduling...") + lastLogTime = s.clock.Now() + } // Try the next pod pod, ok := q.Pop() if !ok { diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index aeeb80a52d..57a2c4fcaf 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -165,7 +165,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { client := fakecr.NewFakeClient() pods := makeDiversePods(podCount) - cluster = state.NewCluster(&clock.RealClock{}, client) + clock := &clock.RealClock{} + cluster = state.NewCluster(clock, client) domains := map[string]sets.Set[string]{} topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods) if err != nil { @@ -175,7 +176,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { scheduler := scheduling.NewScheduler(client, []*v1.NodePool{nodePool}, cluster, nil, topology, map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, nil, - events.NewRecorder(&record.FakeRecorder{})) + events.NewRecorder(&record.FakeRecorder{}), clock) b.ResetTimer() // Pack benchmark diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 0a1016ef61..d4a7a88ffc 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -92,7 +92,7 @@ var _ = BeforeSuite(func() { nodeStateController = informer.NewNodeController(env.Client, cluster) nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster) podStateController = informer.NewPodController(env.Client, cluster) - prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) + prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, fakeClock) }) var _ = AfterSuite(func() { diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index 8e497a542c..b24f93c444 100644 --- a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -78,7 +78,7 @@ var _ = BeforeSuite(func() { fakeClock = clock.NewFakeClock(time.Now()) cluster = state.NewCluster(fakeClock, env.Client) nodeController = informer.NewNodeController(env.Client, cluster) - prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) + prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, fakeClock) daemonsetController = informer.NewDaemonSetController(env.Client, cluster) instanceTypes, _ := cloudProvider.GetInstanceTypes(ctx, nil) instanceTypeMap = map[string]*cloudprovider.InstanceType{}