Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/metrics/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type SchedulerMetrics struct {
tryPreemptionLatency prometheus.Histogram
tryNodeEvaluation prometheus.Histogram
lock locking.RWMutex
tryNodeCount *prometheus.CounterVec
}

// InitSchedulerMetrics to initialize scheduler metrics
Expand Down Expand Up @@ -169,6 +170,13 @@ func InitSchedulerMetrics() *SchedulerMetrics {
},
)

s.tryNodeCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: SchedulerSubsystem,
Name: "trynode_count",
Help: "Total number of nodes evaluated during scheduling cycle",
}, nil)
// Register the metrics
var metricsList = []prometheus.Collector{
s.containerAllocation,
Expand All @@ -181,6 +189,7 @@ func InitSchedulerMetrics() *SchedulerMetrics {
s.schedulingCycle,
s.tryNodeEvaluation,
s.tryPreemptionLatency,
s.tryNodeCount,
}
for _, metric := range metricsList {
if err := prometheus.Register(metric); err != nil {
Expand All @@ -197,6 +206,7 @@ func (m *SchedulerMetrics) Reset() {
m.application.Reset()
m.applicationSubmission.Reset()
m.containerAllocation.Reset()
m.tryNodeCount.Reset()
}

func SinceInSeconds(start time.Time) float64 {
Expand Down Expand Up @@ -274,6 +284,19 @@ func (m *SchedulerMetrics) GetSchedulingErrors() (int, error) {
return -1, err
}

func (m *SchedulerMetrics) IncTryNodeCount() {
m.tryNodeCount.With(nil).Inc()
}

func (m *SchedulerMetrics) GetTryNodeCount() (int64, error) {
metricDto := &dto.Metric{}
err := m.tryNodeCount.With(nil).Write(metricDto)
if err == nil {
return int64(*metricDto.Counter.Value), nil
}
return -1, err
}

func (m *SchedulerMetrics) IncTotalApplicationsNew() {
m.applicationSubmission.WithLabelValues(AppNew).Inc()
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/metrics/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ func TestTryNodeEvaluation(t *testing.T) {
verifyHistogram(t, "trynode_evaluation_milliseconds", 60, 1)
}

func TestTryNodeCount(t *testing.T) {
sm = getSchedulerMetrics(t)
defer unregisterMetrics()

sm.IncTryNodeCount()
count, err := sm.GetTryNodeCount()
assert.NilError(t, err)
assert.Equal(t, int64(1), count)
}

func getSchedulerMetrics(t *testing.T) *SchedulerMetrics {
unregisterMetrics()
return InitSchedulerMetrics()
Expand Down Expand Up @@ -244,4 +254,5 @@ func unregisterMetrics() {
prometheus.Unregister(sm.tryNodeLatency)
prometheus.Unregister(sm.tryNodeEvaluation)
prometheus.Unregister(sm.tryPreemptionLatency)
prometheus.Unregister(sm.tryNodeCount)
}
1 change: 1 addition & 0 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,7 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat
return true
}
tryNodeStart := time.Now()
metrics.GetSchedulerMetrics().IncTryNodeCount()
result, err := sa.tryNode(node, ask)
if err != nil {
if predicateErrors == nil {
Expand Down
106 changes: 105 additions & 1 deletion pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Queue struct {
template *template.Template
queueEvents *schedEvt.QueueEvents
appQueueMapping *AppQueueMapping // appID mapping to queues
applicationsTried int64 // number of applications tried per scheduling cycle
nodesTried int64 // number of nodes tried per scheduling cycle

locking.RWMutex
}
Expand Down Expand Up @@ -1445,13 +1447,32 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N
if app.IsAccepted() && (!runnableInQueue || !runnableByUserLimit) {
continue
}

// Increment counter in root queue before calling app.tryAllocate
sq.incrementApplicationsTried()
countBefore, err := metrics.GetSchedulerMetrics().GetTryNodeCount()
if err != nil {
// if metric read fails, default to 0 to avoid disrupting scheduling
countBefore = 0
}

result := app.tryAllocate(headRoom, allowPreemption, preemptionDelay, &preemptAttemptsRemaining, iterator, fullIterator, getnode)

countAfter, err := metrics.GetSchedulerMetrics().GetTryNodeCount()
if err != nil {
// if metric read fails, default to 0 to avoid disrupting scheduling
countAfter = 0
}
sq.addNodesTried(countAfter - countBefore)

if result != nil {
log.Log(log.SchedQueue).Info("allocation found on queue",
zap.String("queueName", sq.QueuePath),
zap.String("appID", app.ApplicationID),
zap.Stringer("resultType", result.ResultType),
zap.Stringer("allocation", result.Request))
zap.Stringer("allocation", result.Request),
zap.Int64("applicationsTried:", sq.GetApplicationsTried()),
zap.Int64("nodesTried", sq.GetNodesTried()))
// if the app is still in Accepted state we're allocating placeholders.
// we want to count these apps as running
if app.IsAccepted() {
Expand Down Expand Up @@ -2038,3 +2059,86 @@ func (sq *Queue) recalculatePriority() int32 {
sq.currentPriority = curr
return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
}

func (sq *Queue) addNodesTried(count int64) {
if sq == nil {
return
}
// Find the root queue and increment its counter
root := sq
for root.parent != nil {
root = root.parent
}
root.Lock()
defer root.Unlock()
root.nodesTried += count
}

func (sq *Queue) GetNodesTried() int64 {
if sq == nil {
return 0
}
root := sq
for root.parent != nil {
root = root.parent
}
root.Lock()
defer root.Unlock()
return root.nodesTried
}

func (sq *Queue) ResetNodesTried() {
if sq == nil {
return
}
root := sq
for root.parent != nil {
root = root.parent
}
root.Lock()
defer root.Unlock()
root.nodesTried = 0
}

// Increment the root queue's counter for application allocation attempts
func (sq *Queue) incrementApplicationsTried() {
if sq == nil {
return
}
// Find the root queue and increment its counter
root := sq
for root.parent != nil {
root = root.parent
}
root.Lock()
defer root.Unlock()
root.applicationsTried++
}

// Retrieve the count of tryAllocate calls made during the current scheduling cycle
func (sq *Queue) GetApplicationsTried() int64 {
if sq == nil {
return 0
}
root := sq
for root.parent != nil {
root = root.parent
}
root.Lock()
defer root.Unlock()
return root.applicationsTried
}

// Clear the application allocation attempts counter for the new scheduling cycle
func (sq *Queue) ResetApplicationsTried() {
if sq == nil {
return
}
root := sq
for root.parent != nil {
root = root.parent
}
root.Lock()
defer root.Unlock()
root.applicationsTried = 0
}
40 changes: 40 additions & 0 deletions pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2876,3 +2876,43 @@ func TestQueue_allocatedResFits_Other(t *testing.T) {
})
}
}

func TestQueueCounters(t *testing.T) {
// create the root
root, err := createRootQueue(nil)
assert.NilError(t, err, "queue create failed")
var parent *Queue
parent, err = createManagedQueue(root, "parent", true, nil)
assert.NilError(t, err, "failed to create parent queue")
var leaf *Queue
leaf, err = createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "failed to create leaf queue")

// check applications tried
leaf.incrementApplicationsTried()
assert.Equal(t, root.GetApplicationsTried(), int64(1), "root count should be 1")
assert.Equal(t, parent.GetApplicationsTried(), int64(1), "parent count should be 1")
assert.Equal(t, leaf.GetApplicationsTried(), int64(1), "leaf count should be 1")
leaf.ResetApplicationsTried()
assert.Equal(t, root.GetApplicationsTried(), int64(0), "root count should be 0 after reset")
assert.Equal(t, parent.GetApplicationsTried(), int64(0), "parent count should be 0 after reset")
assert.Equal(t, leaf.GetApplicationsTried(), int64(0), "leaf count should be 0 after reset")

// check nodes tried
leaf.addNodesTried(5)
assert.Equal(t, root.GetNodesTried(), int64(5), "root count should be 5")
assert.Equal(t, parent.GetNodesTried(), int64(5), "parent count should be 5")
assert.Equal(t, leaf.GetNodesTried(), int64(5), "leaf count should be 5")
leaf.ResetNodesTried()
assert.Equal(t, root.GetNodesTried(), int64(0), "root count should be 0 after reset")
assert.Equal(t, parent.GetNodesTried(), int64(0), "parent count should be 0 after reset")
assert.Equal(t, leaf.GetNodesTried(), int64(0), "leaf count should be 0 after reset")

// check nil queue
var nilQueue *Queue
nilQueue.incrementApplicationsTried()
nilQueue.addNodesTried(1)
nilQueue.ResetApplicationsTried()
nilQueue.ResetNodesTried()
assert.Equal(t, nilQueue.GetApplicationsTried(), int64(0))
}
5 changes: 5 additions & 0 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,11 @@ func (pc *PartitionContext) tryPlaceholderAllocate() *objects.AllocationResult {
// nothing to do just return
return nil
}

// Reset the tryAllocate call counter at the beginning of each scheduling cycle
pc.root.ResetApplicationsTried()
pc.root.ResetNodesTried()

// try allocating from the root down
result := pc.root.TryPlaceholderAllocate(pc.GetNodeIterator, pc.GetNode)
if result != nil {
Expand Down
118 changes: 118 additions & 0 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,124 @@ func TestTryAllocate(t *testing.T) {
assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.Multiply(res, 3), expectedQueuesMaxLimits)
}

func TestApplicationsTriedCount(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}

app := newApplication(appID1, "default", "root.leaf")
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "18"})
assert.NilError(t, err, "failed to create resource")
// add to the partition
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-1 to partition")
err = app.AddAllocationAsk(newAllocationAsk(allocKey, appID1, res))
assert.NilError(t, err, "failed to add ask alloc-1 to app-1")

app = newApplication(appID2, "default", "root.leaf")
// add to the partition
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-2 to partition")
res, err = resources.NewResourceFromConf(map[string]string{"vcore": "16"})
assert.NilError(t, err, "failed to create resource")
err = app.AddAllocationAsk(newAllocationAsk(allocKey2, appID2, res))
assert.NilError(t, err, "failed to add ask alloc-1 to app-2")

app = newApplication(appID3, "default", "root.leaf")
// add to the partition
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-3 to partition")
res, err = resources.NewResourceFromConf(map[string]string{"vcore": "4"})
assert.NilError(t, err, "failed to create resource")
err = app.AddAllocationAsk(newAllocationAsk(allocKey3, appID3, res))
assert.NilError(t, err, "failed to add ask alloc-1 to app-3")

expectedQueuesMaxLimits := make(map[string]map[string]interface{})
expectedQueuesMaxLimits["root"] = make(map[string]interface{})
expectedQueuesMaxLimits["root.leaf"] = make(map[string]interface{})
expectedQueuesMaxLimits["root"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10, "vcores": 10})
expectedQueuesMaxLimits["root.leaf"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5, "vcores": 5})
expectedQueuesMaxLimits["root"][maxapplications] = uint64(10)
expectedQueuesMaxLimits["root.leaf"][maxapplications] = uint64(1)
assertUserGroupResourceMaxLimits(t, getTestUserGroup(), nil, expectedQueuesMaxLimits)

