perf: Pre-filter instance types on nodepool requirements (#1824)
jonathan-innis authored Nov 17, 2024
1 parent 879964d commit 1d087f0
Showing 4 changed files with 35 additions and 17 deletions.
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/provisioner.go
@@ -247,10 +247,10 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
 		}
 		if len(instanceTypeOptions) == 0 {
 			log.FromContext(ctx).WithValues("NodePool", klog.KRef("", nodePool.Name)).Info("skipping, no resolved instance types found")
 			continue
 		}
-		instanceTypes[nodePool.Name] = append(instanceTypes[nodePool.Name], instanceTypeOptions...)
+		instanceTypes[nodePool.Name] = instanceTypeOptions

 		// Construct Topology Domains
 		for _, instanceType := range instanceTypeOptions {
@@ -301,7 +301,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
 	if err != nil {
 		return nil, fmt.Errorf("getting daemon pods, %w", err)
 	}
-	return scheduler.NewScheduler(p.kubeClient, lo.ToSlicePtr(nodePoolList.Items), p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock), nil
+	return scheduler.NewScheduler(ctx, p.kubeClient, lo.ToSlicePtr(nodePoolList.Items), p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock), nil
 }

 func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
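Note: the provisioner side of the change is small. Each NodePool's resolved instance types are now assigned directly into the per-NodePool map (the loop resolves each NodePool once per pass, so there is nothing to append to), and the context is threaded into scheduler.NewScheduler so the scheduler can log while it pre-filters. A minimal sketch of that shape, using simplified stand-in types rather than the real cloudprovider.InstanceType and v1.NodePool:

```go
// Sketch only: stand-ins for cloudprovider.InstanceType and v1.NodePool,
// not the real Karpenter types.
package main

import "fmt"

type instanceType struct{ Name string }
type nodePool struct{ Name string }

// resolveInstanceTypes is a placeholder for the per-NodePool
// cloud-provider lookup done in Provisioner.NewScheduler.
func resolveInstanceTypes(np nodePool) []instanceType {
	if np.Name == "empty" {
		return nil
	}
	return []instanceType{{Name: "m5.large"}, {Name: "c5.xlarge"}}
}

func main() {
	nodePools := []nodePool{{Name: "default"}, {Name: "empty"}}
	instanceTypes := map[string][]instanceType{}
	for _, np := range nodePools {
		opts := resolveInstanceTypes(np)
		if len(opts) == 0 {
			// Mirrors the "skipping, no resolved instance types found" branch.
			continue
		}
		// Each NodePool is resolved once per pass, so a direct assignment
		// replaces the old append into the map.
		instanceTypes[np.Name] = opts
	}
	fmt.Printf("resolved instance types for %d of %d NodePools\n", len(instanceTypes), len(nodePools))
}
```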
17 changes: 15 additions & 2 deletions pkg/controllers/provisioning/scheduling/instance_selection_test.go
@@ -861,8 +861,7 @@ var _ = Describe("Instance Type Selection", func() {
 			{
 				NodeSelectorRequirement: corev1.NodeSelectorRequirement{
 					Key:      instanceGeneration,
-					Operator: corev1.NodeSelectorOpGt,
-					Values:   []string{"2"},
+					Operator: corev1.NodeSelectorOpExists,
 				},
 				MinValues: lo.ToPtr(2),
 			},
@@ -875,12 +874,26 @@
 				corev1.ResourceCPU:    resource.MustParse("0.9"),
 				corev1.ResourceMemory: resource.MustParse("0.9Gi")},
 			},
+			NodeRequirements: []corev1.NodeSelectorRequirement{
+				{
+					Key:      instanceGeneration,
+					Operator: corev1.NodeSelectorOpGt,
+					Values:   []string{"2"},
+				},
+			},
 		})
 		pod2 := test.UnschedulablePod(test.PodOptions{
 			ResourceRequirements: corev1.ResourceRequirements{Requests: corev1.ResourceList{
 				corev1.ResourceCPU:    resource.MustParse("0.9"),
 				corev1.ResourceMemory: resource.MustParse("0.9Gi")},
 			},
+			NodeRequirements: []corev1.NodeSelectorRequirement{
+				{
+					Key:      instanceGeneration,
+					Operator: corev1.NodeSelectorOpGt,
+					Values:   []string{"2"},
+				},
+			},
 		})

 		ExpectApplied(ctx, env.Client, pod1)
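Note: the test update follows from the new behavior. A NodePool-level Gt requirement is now applied as a pre-filter when the scheduler is constructed, so the test keeps only an Exists requirement (plus minValues) on the NodePool and moves the generation > 2 constraint onto the pods, presumably to keep exercising instance-type filtering at scheduling time rather than at construction time. A small illustration of the two requirement shapes, assuming the standard k8s.io/api/core/v1 types and a placeholder label key in place of the suite's instanceGeneration:

```go
// Illustration only, using the standard k8s.io/api/core/v1 types; the
// label key below is a placeholder, not the key used in the test suite.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	const instanceGeneration = "example.sh/instance-generation" // placeholder key

	// NodePool-level requirement after the change: existence only, so the
	// construction-time pre-filter does not discard instance types by generation.
	nodePoolReq := corev1.NodeSelectorRequirement{
		Key:      instanceGeneration,
		Operator: corev1.NodeSelectorOpExists,
	}

	// Pod-level requirement carrying the actual constraint (generation > 2),
	// evaluated per pod during the scheduling loop.
	podReq := corev1.NodeSelectorRequirement{
		Key:      instanceGeneration,
		Operator: corev1.NodeSelectorOpGt,
		Values:   []string{"2"},
	}

	fmt.Printf("NodePool: %s %s\n", nodePoolReq.Key, nodePoolReq.Operator)
	fmt.Printf("Pod:      %s %s %v\n", podReq.Key, podReq.Operator, podReq.Values)
}
```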
27 changes: 16 additions & 11 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -44,7 +44,7 @@ import (
 	"sigs.k8s.io/karpenter/pkg/utils/resources"
 )

-func NewScheduler(kubeClient client.Client, nodePools []*v1.NodePool,
+func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1.NodePool,
 	cluster *state.Cluster, stateNodes []*state.StateNode, topology *Topology,
 	instanceTypes map[string][]*cloudprovider.InstanceType, daemonSetPods []*corev1.Pod,
 	recorder events.Recorder, clock clock.Clock) *Scheduler {
@@ -59,15 +59,22 @@ func NewScheduler(kubeClient client.Client, nodePools []*v1.NodePool,
 			}
 		}
 	}
-
-	templates := lo.Map(nodePools, func(np *v1.NodePool, _ int) *NodeClaimTemplate { return NewNodeClaimTemplate(np) })
+	// Pre-filter instance types eligible for NodePools to reduce work done during scheduling loops for pods
+	templates := lo.FilterMap(nodePools, func(np *v1.NodePool, _ int) (*NodeClaimTemplate, bool) {
+		nct := NewNodeClaimTemplate(np)
+		nct.InstanceTypeOptions = filterInstanceTypesByRequirements(instanceTypes[np.Name], nct.Requirements, corev1.ResourceList{}).remaining
+		if len(nct.InstanceTypeOptions) == 0 {
+			log.FromContext(ctx).WithValues("NodePool", klog.KRef("", np.Name)).Info("skipping, nodepool requirements filtered out all instance types")
+			return nil, false
+		}
+		return nct, true
+	})
 	s := &Scheduler{
 		id:                 uuid.NewUUID(),
 		kubeClient:         kubeClient,
 		nodeClaimTemplates: templates,
 		topology:           topology,
 		cluster:            cluster,
-		instanceTypes:      instanceTypes,
 		daemonOverhead:     getDaemonOverhead(templates, daemonSetPods),
 		recorder:           recorder,
 		preferences:        &Preferences{ToleratePreferNoSchedule: toleratePreferNoSchedule},
@@ -85,8 +92,7 @@ type Scheduler struct {
 	newNodeClaims      []*NodeClaim
 	existingNodes      []*ExistingNode
 	nodeClaimTemplates []*NodeClaimTemplate
-	remainingResources map[string]corev1.ResourceList           // (NodePool name) -> remaining resources for that NodePool
-	instanceTypes      map[string][]*cloudprovider.InstanceType // (NodePool name) -> instance types for NodePool
+	remainingResources map[string]corev1.ResourceList // (NodePool name) -> remaining resources for that NodePool
 	daemonOverhead     map[*NodeClaimTemplate]corev1.ResourceList
 	preferences        *Preferences
 	topology           *Topology
@@ -274,17 +280,16 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
 	// Create new node
 	var errs error
 	for _, nodeClaimTemplate := range s.nodeClaimTemplates {
-		instanceTypes := s.instanceTypes[nodeClaimTemplate.NodePoolName]
+		instanceTypes := nodeClaimTemplate.InstanceTypeOptions
 		// if limits have been applied to the nodepool, ensure we filter instance types to avoid violating those limits
 		if remaining, ok := s.remainingResources[nodeClaimTemplate.NodePoolName]; ok {
-			instanceTypes = filterByRemainingResources(s.instanceTypes[nodeClaimTemplate.NodePoolName], remaining)
+			instanceTypes = filterByRemainingResources(instanceTypes, remaining)
 			if len(instanceTypes) == 0 {
 				errs = multierr.Append(errs, fmt.Errorf("all available instance types exceed limits for nodepool: %q", nodeClaimTemplate.NodePoolName))
 				continue
-			} else if len(s.instanceTypes[nodeClaimTemplate.NodePoolName]) != len(instanceTypes) {
-
+			} else if len(nodeClaimTemplate.InstanceTypeOptions) != len(instanceTypes) {
 				log.FromContext(ctx).V(1).WithValues("NodePool", klog.KRef("", nodeClaimTemplate.NodePoolName)).Info(fmt.Sprintf("%d out of %d instance types were excluded because they would breach limits",
-					len(s.instanceTypes[nodeClaimTemplate.NodePoolName])-len(instanceTypes), len(s.instanceTypes[nodeClaimTemplate.NodePoolName])))
+					len(nodeClaimTemplate.InstanceTypeOptions)-len(instanceTypes), len(nodeClaimTemplate.InstanceTypeOptions)))
 			}
 		}
 		nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes)
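Note: the core of the change is in NewScheduler. Instead of mapping every NodePool straight to a NodeClaimTemplate and re-checking its full instance-type list for every pod, the constructor now filters each NodePool's instance types against the NodePool's requirements once, stores the result on the template as InstanceTypeOptions, and drops NodePools whose requirements exclude everything. The sketch below shows the pattern with simplified stand-in types (a single numeric generation cutoff in place of the real scheduling.Requirements); it is not the actual filterInstanceTypesByRequirements implementation.

```go
// Minimal sketch of the pre-filtering pattern, assuming simplified types.
package main

import (
	"fmt"

	"github.com/samber/lo"
)

type instanceType struct {
	Name       string
	Generation int
}

type nodeClaimTemplate struct {
	NodePoolName        string
	MinGeneration       int // stand-in for the NodePool's requirements
	InstanceTypeOptions []instanceType
}

// filterByRequirements is a stand-in for filterInstanceTypesByRequirements:
// it drops instance types the template's requirements can never accept.
func filterByRequirements(its []instanceType, minGen int) []instanceType {
	return lo.Filter(its, func(it instanceType, _ int) bool {
		return it.Generation > minGen
	})
}

func main() {
	instanceTypes := map[string][]instanceType{
		"default": {{"m5.large", 5}, {"m3.medium", 3}, {"m1.small", 1}},
		"legacy":  {{"m1.small", 1}},
	}
	templates := []nodeClaimTemplate{
		{NodePoolName: "default", MinGeneration: 2},
		{NodePoolName: "legacy", MinGeneration: 2},
	}

	// Pre-filter once at scheduler construction; NodePools whose requirements
	// exclude every instance type are dropped, so the per-pod scheduling loop
	// never re-evaluates them.
	templates = lo.FilterMap(templates, func(t nodeClaimTemplate, _ int) (nodeClaimTemplate, bool) {
		t.InstanceTypeOptions = filterByRequirements(instanceTypes[t.NodePoolName], t.MinGeneration)
		if len(t.InstanceTypeOptions) == 0 {
			fmt.Printf("skipping %q: requirements filtered out all instance types\n", t.NodePoolName)
			return t, false
		}
		return t, true
	})

	for _, t := range templates {
		fmt.Printf("%s: %d instance types remain for the scheduling loop\n", t.NodePoolName, len(t.InstanceTypeOptions))
	}
}
```

Because the filtering happens here, the per-pod loop in Scheduler.add only needs to apply the remaining-resource limit filter on top of each template's already-narrowed InstanceTypeOptions, which is where the perf win in the commit title comes from.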
2 changes: 1 addition & 1 deletion
@@ -173,7 +173,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
 		b.Fatalf("creating topology, %s", err)
 	}

-	scheduler := scheduling.NewScheduler(client, []*v1.NodePool{nodePool},
+	scheduler := scheduling.NewScheduler(ctx, client, []*v1.NodePool{nodePool},
 		cluster, nil, topology,
 		map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, nil,
 		events.NewRecorder(&record.FakeRecorder{}), clock)
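Note: for callers such as this scheduler benchmark, the only API change is the new context.Context parameter on scheduling.NewScheduler, which the constructor uses to log when a NodePool's requirements filter out every instance type; the instance-type map keyed by NodePool name is passed in exactly as before.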
