Skip to content

Commit 6bdbaee

Browse files
committed
[YUNIKORN-3119] Add Metrics for Monitoring Applications and Nodes Attempted in Each Scheduling Cycle
1 parent ceb8ca5 commit 6bdbaee

File tree

5 files changed

+252
-1
lines changed

5 files changed

+252
-1
lines changed

pkg/metrics/scheduler.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type SchedulerMetrics struct {
6868
tryPreemptionLatency prometheus.Histogram
6969
tryNodeEvaluation prometheus.Histogram
7070
lock locking.RWMutex
71+
tryNodeCount *prometheus.CounterVec
7172
}
7273

7374
// InitSchedulerMetrics to initialize scheduler metrics
@@ -169,6 +170,13 @@ func InitSchedulerMetrics() *SchedulerMetrics {
169170
},
170171
)
171172

173+
s.tryNodeCount = prometheus.NewCounterVec(
174+
prometheus.CounterOpts{
175+
Namespace: Namespace,
176+
Subsystem: SchedulerSubsystem,
177+
Name: "trynode_count",
178+
Help: "Total number of nodes evaluated during scheduling cycle",
179+
}, nil)
172180
// Register the metrics
173181
var metricsList = []prometheus.Collector{
174182
s.containerAllocation,
@@ -181,6 +189,7 @@ func InitSchedulerMetrics() *SchedulerMetrics {
181189
s.schedulingCycle,
182190
s.tryNodeEvaluation,
183191
s.tryPreemptionLatency,
192+
s.tryNodeCount,
184193
}
185194
for _, metric := range metricsList {
186195
if err := prometheus.Register(metric); err != nil {
@@ -197,6 +206,7 @@ func (m *SchedulerMetrics) Reset() {
197206
m.application.Reset()
198207
m.applicationSubmission.Reset()
199208
m.containerAllocation.Reset()
209+
m.tryNodeCount.Reset()
200210
}
201211

202212
func SinceInSeconds(start time.Time) float64 {
@@ -274,6 +284,19 @@ func (m *SchedulerMetrics) GetSchedulingErrors() (int, error) {
274284
return -1, err
275285
}
276286

287+
func (m *SchedulerMetrics) IncTryNodeCount() {
288+
m.tryNodeCount.With(nil).Inc()
289+
}
290+
291+
func (m *SchedulerMetrics) GetTryNodeCount() (int64, error) {
292+
metricDto := &dto.Metric{}
293+
err := m.tryNodeCount.With(nil).Write(metricDto)
294+
if err == nil {
295+
return int64(*metricDto.Counter.Value), nil
296+
}
297+
return -1, err
298+
}
299+
277300
func (m *SchedulerMetrics) IncTotalApplicationsNew() {
278301
m.applicationSubmission.WithLabelValues(AppNew).Inc()
279302
}

pkg/scheduler/objects/application.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1482,6 +1482,7 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat
14821482
return true
14831483
}
14841484
tryNodeStart := time.Now()
1485+
metrics.GetSchedulerMetrics().IncTryNodeCount()
14851486
result, err := sa.tryNode(node, ask)
14861487
if err != nil {
14871488
if predicateErrors == nil {

pkg/scheduler/objects/queue.go

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ type Queue struct {
8989
template *template.Template
9090
queueEvents *schedEvt.QueueEvents
9191
appQueueMapping *AppQueueMapping // appID mapping to queues
92+
applicationsTried int64 // number of applications tried per scheduling cycle
93+
nodesTried int64 // number of nodes tried per scheduling cycle
9294

9395
locking.RWMutex
9496
}
@@ -1445,13 +1447,32 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N
14451447
if app.IsAccepted() && (!runnableInQueue || !runnableByUserLimit) {
14461448
continue
14471449
}
1450+
1451+
// Increment counter in root queue before calling app.tryAllocate
1452+
sq.incrementApplicationsTried()
1453+
countBefore, err := metrics.GetSchedulerMetrics().GetTryNodeCount()
1454+
if err != nil {
1455+
// if metric read fails, default to 0 to avoid disrupting scheduling
1456+
countBefore = 0
1457+
}
1458+
14481459
result := app.tryAllocate(headRoom, allowPreemption, preemptionDelay, &preemptAttemptsRemaining, iterator, fullIterator, getnode)
1460+
1461+
countAfter, err := metrics.GetSchedulerMetrics().GetTryNodeCount()
1462+
if err != nil {
1463+
// if metric read fails, default to 0 to avoid disrupting scheduling
1464+
countAfter = 0
1465+
}
1466+
sq.addNodesTried(countAfter - countBefore)
1467+
14491468
if result != nil {
14501469
log.Log(log.SchedQueue).Info("allocation found on queue",
14511470
zap.String("queueName", sq.QueuePath),
14521471
zap.String("appID", app.ApplicationID),
14531472
zap.Stringer("resultType", result.ResultType),
1454-
zap.Stringer("allocation", result.Request))
1473+
zap.Stringer("allocation", result.Request),
1474+
zap.Int64("applicationsTried:", sq.GetApplicationsTried()),
1475+
zap.Int64("nodesTried", sq.GetNodesTried()))
14551476
// if the app is still in Accepted state we're allocating placeholders.
14561477
// we want to count these apps as running
14571478
if app.IsAccepted() {
@@ -2038,3 +2059,86 @@ func (sq *Queue) recalculatePriority() int32 {
20382059
sq.currentPriority = curr
20392060
return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
20402061
}
2062+
2063+
func (sq *Queue) addNodesTried(count int64) {
2064+
if sq == nil {
2065+
return
2066+
}
2067+
// Find the root queue and increment its counter
2068+
root := sq
2069+
for root.parent != nil {
2070+
root = root.parent
2071+
}
2072+
root.Lock()
2073+
defer root.Unlock()
2074+
root.nodesTried += count
2075+
}
2076+
2077+
func (sq *Queue) GetNodesTried() int64 {
2078+
if sq == nil {
2079+
return 0
2080+
}
2081+
root := sq
2082+
for root.parent != nil {
2083+
root = root.parent
2084+
}
2085+
root.Lock()
2086+
defer root.Unlock()
2087+
return root.nodesTried
2088+
}
2089+
2090+
func (sq *Queue) ResetNodesTried() {
2091+
if sq == nil {
2092+
return
2093+
}
2094+
root := sq
2095+
for root.parent != nil {
2096+
root = root.parent
2097+
}
2098+
root.Lock()
2099+
defer root.Unlock()
2100+
root.nodesTried = 0
2101+
}
2102+
2103+
// Increment the root queue's counter for application allocation attempts
2104+
func (sq *Queue) incrementApplicationsTried() {
2105+
if sq == nil {
2106+
return
2107+
}
2108+
// Find the root queue and increment its counter
2109+
root := sq
2110+
for root.parent != nil {
2111+
root = root.parent
2112+
}
2113+
root.Lock()
2114+
defer root.Unlock()
2115+
root.applicationsTried++
2116+
}
2117+
2118+
// Retrieve the count of tryAllocate calls made during the current scheduling cycle
2119+
func (sq *Queue) GetApplicationsTried() int64 {
2120+
if sq == nil {
2121+
return 0
2122+
}
2123+
root := sq
2124+
for root.parent != nil {
2125+
root = root.parent
2126+
}
2127+
root.Lock()
2128+
defer root.Unlock()
2129+
return root.applicationsTried
2130+
}
2131+
2132+
// Clear the application allocation attempts counter for the new scheduling cycle
2133+
func (sq *Queue) ResetApplicationsTried() {
2134+
if sq == nil {
2135+
return
2136+
}
2137+
root := sq
2138+
for root.parent != nil {
2139+
root = root.parent
2140+
}
2141+
root.Lock()
2142+
defer root.Unlock()
2143+
root.applicationsTried = 0
2144+
}

pkg/scheduler/partition.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,11 @@ func (pc *PartitionContext) tryPlaceholderAllocate() *objects.AllocationResult {
853853
// nothing to do just return
854854
return nil
855855
}
856+
857+
// Reset the tryAllocate call counter at the beginning of each scheduling cycle
858+
pc.root.ResetApplicationsTried()
859+
pc.root.ResetNodesTried()
860+
856861
// try allocating from the root down
857862
result := pc.root.TryPlaceholderAllocate(pc.GetNodeIterator, pc.GetNode)
858863
if result != nil {

pkg/scheduler/partition_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1614,6 +1614,124 @@ func TestTryAllocate(t *testing.T) {
16141614
assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.Multiply(res, 3), expectedQueuesMaxLimits)
16151615
}
16161616

1617+
func TestApplicationsTriedCount(t *testing.T) {
1618+
setupUGM()
1619+
partition := createQueuesNodes(t)
1620+
assert.Assert(t, partition != nil, "partition create failed")
1621+
if result := partition.tryAllocate(); result != nil {
1622+
t.Fatalf("empty cluster allocate returned allocation: %s", result)
1623+
}
1624+
1625+
app := newApplication(appID1, "default", "root.leaf")
1626+
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "18"})
1627+
assert.NilError(t, err, "failed to create resource")
1628+
// add to the partition
1629+
err = partition.AddApplication(app)
1630+
assert.NilError(t, err, "failed to add app-1 to partition")
1631+
err = app.AddAllocationAsk(newAllocationAsk(allocKey, appID1, res))
1632+
assert.NilError(t, err, "failed to add ask alloc-1 to app-1")
1633+
1634+
app = newApplication(appID2, "default", "root.leaf")
1635+
// add to the partition
1636+
err = partition.AddApplication(app)
1637+
assert.NilError(t, err, "failed to add app-2 to partition")
1638+
res, err = resources.NewResourceFromConf(map[string]string{"vcore": "16"})
1639+
assert.NilError(t, err, "failed to create resource")
1640+
err = app.AddAllocationAsk(newAllocationAsk(allocKey2, appID2, res))
1641+
assert.NilError(t, err, "failed to add ask alloc-1 to app-2")
1642+
1643+
app = newApplication(appID3, "default", "root.leaf")
1644+
// add to the partition
1645+
err = partition.AddApplication(app)
1646+
assert.NilError(t, err, "failed to add app-3 to partition")
1647+
res, err = resources.NewResourceFromConf(map[string]string{"vcore": "4"})
1648+
assert.NilError(t, err, "failed to create resource")
1649+
err = app.AddAllocationAsk(newAllocationAsk(allocKey3, appID3, res))
1650+
assert.NilError(t, err, "failed to add ask alloc-1 to app-3")
1651+
1652+
expectedQueuesMaxLimits := make(map[string]map[string]interface{})
1653+
expectedQueuesMaxLimits["root"] = make(map[string]interface{})
1654+
expectedQueuesMaxLimits["root.leaf"] = make(map[string]interface{})
1655+
expectedQueuesMaxLimits["root"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10, "vcores": 10})
1656+
expectedQueuesMaxLimits["root.leaf"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5, "vcores": 5})
1657+
expectedQueuesMaxLimits["root"][maxapplications] = uint64(10)
1658+
expectedQueuesMaxLimits["root.leaf"][maxapplications] = uint64(1)
1659+
assertUserGroupResourceMaxLimits(t, getTestUserGroup(), nil, expectedQueuesMaxLimits)
1660+
1661+
// first allocation should be app-1 and alloc-2
1662+
result := partition.tryAllocate()
1663+
if result == nil || result.Request == nil {
1664+
t.Fatal("allocation did not return any allocation")
1665+
}
1666+
assert.Equal(t, result.ResultType, objects.Allocated, "result type is not the expected allocated")
1667+
assert.Equal(t, result.Request.GetApplicationID(), appID3, "expected application app-2 to be allocated")
1668+
assert.Equal(t, result.Request.GetAllocationKey(), allocKey3, "expected ask alloc-2 to be allocated")
1669+
assert.Equal(t, partition.root.GetApplicationsTried(), int64(3), "expected 3 applications to be tried")
1670+
}
1671+
1672+
func TestNodesTriedCount(t *testing.T) {
1673+
setupUGM()
1674+
partition, err := newConfiguredPartition()
1675+
assert.NilError(t, err, "test partition create failed with error")
1676+
var res1 *resources.Resource
1677+
res1, err = resources.NewResourceFromConf(map[string]string{"vcore": "2"})
1678+
assert.NilError(t, err, "failed to create basic resource")
1679+
err = partition.AddNode(newNodeMaxResource("node-1", res1))
1680+
assert.NilError(t, err, "test node1 add failed unexpected")
1681+
1682+
var res2 *resources.Resource
1683+
res2, err = resources.NewResourceFromConf(map[string]string{"vcore": "3"})
1684+
assert.NilError(t, err, "failed to create basic resource")
1685+
err = partition.AddNode(newNodeMaxResource("node-2", res2))
1686+
assert.NilError(t, err, "test node2 add failed unexpected")
1687+
1688+
var res3 *resources.Resource
1689+
res3, err = resources.NewResourceFromConf(map[string]string{"vcore": "5"})
1690+
assert.NilError(t, err, "failed to create basic resource")
1691+
err = partition.AddNode(newNodeMaxResource("node-3", res3))
1692+
assert.NilError(t, err, "test node3 add failed unexpected")
1693+
1694+
var res4 *resources.Resource
1695+
res4, err = resources.NewResourceFromConf(map[string]string{"vcore": "10"})
1696+
assert.NilError(t, err, "failed to create basic resource")
1697+
err = partition.AddNode(newNodeMaxResource("node-4", res4))
1698+
assert.NilError(t, err, "test node4 add failed unexpected")
1699+
1700+
assert.Assert(t, partition != nil, "partition create failed")
1701+
if result := partition.tryAllocate(); result != nil {
1702+
t.Fatalf("empty cluster allocate returned allocation: %s", result)
1703+
}
1704+
1705+
app := newApplication(appID1, "default", "root.leaf")
1706+
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "4"})
1707+
assert.NilError(t, err, "failed to create resource")
1708+
// add to the partition
1709+
err = partition.AddApplication(app)
1710+
assert.NilError(t, err, "failed to add app-1 to partition")
1711+
err = app.AddAllocationAsk(newAllocationAsk(allocKey, appID1, res))
1712+
assert.NilError(t, err, "failed to add ask alloc-1 to app-1")
1713+
1714+
expectedQueuesMaxLimits := make(map[string]map[string]interface{})
1715+
expectedQueuesMaxLimits["root"] = make(map[string]interface{})
1716+
expectedQueuesMaxLimits["root.leaf"] = make(map[string]interface{})
1717+
expectedQueuesMaxLimits["root"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10, "vcores": 10})
1718+
expectedQueuesMaxLimits["root.leaf"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5, "vcores": 5})
1719+
expectedQueuesMaxLimits["root"][maxapplications] = uint64(10)
1720+
expectedQueuesMaxLimits["root.leaf"][maxapplications] = uint64(1)
1721+
assertUserGroupResourceMaxLimits(t, getTestUserGroup(), nil, expectedQueuesMaxLimits)
1722+
1723+
// first allocation should be app-1 and alloc-2
1724+
result := partition.tryAllocate()
1725+
if result == nil || result.Request == nil {
1726+
t.Fatal("allocation did not return any allocation")
1727+
}
1728+
assert.Equal(t, result.ResultType, objects.Allocated, "result type is not the expected allocated")
1729+
assert.Equal(t, result.Request.GetApplicationID(), appID1, "expected application app-2 to be allocated")
1730+
assert.Equal(t, result.Request.GetAllocationKey(), allocKey, "expected ask alloc-2 to be allocated")
1731+
assert.Equal(t, partition.root.GetApplicationsTried(), int64(1), "expected 1 applications to be tried")
1732+
assert.Equal(t, partition.root.GetNodesTried(), int64(1), "expected 1 nodes to be tried")
1733+
}
1734+
16171735
// allocate ask request with required node
16181736
func TestRequiredNodeReservation(t *testing.T) {
16191737
setupUGM()

0 commit comments

Comments
 (0)