// first allocation should be app-1 and alloc-2
result := partition.tryAllocate()
if result == nil || result.Request == nil {
t.Fatal("allocation did not return any allocation")
}
assert.Equal(t, result.ResultType, objects.Allocated, "result type is not the expected allocated")
assert.Equal(t, result.Request.GetApplicationID(), appID3, "expected application app-2 to be allocated")
assert.Equal(t, result.Request.GetAllocationKey(), allocKey3, "expected ask alloc-2 to be allocated")
assert.Equal(t, partition.root.GetApplicationsTried(), int64(3), "expected 3 applications to be tried")
}

func TestNodesTriedCount(t *testing.T) {
setupUGM()
partition, err := newConfiguredPartition()
assert.NilError(t, err, "test partition create failed with error")
var res1 *resources.Resource
res1, err = resources.NewResourceFromConf(map[string]string{"vcore": "2"})
assert.NilError(t, err, "failed to create basic resource")
err = partition.AddNode(newNodeMaxResource("node-1", res1))
assert.NilError(t, err, "test node1 add failed unexpected")

var res2 *resources.Resource
res2, err = resources.NewResourceFromConf(map[string]string{"vcore": "3"})
assert.NilError(t, err, "failed to create basic resource")
err = partition.AddNode(newNodeMaxResource("node-2", res2))
assert.NilError(t, err, "test node2 add failed unexpected")

var res3 *resources.Resource
res3, err = resources.NewResourceFromConf(map[string]string{"vcore": "5"})
assert.NilError(t, err, "failed to create basic resource")
err = partition.AddNode(newNodeMaxResource("node-3", res3))
assert.NilError(t, err, "test node3 add failed unexpected")

var res4 *resources.Resource
res4, err = resources.NewResourceFromConf(map[string]string{"vcore": "10"})
assert.NilError(t, err, "failed to create basic resource")
err = partition.AddNode(newNodeMaxResource("node-4", res4))
assert.NilError(t, err, "test node4 add failed unexpected")

assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}

