Skip to content

Commit

Permalink
Merge pull request #7497 from towca/jtuznik/dra-predicate-snapshot
Browse files Browse the repository at this point in the history
CA: refactor PredicateChecker into ClusterSnapshot
  • Loading branch information
k8s-ci-robot authored Dec 4, 2024
2 parents b5b760f + 054d5d2 commit 0fc5c40
Show file tree
Hide file tree
Showing 57 changed files with 1,428 additions and 1,245 deletions.
11 changes: 5 additions & 6 deletions cluster-autoscaler/context/autoscaling_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
Expand All @@ -44,9 +44,8 @@ type AutoscalingContext struct {
AutoscalingKubeClients
// CloudProvider used in CA.
CloudProvider cloudprovider.CloudProvider
// TODO(kgolab) - move away too as it's not config
// PredicateChecker to check if a pod can fit into a node.
PredicateChecker predicatechecker.PredicateChecker
// FrameworkHandle can be used to interact with the scheduler framework.
FrameworkHandle *framework.Handle
// ClusterSnapshot denotes cluster snapshot used for predicate checking.
ClusterSnapshot clustersnapshot.ClusterSnapshot
// ExpanderStrategy is the strategy used to choose which node group to expand when scaling up
Expand Down Expand Up @@ -100,7 +99,7 @@ func NewResourceLimiterFromAutoscalingOptions(options config.AutoscalingOptions)
// NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments
func NewAutoscalingContext(
options config.AutoscalingOptions,
predicateChecker predicatechecker.PredicateChecker,
fwHandle *framework.Handle,
clusterSnapshot clustersnapshot.ClusterSnapshot,
autoscalingKubeClients *AutoscalingKubeClients,
cloudProvider cloudprovider.CloudProvider,
Expand All @@ -114,7 +113,7 @@ func NewAutoscalingContext(
AutoscalingOptions: options,
CloudProvider: cloudProvider,
AutoscalingKubeClients: *autoscalingKubeClients,
PredicateChecker: predicateChecker,
FrameworkHandle: fwHandle,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: expanderStrategy,
ProcessorCallbacks: processorCallbacks,
Expand Down
17 changes: 13 additions & 4 deletions cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/client-go/informers"
Expand All @@ -49,7 +51,7 @@ type AutoscalerOptions struct {
InformerFactory informers.SharedInformerFactory
AutoscalingKubeClients *context.AutoscalingKubeClients
CloudProvider cloudprovider.CloudProvider
PredicateChecker predicatechecker.PredicateChecker
FrameworkHandle *framework.Handle
ClusterSnapshot clustersnapshot.ClusterSnapshot
ExpanderStrategy expander.Strategy
EstimatorBuilder estimator.EstimatorBuilder
Expand Down Expand Up @@ -86,7 +88,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
}
return NewStaticAutoscaler(
opts.AutoscalingOptions,
opts.PredicateChecker,
opts.FrameworkHandle,
opts.ClusterSnapshot,
opts.AutoscalingKubeClients,
opts.Processors,
Expand Down Expand Up @@ -114,8 +116,15 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
if opts.FrameworkHandle == nil {
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig)
if err != nil {
return err
}
opts.FrameworkHandle = fwHandle
}
if opts.ClusterSnapshot == nil {
opts.ClusterSnapshot = clustersnapshot.NewBasicClusterSnapshot()
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle)
}
if opts.RemainingPdbTracker == nil {
opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"time"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
Expand Down Expand Up @@ -267,7 +269,7 @@ func TestCurrentlyDrainedNodesPodListProcessor(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := context.AutoscalingContext{
ScaleDownActuator: &mockActuator{&mockActuationStatus{tc.drainedNodes}},
ClusterSnapshot: clustersnapshot.NewBasicClusterSnapshot(),
ClusterSnapshot: testsnapshot.NewTestSnapshotOrDie(t),
}
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, tc.nodes, tc.pods)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
// TODO(DRA): Figure out if/how to use the predicate-checking SchedulePod() here instead - otherwise this doesn't work with DRA pods.
if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestFilterOutExpendable(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpendablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
err := snapshot.SetClusterState(tc.nodes, nil)
assert.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
klog "k8s.io/klog/v2"
Expand All @@ -38,9 +37,9 @@ type filterOutSchedulablePodListProcessor struct {
}

// NewFilterOutSchedulablePodListProcessor creates a PodListProcessor filtering out schedulable pods
func NewFilterOutSchedulablePodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*framework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
func NewFilterOutSchedulablePodListProcessor(nodeFilter func(*framework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
return &filterOutSchedulablePodListProcessor{
schedulingSimulator: scheduling.NewHintingSimulator(predicateChecker),
schedulingSimulator: scheduling.NewHintingSimulator(),
nodeFilter: nodeFilter,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics"
)

func TestFilterOutSchedulable(t *testing.T) {
schedulermetrics.Register()

node := buildReadyTestNode("node", 2000, 100)
matchesAllNodes := func(*framework.NodeInfo) bool { return true }
matchesNoNodes := func(*framework.NodeInfo) bool { return false }
Expand Down Expand Up @@ -176,9 +174,7 @@ func TestFilterOutSchedulable(t *testing.T) {

for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(t, err)
clusterSnapshot := testsnapshot.NewTestSnapshotOrDie(t)

var allExpectedScheduledPods []*apiv1.Pod
allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...)
Expand All @@ -194,7 +190,7 @@ func TestFilterOutSchedulable(t *testing.T) {

clusterSnapshot.Fork()

processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, tc.nodeFilter)
processor := NewFilterOutSchedulablePodListProcessor(tc.nodeFilter)
unschedulablePods, err := processor.filterOutSchedulableByPacking(tc.unschedulableCandidates, clusterSnapshot)

assert.NoError(t, err)
Expand Down Expand Up @@ -253,8 +249,12 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
},
}
snapshots := map[string]func() clustersnapshot.ClusterSnapshot{
"basic": func() clustersnapshot.ClusterSnapshot { return clustersnapshot.NewBasicClusterSnapshot() },
"delta": func() clustersnapshot.ClusterSnapshot { return clustersnapshot.NewDeltaClusterSnapshot() },
"basic": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore())
},
"delta": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore())
},
}
for snapshotName, snapshotFactory := range snapshots {
for _, tc := range tests {
Expand All @@ -279,9 +279,6 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
}
}

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(b, err)

clusterSnapshot := snapshotFactory()
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
assert.NoError(b, err)
Expand All @@ -290,7 +287,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, scheduling.ScheduleAnywhere)
processor := NewFilterOutSchedulablePodListProcessor(scheduling.ScheduleAnywhere)
if stillPending, err := processor.filterOutSchedulableByPacking(pendingPods, clusterSnapshot); err != nil {
assert.NoError(b, err)
} else if len(stillPending) < tc.pendingPods {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ package podlistprocessor
import (
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
)

// NewDefaultPodListProcessor returns a default implementation of the pod list
// processor, which wraps and sequentially runs other sub-processors.
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*framework.NodeInfo) bool) *pods.CombinedPodListProcessor {
func NewDefaultPodListProcessor(nodeFilter func(*framework.NodeInfo) bool) *pods.CombinedPodListProcessor {
return pods.NewCombinedPodListProcessor([]pods.PodListProcessor{
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker, nodeFilter),
NewFilterOutSchedulablePodListProcessor(nodeFilter),
NewFilterOutDaemonSetPodListProcessor(),
})
}
4 changes: 3 additions & 1 deletion cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
Expand Down Expand Up @@ -356,7 +358,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.ctx.FrameworkHandle)
pods, err := a.ctx.AllPodLister().List()
if err != nil {
return nil, err
Expand Down
4 changes: 1 addition & 3 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
Expand All @@ -50,7 +51,6 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics"
)

