Skip to content

Commit 4e2c901

Browse files
committed
[YUNIKORN-3121] Add REST Endpoint for Scheduling Order Visibility
1 parent ceb8ca5 commit 4e2c901

File tree

6 files changed

+211
-0
lines changed

6 files changed

+211
-0
lines changed

pkg/scheduler/objects/queue.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,49 @@ func (sq *Queue) sortQueues() []*Queue {
12331233
return sortedQueues
12341234
}
12351235

1236+
// GetSchedulingOrder returns a sorted shallow copy of the queues for this parent queue along with
1237+
// their eligible applications that would be tried in the current scheduling cycle.
1238+
// This follows the same logic as TryAllocate method.
1239+
// Only queues with a pending resource request are considered. The queues are sorted using the
1240+
// sorting type for the parent queue.
1241+
// Lock free call all locks are taken when needed in called functions
1242+
func (sq *Queue) GetSchedulingOrder() []*dao.SchedulingOrderDAO {
1243+
if sq.IsLeafQueue() {
1244+
// For leaf queues, return the queue with its eligible applications
1245+
appIDs := make([]string, 0)
1246+
1247+
// Process the apps (filters out app without pending requests) - same logic as TryAllocate
1248+
for _, app := range sq.sortApplications(false) {
1249+
runnableInQueue := sq.canRunApp(app.ApplicationID)
1250+
runnableByUserLimit := ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)
1251+
app.updateRunnableStatus(runnableInQueue, runnableByUserLimit)
1252+
if app.IsAccepted() && (!runnableInQueue || !runnableByUserLimit) {
1253+
continue
1254+
}
1255+
appIDs = append(appIDs, app.ApplicationID)
1256+
}
1257+
1258+
// Only return this queue if it has eligible applications or pending resources
1259+
if len(appIDs) > 0 || resources.StrictlyGreaterThanZero(sq.GetPendingResource()) {
1260+
return []*dao.SchedulingOrderDAO{{
1261+
QueueName: sq.QueuePath,
1262+
ApplicationIDs: appIDs,
1263+
}}
1264+
}
1265+
return nil
1266+
} else {
1267+
// For parent queues, process child queues - same logic as TryAllocate
1268+
result := make([]*dao.SchedulingOrderDAO, 0)
1269+
1270+
// Process each sorted child queue using the original sortQueues method
1271+
for _, child := range sq.sortQueues() {
1272+
childInfo := child.GetSchedulingOrder()
1273+
result = append(result, childInfo...)
1274+
}
1275+
return result
1276+
}
1277+
}
1278+
12361279
// getHeadRoom returns the headroom for the queue. This can never be more than the headroom for the parent.
12371280
// In case there are no nodes in a newly started cluster and no queues have a limit configured this call
12381281
// will return nil.

pkg/scheduler/partition.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,11 @@ func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo {
507507
return partitionQueueDAOInfo
508508
}
509509

510+
// GetPartitionSchedulingOrder builds the sorted queue info for the whole queue structure to pass to the webservice
511+
func (pc *PartitionContext) GetPartitionSchedulingOrder() []*dao.SchedulingOrderDAO {
512+
return pc.root.GetSchedulingOrder()
513+
}
514+
510515
// GetPlacementRules returns the current active rule set as dao to expose to the webservice
511516
func (pc *PartitionContext) GetPlacementRules() []*dao.RuleDAO {
512517
return pc.getPlacementManager().GetRulesDAO()
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dao
18+
19+
// SchedulingOrderDAO describes one queue in the scheduling order together with the applications
// of that queue.
type SchedulingOrderDAO struct {
	// QueueName identifies the queue this entry belongs to.
	QueueName string `json:"queueName"`
	// ApplicationIDs holds the application IDs that would be tried in the current scheduling cycle.
	ApplicationIDs []string `json:"applicationIDs"`
}

pkg/webservice/handlers.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,27 @@ func getPartitionQueues(w http.ResponseWriter, r *http.Request) {
714714
}
715715
}
716716