app := newApplication(appID1, "default", "root.leaf")
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "4"})
assert.NilError(t, err, "failed to create resource")
// add to the partition
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-1 to partition")
err = app.AddAllocationAsk(newAllocationAsk(allocKey, appID1, res))
assert.NilError(t, err, "failed to add ask alloc-1 to app-1")

expectedQueuesMaxLimits := make(map[string]map[string]interface{})
expectedQueuesMaxLimits["root"] = make(map[string]interface{})
expectedQueuesMaxLimits["root.leaf"] = make(map[string]interface{})
expectedQueuesMaxLimits["root"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10, "vcores": 10})
expectedQueuesMaxLimits["root.leaf"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5, "vcores": 5})
expectedQueuesMaxLimits["root"][maxapplications] = uint64(10)
expectedQueuesMaxLimits["root.leaf"][maxapplications] = uint64(1)
assertUserGroupResourceMaxLimits(t, getTestUserGroup(), nil, expectedQueuesMaxLimits)

// first allocation should be app-1 and alloc-2
result := partition.tryAllocate()
if result == nil || result.Request == nil {
t.Fatal("allocation did not return any allocation")
}
assert.Equal(t, result.ResultType, objects.Allocated, "result type is not the expected allocated")
assert.Equal(t, result.Request.GetApplicationID(), appID1, "expected application app-2 to be allocated")
assert.Equal(t, result.Request.GetAllocationKey(), allocKey, "expected ask alloc-2 to be allocated")
assert.Equal(t, partition.root.GetApplicationsTried(), int64(1), "expected 1 applications to be tried")
assert.Equal(t, partition.root.GetNodesTried(), int64(1), "expected 1 nodes to be tried")
}

// allocate ask request with required node
func TestRequiredNodeReservation(t *testing.T) {
setupUGM()
Expand Down
Loading