type nodeGroupViewInfo struct {
Expand Down Expand Up @@ -1000,8 +1000,6 @@ func getStartDeletionTestCases(ignoreDaemonSetsUtilization bool, suffix string)
}

func TestStartDeletion(t *testing.T) {
schedulermetrics.Register()

testSets := []map[string]startDeletionTestCase{
// IgnoreDaemonSetsUtilization is false
getStartDeletionTestCases(false, "testNg1"),
Expand Down
4 changes: 3 additions & 1 deletion cluster-autoscaler/core/scaledown/actuation/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -37,6 +38,7 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand Down Expand Up @@ -611,7 +613,7 @@ func TestPodsToEvict(t *testing.T) {
},
} {
t.Run(tn, func(t *testing.T) {
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
node := BuildTestNode("test-node", 1000, 1000)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods...))
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
Expand All @@ -29,10 +32,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/fake"
)

Expand Down Expand Up @@ -149,8 +148,6 @@ func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time
}

func TestFilterOutUnremovable(t *testing.T) {
schedulermetrics.Register()

now := time.Now()
for _, tc := range append(getTestCases(false, "IgnoreDaemonSetUtilization=false", now),
getTestCases(true, "IgnoreDaemonsetUtilization=true", now)...) {
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/scaledown/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
context: context,
unremovableNodes: unremovable.NewNodes(),
unneededNodes: unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder),
rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, deleteOptions, drainabilityRules, true),
actuationInjector: scheduling.NewHintingSimulator(context.PredicateChecker),
rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, deleteOptions, drainabilityRules, true),
actuationInjector: scheduling.NewHintingSimulator(),
eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),
nodeUtilizationMap: make(map[string]utilization.Info),
resourceLimitsFinder: resourceLimitsFinder,
Expand Down
4 changes: 1 addition & 3 deletions cluster-autoscaler/core/scaledown/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -44,12 +45,9 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics"
)

func TestUpdateClusterState(t *testing.T) {
schedulermetrics.Register()

testCases := []struct {
name string
nodes []*apiv1.Node
Expand Down
Loading

0 comments on commit 0fc5c40

Please sign in to comment.