717+
func getPartitionSchedulingOrder(w http.ResponseWriter, r *http.Request) {
718+
writeHeaders(w, r.Method)
719+
vars := httprouter.ParamsFromContext(r.Context())
720+
if vars == nil {
721+
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
722+
return
723+
}
724+
partitionName := vars.ByName("partition")
725+
var partitionSchedulingOrderDAOInfo []*dao.SchedulingOrderDAO
726+
var partition = schedulerContext.Load().GetPartitionWithoutClusterID(partitionName)
727+
if partition != nil {
728+
partitionSchedulingOrderDAOInfo = partition.GetPartitionSchedulingOrder()
729+
} else {
730+
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
731+
return
732+
}
733+
if err := json.NewEncoder(w).Encode(partitionSchedulingOrderDAOInfo); err != nil {
734+
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
735+
}
736+
}
737+
717738
func getPartitionQueue(w http.ResponseWriter, r *http.Request) {
718739
writeHeaders(w, r.Method)
719740
vars := httprouter.ParamsFromContext(r.Context())

pkg/webservice/handlers_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3008,6 +3008,119 @@ func TestSetMaxRESTResponseSize(t *testing.T) {
30083008
assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
30093009
}
30103010

3011+
func TestGetPartitionSchedulingOrderHandler(t *testing.T) {
3012+
// Setup partition with default configuration
3013+
part := setup(t, configDefault, 1)
3014+
3015+
// Create applications with pending requests to trigger scheduling order
3016+
app1 := addApp(t, "app-1", part, "root.default", false)
3017+
app2 := addApp(t, "app-2", part, "root.default", false)
3018+
app3 := addApp(t, "app-3", part, "root.default", false)
3019+
3020+
// Add pending allocation requests to applications
3021+
res1 := &si.Resource{
3022+
Resources: map[string]*si.Quantity{"memory": {Value: 100}, "vcore": {Value: 1}},
3023+
}
3024+
ask1 := objects.NewAllocationFromSI(&si.Allocation{
3025+
ApplicationID: "app-1",
3026+
PartitionName: part.Name,
3027+
ResourcePerAlloc: res1})
3028+
err := app1.AddAllocationAsk(ask1)
3029+
assert.NilError(t, err, "ask should have been added to app-1")
3030+
3031+
res2 := &si.Resource{
3032+
Resources: map[string]*si.Quantity{"memory": {Value: 200}, "vcore": {Value: 2}},
3033+
}
3034+
ask2 := objects.NewAllocationFromSI(&si.Allocation{
3035+
ApplicationID: "app-2",
3036+
PartitionName: part.Name,
3037+
ResourcePerAlloc: res2})
3038+
err = app2.AddAllocationAsk(ask2)
3039+
assert.NilError(t, err, "ask should have been added to app-2")
3040+
3041+
res3 := &si.Resource{
3042+
Resources: map[string]*si.Quantity{"memory": {Value: 150}, "vcore": {Value: 1}},
3043+
}
3044+
ask3 := objects.NewAllocationFromSI(&si.Allocation{
3045+
ApplicationID: "app-3",
3046+
PartitionName: part.Name,
3047+
ResourcePerAlloc: res3})
3048+
err = app3.AddAllocationAsk(ask3)
3049+
assert.NilError(t, err, "ask should have been added to app-3")
3050+
3051+
NewWebApp(schedulerContext.Load(), nil)
3052+
3053+
// Test successful scheduling order retrieval
3054+
var req *http.Request
3055+
req, err = createRequest(t, "/ws/v1/partition/default/schedulingorder", map[string]string{"partition": partitionNameWithoutClusterID})
3056+
assert.NilError(t, err, "Get Scheduling Order Handler request failed")
3057+
resp := &MockResponseWriter{}
3058+
var schedulingOrderDao []*dao.SchedulingOrderDAO
3059+
getPartitionSchedulingOrder(resp, req)
3060+
err = json.Unmarshal(resp.outputBytes, &schedulingOrderDao)
3061+
assert.NilError(t, err, unmarshalError)
3062+
3063+
// Validate response structure and content
3064+
assert.Assert(t, len(schedulingOrderDao) > 0, "scheduling order should contain at least one queue")
3065+
3066+
// Verify that queues with pending resources are included
3067+
queueFound := false
3068+
for _, queueOrder := range schedulingOrderDao {
3069+
if queueOrder.QueueName == "root.default" {
3070+
queueFound = true
3071+
// Verify that applications with pending requests are included
3072+
assert.Assert(t, len(queueOrder.ApplicationIDs) >= 3, "queue root.default should have at least 3 applications")
3073+
// Check that our test applications are in the list
3074+
appFound1, appFound2, appFound3 := false, false, false
3075+
for _, appID := range queueOrder.ApplicationIDs {
3076+
if appID == "app-1" {
3077+
appFound1 = true
3078+
}
3079+
if appID == "app-2" {
3080+
appFound2 = true
3081+
}
3082+
if appID == "app-3" {
3083+
appFound3 = true
3084+
}
3085+
}
3086+
assert.Assert(t, appFound1, "app-1 should be in scheduling order")
3087+
assert.Assert(t, appFound2, "app-2 should be in scheduling order")
3088+
assert.Assert(t, appFound3, "app-3 should be in scheduling order")
3089+
}
3090+
}
3091+
assert.Assert(t, queueFound, "queue root.default should be found in scheduling order")
3092+
3093+
// Test nonexistent partition
3094+
req, err = createRequest(t, "/ws/v1/partition/default/schedulingorder", map[string]string{"partition": "notexists"})
3095+
assert.NilError(t, err, "Get Scheduling Order Handler request failed")
3096+
resp = &MockResponseWriter{}
3097+
getPartitionSchedulingOrder(resp, req)
3098+
assertPartitionNotExists(t, resp)
3099+
3100+
// Test missing params name
3101+
req, err = http.NewRequest("GET", "/ws/v1/partition/default/schedulingorder", strings.NewReader(""))
3102+
assert.NilError(t, err, "Get Scheduling Order Handler request failed")
3103+
resp = &MockResponseWriter{}
3104+
getPartitionSchedulingOrder(resp, req)
3105+
assertParamsMissing(t, resp)
3106+
3107+
// Test empty scheduling order (partition with no pending requests)
3108+
// Create a new partition with no applications
3109+
emptyPart := setup(t, configDefault, 1)
3110+
// Add an application but without pending requests
3111+
addApp(t, "app-empty", emptyPart, "root.default", false)
3112+
3113+
req, err = createRequest(t, "/ws/v1/partition/default/schedulingorder", map[string]string{"partition": partitionNameWithoutClusterID})
3114+
assert.NilError(t, err, "Get Scheduling Order Handler request failed")
3115+
resp = &MockResponseWriter{}
3116+
var emptySchedulingOrderDao []*dao.SchedulingOrderDAO
3117+
getPartitionSchedulingOrder(resp, req)
3118+
err = json.Unmarshal(resp.outputBytes, &emptySchedulingOrderDao)
3119+
assert.NilError(t, err, unmarshalError)
3120+
// Should return empty array when no queues have pending resources
3121+
assert.Equal(t, len(emptySchedulingOrderDao), 0, "scheduling order should be empty when no pending requests exist")
3122+
}
3123+
30113124
type ResponseRecorderWithDeadline struct {
30123125
*httptest.ResponseRecorder
30133126
setWriteFails bool

pkg/webservice/routes.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ var webRoutes = routes{
9090
"/ws/v1/partition/:partition/queues",
9191
getPartitionQueues,
9292
},
93+
route{
94+
"Scheduler",
95+
"GET",
96+
"/ws/v1/partition/:partition/schedulingorder",
97+
getPartitionSchedulingOrder,
98+
},
9399
route{
94100
"Scheduler",
95101
"GET",

0 commit comments

Comments
 (0)