15 changes: 8 additions & 7 deletions pkg/common/configs/config.go
@@ -47,13 +47,14 @@ type SchedulerConfig struct {
// - the preemption configuration for the partition
// - user group resolver type (os, ldap, "")
type PartitionConfig struct {
Name string
Queues []QueueConfig
PlacementRules []PlacementRule `yaml:",omitempty" json:",omitempty"`
Limits []Limit `yaml:",omitempty" json:",omitempty"`
Preemption PartitionPreemptionConfig `yaml:",omitempty" json:",omitempty"`
NodeSortPolicy NodeSortingPolicy `yaml:",omitempty" json:",omitempty"`
UserGroupResolver UserGroupResolver `yaml:",omitempty" json:",omitempty"`
Name string
Queues []QueueConfig
PlacementRules []PlacementRule `yaml:",omitempty" json:",omitempty"`
Limits []Limit `yaml:",omitempty" json:",omitempty"`
Preemption PartitionPreemptionConfig `yaml:",omitempty" json:",omitempty"`
NodeSortPolicy NodeSortingPolicy `yaml:",omitempty" json:",omitempty"`
UserGroupResolver UserGroupResolver `yaml:",omitempty" json:",omitempty"`
TryNodesThreadCount int `yaml:",omitempty" json:",omitempty"`
}

type UserGroupResolver struct {
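For illustration, a minimal sketch (not part of this PR) of constructing a PartitionConfig with the new field in Go; the queue definition is a placeholder and the value 4 is arbitrary:

	partition := PartitionConfig{
		Name:   "default",
		Queues: []QueueConfig{{Name: "root"}},
		// goroutines used to evaluate candidate nodes for a single ask;
		// values <= 0 are reset to 1 during config validation (see configvalidator.go below)
		TryNodesThreadCount: 4,
	}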
5 changes: 5 additions & 0 deletions pkg/common/configs/configvalidator.go
@@ -758,6 +758,11 @@ func Validate(newConfig *SchedulerConfig) error {
return fmt.Errorf("duplicate partition name found with name %s", partition.Name)
}
partitionMap[strings.ToLower(partition.Name)] = true

if partition.TryNodesThreadCount <= 0 {
partition.TryNodesThreadCount = 1
}

// check the queue structure
err := checkQueuesStructure(&partition)
if err != nil {
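A hypothetical helper, not part of this PR, that mirrors the defaulting rule added above for code that reads the value directly (written as if it lived in the configs package):

	// effectiveTryNodesThreadCount treats any non-positive configured value as
	// single-threaded node evaluation, matching the default applied in Validate.
	func effectiveTryNodesThreadCount(partition *PartitionConfig) int {
		if partition.TryNodesThreadCount <= 0 {
			return 1
		}
		return partition.TryNodesThreadCount
	}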
180 changes: 177 additions & 3 deletions pkg/scheduler/objects/application.go
@@ -982,7 +982,9 @@ func (sa *Application) canReplace(request *Allocation) bool {
}

// tryAllocate will perform a regular allocation of a pending request, includes placeholders.
func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption bool, preemptionDelay time.Duration, preemptAttemptsRemaining *int, nodeIterator func() NodeIterator, fullNodeIterator func() NodeIterator, getNodeFn func(string) *Node) *AllocationResult {
func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption bool, tryNodesThreadCount int,
preemptionDelay time.Duration, preemptAttemptsRemaining *int, nodeIterator func() NodeIterator,
fullNodeIterator func() NodeIterator, getNodeFn func(string) *Node) *AllocationResult {
sa.Lock()
defer sa.Unlock()
if sa.sortedRequests == nil {
Expand Down Expand Up @@ -1043,8 +1045,15 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption

iterator := nodeIterator()
if iterator != nil {
if result := sa.tryNodes(request, iterator); result != nil {
// have a candidate return it
var result *AllocationResult

if tryNodesThreadCount > 1 {
result = sa.tryNodesInParallel(request, iterator, tryNodesThreadCount)
} else {
result = sa.tryNodes(request, iterator)
}

if result != nil {
return result
}

Expand Down Expand Up @@ -1460,6 +1469,171 @@ func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator NodeIterator,

// Try all the nodes for a request. The resultType is an allocation or reservation of a node.
// New allocations can only be reserved after a delay.
func (sa *Application) tryNodesInParallel(ask *Allocation, iterator NodeIterator, tryNodesThreadCount int) *AllocationResult { //nolint:funlen
var nodeToReserve *Node
scoreReserved := math.Inf(1)
allocKey := ask.GetAllocationKey()
reserved := sa.reservations[allocKey]
var allocResult *AllocationResult
var predicateErrors map[string]int

var mu sync.Mutex

// Channel to signal completion
done := make(chan struct{})
defer close(done)

// Function to process each batch
processBatch := func(batch []*Node) {
var wg sync.WaitGroup
semaphore := make(chan struct{}, tryNodesThreadCount)
candidateNodes := make([]*Node, len(batch))
errors := make([]error, len(batch))

for idx, node := range batch {
wg.Add(1)
semaphore <- struct{}{}
go func(idx int, node *Node) {
defer wg.Done()
defer func() { <-semaphore }()
dryRunResult, err := sa.tryNodeDryRun(node, ask)

mu.Lock()
defer mu.Unlock()
if err != nil {
errors[idx] = err
} else if dryRunResult != nil {
candidateNodes[idx] = node
}
}(idx, node)
}
Comment on lines +1493 to +1509
@pbacsko (Contributor) commented on Oct 31, 2025:

Interesting, but I do have a concern with this approach. What if, in a large cluster (e.g. 5000 nodes), we have unschedulable pods? In that case, we'd create 5000 goroutines for a single request in every scheduling cycle. If we have 10 unschedulable pods, that's 50000 goroutines per cycle. Overall, 500k goroutines per second.
Goroutines are cheap, but not free.

This might be an extreme, but we have to think about extremes, even if they're less common.

I'd definitely think about some sort of pooling solution, essentially worker goroutines which are always running and waiting for asks to evaluate. Shouldn't be hard to implement.

Anyway, I do have a simple test case that checks performance in the shim, under pkg/shim/scheduling_perf_test.go. It's called BenchmarkSchedulingThroughPut(). It could be modified to submit unschedulable pods (e.g. ones with a node selector that never matches) to see how that affects performance.

@mitdesai (Contributor, Author) replied on Oct 31, 2025:

Hi @pbacsko, we do not create goroutines for each node. The parallelism is driven by the configuration 'tryNodesThreadCount'.

We evaluate the nodes in batches based on the thread count. We also use the same node ordering that YuniKorn already has, so least-used nodes are evaluated first.

Currently, in our existing setup, we use 100 threads for a 700-node cluster. This decreases the time spent on node evaluation at the cost of extra CPU.

Regarding an unschedulable pod in the system: even without the parallelism, we already try every node for it in each cycle before bailing out. This change just reduces the time taken for node evaluation in that case, provided the thread count is set appropriately. Too many threads means more overhead for managing them; too few means more time spent evaluating nodes.


wg.Wait()

for _, err := range errors {
if err != nil {
mu.Lock()
if predicateErrors == nil {
predicateErrors = make(map[string]int)
}
predicateErrors[err.Error()]++
mu.Unlock()
}
}

// Process dry-run candidateNodes sequentially within the batch
for _, candidateNode := range candidateNodes {
if candidateNode == nil {
continue
}
tryNodeStart := time.Now()
result, err := sa.tryNode(candidateNode, ask)
if err != nil {
if predicateErrors == nil {
predicateErrors = make(map[string]int)
}
predicateErrors[err.Error()]++
} else if result != nil {
metrics.GetSchedulerMetrics().ObserveTryNodeLatency(tryNodeStart)
if reserved != nil {
if reserved.nodeID != candidateNode.NodeID {
log.Log(log.SchedApplication).Debug("allocate picking reserved alloc during non reserved allocate",
zap.String("appID", sa.ApplicationID),
zap.String("reserved nodeID", reserved.nodeID),
zap.String("allocationKey", allocKey))
result.ReservedNodeID = reserved.nodeID
} else {
log.Log(log.SchedApplication).Debug("allocate found reserved alloc during non reserved allocate",
zap.String("appID", sa.ApplicationID),
zap.String("nodeID", candidateNode.NodeID),
zap.String("allocationKey", allocKey))
}
result.ResultType = AllocatedReserved
allocResult = result
return
}
allocResult = result
return
}
askAge := time.Since(ask.GetCreateTime())
if reserved == nil && askAge > reservationDelay {
log.Log(log.SchedApplication).Debug("app reservation check",
zap.String("allocationKey", allocKey),
zap.Time("createTime", ask.GetCreateTime()),
zap.Duration("askAge", askAge),
zap.Duration("reservationDelay", reservationDelay))
score := candidateNode.GetFitInScoreForAvailableResource(ask.GetAllocatedResource())
if score < scoreReserved {
scoreReserved = score
nodeToReserve = candidateNode
}
}
}
}

// Iterate over nodes and process in batches
var batch []*Node
iterator.ForEachNode(func(node *Node) bool {
batch = append(batch, node)
if len(batch) >= tryNodesThreadCount {
processBatch(batch)
batch = nil
if allocResult != nil {
return false // Exit iteration if an allocation has been made
}
}
return true
})
// Process any remaining nodes in the last batch
if len(batch) > 0 && allocResult == nil {
processBatch(batch)
}

if allocResult != nil {
return allocResult
}

if predicateErrors != nil {
ask.SendPredicatesFailedEvent(predicateErrors)
}

if nodeToReserve != nil && !nodeToReserve.IsReserved() {
log.Log(log.SchedApplication).Debug("found candidate node for app reservation",
zap.String("appID", sa.ApplicationID),
zap.String("nodeID", nodeToReserve.NodeID),
zap.String("allocationKey", allocKey),
zap.Int("reservations", len(sa.reservations)))
if nodeToReserve.preReserveConditions(ask) != nil {
return nil
}
return newReservedAllocationResult(nodeToReserve.NodeID, ask)
}

return nil
}

func (sa *Application) tryNodeDryRun(node *Node, ask *Allocation) (*AllocationResult, error) {
toAllocate := ask.GetAllocatedResource()
allocationKey := ask.GetAllocationKey()

if !node.IsSchedulable() || !node.FitInNode(ask.GetAllocatedResource()) {
return nil, nil
}

// check whether the node can currently accommodate this allocation
if !node.preAllocateCheck(toAllocate, allocationKey) {
// skip schedule onto node
return nil, nil
}
// skip the node if conditions can not be satisfied
if err := node.preAllocateConditions(ask); err != nil {
return nil, err
}

result := newAllocatedAllocationResult(node.NodeID, ask)
return result, nil
}

func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *AllocationResult {
var nodeToReserve *Node
scoreReserved := math.Inf(1)
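The review thread above suggests replacing per-ask goroutine creation with long-lived workers. Below is a minimal, self-contained sketch of that pooling idea, independent of this PR; evalPool, evalJob and the placeholder predicate are hypothetical names, not YuniKorn APIs:

	package main

	import (
		"fmt"
		"sync"
	)

	// evalJob is one unit of work: evaluate a single node for a single ask.
	type evalJob func()

	// evalPool keeps a fixed number of worker goroutines alive across scheduling
	// cycles instead of spawning a new goroutine per node per cycle.
	type evalPool struct {
		jobs chan evalJob
		wg   sync.WaitGroup
	}

	func newEvalPool(workers int) *evalPool {
		p := &evalPool{jobs: make(chan evalJob)}
		for i := 0; i < workers; i++ {
			p.wg.Add(1)
			go func() {
				defer p.wg.Done()
				for job := range p.jobs {
					job()
				}
			}()
		}
		return p
	}

	func (p *evalPool) submit(job evalJob) { p.jobs <- job }

	func (p *evalPool) stop() {
		close(p.jobs)
		p.wg.Wait()
	}

	func main() {
		pool := newEvalPool(4) // fixed worker count, reused for every ask

		nodes := []string{"node-1", "node-2", "node-3", "node-4", "node-5"}
		var mu sync.Mutex
		var candidates []string
		var pending sync.WaitGroup

		for _, n := range nodes {
			node := n
			pending.Add(1)
			pool.submit(func() {
				defer pending.Done()
				// placeholder for a real predicate / dry-run check on the node
				mu.Lock()
				candidates = append(candidates, node)
				mu.Unlock()
			})
		}
		pending.Wait() // all evaluations for this ask finished; the pool stays alive
		fmt.Println("candidates:", candidates)

		pool.stop()
	}

The pool is created once and reused across scheduling cycles, so the per-cycle cost is a channel send per node rather than goroutine creation, which is the trade-off the reviewer is pointing at.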