-
Notifications
You must be signed in to change notification settings - Fork 4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
CA: refactor SchedulerBasedPredicateChecker into SchedulerPluginRunner
For DRA, this component will have to call the Reserve phase in addition to just checking predicates/filters. The new version also makes more sense in the context of PredicateSnapshot, which is the only context now. While refactoring, I noticed that CheckPredicates for some reason doesn't check the provided Node against the eligible Nodes returned from PreFilter (while FitsAnyNodeMatching does do that). This seems like a bug, so the check is added. The checks in FitsAnyNodeMatching are also reordered so that the cheapest ones are checked earliest.
- Loading branch information
Showing
4 changed files
with
264 additions
and
280 deletions.
There are no files selected for viewing
147 changes: 147 additions & 0 deletions
147
cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package predicate | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
|
||
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" | ||
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework" | ||
|
||
apiv1 "k8s.io/api/core/v1" | ||
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" | ||
) | ||
|
||
// SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework. | ||
type SchedulerPluginRunner struct { | ||
fwHandle *framework.Handle | ||
snapshotStore clustersnapshot.ClusterSnapshotStore | ||
lastIndex int | ||
} | ||
|
||
// NewSchedulerPluginRunner builds a SchedulerPluginRunner. | ||
func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotStore clustersnapshot.ClusterSnapshotStore) *SchedulerPluginRunner { | ||
return &SchedulerPluginRunner{fwHandle: fwHandle, snapshotStore: snapshotStore} | ||
} | ||
|
||
// RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided | ||
// function - until a Node where the Filters pass is found. Filters are only run for matching Nodes. If no matching Node with passing Filters is found, an error is returned. | ||
// | ||
// The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner. | ||
func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) { | ||
nodeInfosList, err := p.snapshotStore.ListNodeInfos() | ||
if err != nil { | ||
return "", clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err)) | ||
} | ||
|
||
p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotStore) | ||
defer p.fwHandle.DelegatingLister.ResetDelegate() | ||
|
||
state := schedulerframework.NewCycleState() | ||
// Run the PreFilter phase of the framework for the Pod. This allows plugins to precompute some things (for all Nodes in the cluster at once) and | ||
// save them in the CycleState. During the Filter phase, plugins can retrieve the precomputes from the CycleState and use them for answering the Filter | ||
// for a given Node. | ||
preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod) | ||
if !preFilterStatus.IsSuccess() { | ||
// If any of the plugin PreFilter methods isn't successful, the corresponding Filter method can't be run, so the whole scheduling cycle is aborted. | ||
// Match that behavior here. | ||
return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "") | ||
} | ||
|
||
for i := range nodeInfosList { | ||
// Determine which NodeInfo to check next. | ||
nodeInfo := nodeInfosList[(p.lastIndex+i)%len(nodeInfosList)] | ||
|
||
// Plugins can filter some Nodes out during the PreFilter phase, if they're sure the Nodes won't work for the Pod at that stage. | ||
// Filters are only run for Nodes that haven't been filtered out during the PreFilter phase. Match that behavior here - skip such Nodes. | ||
if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) { | ||
continue | ||
} | ||
|
||
// Nodes with the Unschedulable bit set will be rejected by one of the plugins during the Filter phase below. We can check that quickly here | ||
// and short-circuit to avoid running the expensive Filter phase at all in this case. | ||
if nodeInfo.Node().Spec.Unschedulable { | ||
continue | ||
} | ||
|
||
// Check if the NodeInfo matches the provided filtering condition. This should be less expensive than running the Filter phase below, so | ||
// check this first. | ||
if !nodeMatches(nodeInfo) { | ||
continue | ||
} | ||
|
||
// Run the Filter phase of the framework. Plugins retrieve the state they saved during PreFilter from CycleState, and answer whether the | ||
// given Pod can be scheduled on the given Node. | ||
filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler()) | ||
if filterStatus.IsSuccess() { | ||
// Filter passed for all plugins, so this pod can be scheduled on this Node. | ||
p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList) | ||
return nodeInfo.Node().Name, nil | ||
} | ||
// Filter didn't pass for some plugin, so this Node won't work - move on to the next one. | ||
} | ||
return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod) | ||
} | ||
|
||
// RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node. | ||
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError { | ||
nodeInfo, err := p.snapshotStore.GetNodeInfo(nodeName) | ||
if err != nil { | ||
return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err)) | ||
} | ||
|
||
p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotStore) | ||
defer p.fwHandle.DelegatingLister.ResetDelegate() | ||
|
||
state := schedulerframework.NewCycleState() | ||
// Run the PreFilter phase of the framework for the Pod and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info. | ||
preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod) | ||
if !preFilterStatus.IsSuccess() { | ||
return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "") | ||
} | ||
if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) { | ||
return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "") | ||
} | ||
|
||
// Run the Filter phase of the framework for the Pod and the Node and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info. | ||
filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler()) | ||
if !filterStatus.IsSuccess() { | ||
filterName := filterStatus.Plugin() | ||
filterReasons := filterStatus.Reasons() | ||
unexpectedErrMsg := "" | ||
if !filterStatus.IsRejected() { | ||
unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String()) | ||
} | ||
return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo)) | ||
} | ||
|
||
// PreFilter and Filter phases checked, this Pod can be scheduled on this Node. | ||
return nil | ||
} | ||
|
||
func (p *SchedulerPluginRunner) failingFilterDebugInfo(filterName string, nodeInfo *framework.NodeInfo) string { | ||
infoParts := []string{fmt.Sprintf("nodeName: %q", nodeInfo.Node().Name)} | ||
|
||
switch filterName { | ||
case "TaintToleration": | ||
infoParts = append(infoParts, fmt.Sprintf("nodeTaints: %#v", nodeInfo.Node().Spec.Taints)) | ||
} | ||
|
||
return strings.Join(infoParts, ", ") | ||
} |
Oops, something went wrong.