From f14c532f8722914ff39852621d0fbc554bb91775 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20K=C5=82obuszewski?= Date: Wed, 15 Jan 2025 13:10:24 +0100 Subject: [PATCH] Revert "Parallelize creating cluster snapshot" --- .../config/autoscaling_options.go | 2 - .../filter_out_schedulable_test.go | 2 +- cluster-autoscaler/main.go | 4 +- .../pod_injection_processor_test.go | 2 +- .../predicate/predicate_snapshot_test.go | 2 +- .../simulator/clustersnapshot/store/delta.go | 104 +++--------------- .../store/delta_benchmark_test.go | 4 +- 7 files changed, 21 insertions(+), 99 deletions(-) diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 5f32de514af7..ed4a8b645801 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -311,8 +311,6 @@ type AutoscalingOptions struct { ForceDeleteLongUnregisteredNodes bool // DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled. DynamicResourceAllocationEnabled bool - // ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation. - ClusterSnapshotParallelism int } // KubeClientOptions specify options for kube client diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go index 61303ed3a8d1..c8510cb0ac17 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go @@ -254,7 +254,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) { return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore()) }, "delta": func() clustersnapshot.ClusterSnapshot { - return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16)) + return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore()) }, } for snapshotName, snapshotFactory := range snapshots { diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 3395b26ac6e7..c828774e992b 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -283,7 +283,6 @@ var ( checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.") forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.") enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.") - clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.") ) func isFlagPassed(name string) bool { @@ -464,7 +463,6 @@ func createAutoscalingOptions() config.AutoscalingOptions { CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox, ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes, DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation, - ClusterSnapshotParallelism: *clusterSnapshotParallelism, } } @@ -507,7 +505,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) drainabilityRules := rules.Default(deleteOptions) - var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism) + var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore() if autoscalingOptions.DynamicResourceAllocationEnabled { // TODO(DRA): Remove this once DeltaSnapshotStore is integrated with DRA. klog.Warningf("Using BasicSnapshotStore instead of DeltaSnapshotStore because DRA is enabled. Autoscaling performance/scalability might be decreased.") diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go index 4ce6b895b15f..5ef275fd3dcf 100644 --- a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go +++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go @@ -114,7 +114,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry()) - clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16)) + clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore()) err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...)) assert.NoError(t, err) ctx := context.AutoscalingContext{ diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go index b56a67e9e729..615f3e5f91e8 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go @@ -56,7 +56,7 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){ if err != nil { return nil, err } - return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true), nil + return NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, true), nil }, } diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go index 705bc655893e..40154047491a 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go @@ -17,13 +17,11 @@ limitations under the License. package store import ( - "context" "fmt" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" - "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -46,8 +44,7 @@ import ( // pod affinity - causes scheduler framework to list pods with non-empty selector, // so basic caching doesn't help. type DeltaSnapshotStore struct { - data *internalDeltaSnapshotData - parallelism int + data *internalDeltaSnapshotData } type deltaSnapshotStoreNodeLister DeltaSnapshotStore @@ -140,14 +137,10 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework return nodeInfoList } -func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) (*schedulerframework.NodeInfo, error) { +func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error { nodeInfo := schedulerframework.NewNodeInfo() nodeInfo.SetNode(node) - err := data.addNodeInfo(nodeInfo) - if err != nil { - return nil, err - } - return nodeInfo, nil + return data.addNodeInfo(nodeInfo) } func (data *internalDeltaSnapshotData) addNodeInfo(nodeInfo *schedulerframework.NodeInfo) error { @@ -248,24 +241,6 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e return nil } -func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error { - ni.AddPod(pod) - - // Maybe consider deleting from the list in the future. Maybe not. - data.clearCaches() - return nil -} - -func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error { - for _, pod := range pods { - ni.AddPod(pod) - } - - // Maybe consider deleting from the list in the future. Maybe not. - data.clearCaches() - return nil -} - func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error { // This always clones node info, even if the pod is actually missing. // Not sure if we mind, since removing non-existent pod @@ -428,10 +403,8 @@ func (snapshot *DeltaSnapshotStore) DeviceClasses() schedulerframework.DeviceCla } // NewDeltaSnapshotStore creates instances of DeltaSnapshotStore. -func NewDeltaSnapshotStore(parallelism int) *DeltaSnapshotStore { - snapshot := &DeltaSnapshotStore{ - parallelism: parallelism, - } +func NewDeltaSnapshotStore() *DeltaSnapshotStore { + snapshot := &DeltaSnapshotStore{} snapshot.clear() return snapshot } @@ -444,7 +417,7 @@ func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot { // AddSchedulerNodeInfo adds a NodeInfo. func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error { - if _, err := snapshot.data.addNode(nodeInfo.Node()); err != nil { + if err := snapshot.data.addNode(nodeInfo.Node()); err != nil { return err } for _, podInfo := range nodeInfo.Pods { @@ -455,71 +428,24 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram return nil } -// setClusterStatePodsSequential sets the pods in cluster state in a sequential way. -func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error { - for _, pod := range scheduledPods { - if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok { - if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil { - return err - } - } - } - return nil -} - -// setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelism value. -func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error { - podsForNode := make([][]*apiv1.Pod, len(nodeInfos)) - for _, pod := range scheduledPods { - nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName] - if !ok { - continue - } - podsForNode[nodeIdx] = append(podsForNode[nodeIdx], pod) - } - - ctx := context.Background() - ctx, cancel := context.WithCancelCause(ctx) - - workqueue.ParallelizeUntil(ctx, snapshot.parallelism, len(nodeInfos), func(nodeIdx int) { - err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx]) - if err != nil { - cancel(err) - } - }) - - return context.Cause(ctx) -} - // SetClusterState sets the cluster state. func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error { snapshot.clear() - nodeNameToIdx := make(map[string]int, len(nodes)) - nodeInfos := make([]*schedulerframework.NodeInfo, len(nodes)) - for i, node := range nodes { - nodeInfo, err := snapshot.data.addNode(node) - if err != nil { + knownNodes := make(map[string]bool) + for _, node := range nodes { + if err := snapshot.data.addNode(node); err != nil { return err } - nodeNameToIdx[node.Name] = i - nodeInfos[i] = nodeInfo + knownNodes[node.Name] = true } - - if snapshot.parallelism > 1 { - err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods) - if err != nil { - return err - } - } else { - // TODO(macsko): Migrate to setClusterStatePodsParallelized for parallelism == 1 - // after making sure the implementation is always correct in CA 1.33. - err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods) - if err != nil { - return err + for _, pod := range scheduledPods { + if knownNodes[pod.Spec.NodeName] { + if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil { + return err + } } } - // TODO(DRA): Save DRA snapshot. return nil } diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go index 5f618befd180..c10776cc9de8 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go @@ -48,7 +48,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { for _, tc := range testCases { b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) { nodes := clustersnapshot.CreateTestNodes(tc.nodeCount + 1000) - deltaStore := NewDeltaSnapshotStore(16) + deltaStore := NewDeltaSnapshotStore() if err := deltaStore.SetClusterState(nodes[:tc.nodeCount], nil, drasnapshot.Snapshot{}); err != nil { assert.NoError(b, err) } @@ -70,7 +70,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { for _, tc := range testCases { b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) { nodes := clustersnapshot.CreateTestNodes(tc.nodeCount) - deltaStore := NewDeltaSnapshotStore(16) + deltaStore := NewDeltaSnapshotStore() if err := deltaStore.SetClusterState(nodes, nil, drasnapshot.Snapshot{}); err != nil { assert.NoError(b, err) }