Skip to content

Commit fde27e5

Browse files
adrian-lin-1-0-0pbacsko
authored andcommitted
[YUNIKORN-2057] Optimize FindQueueByAppID (#1037)
Closes: #1037 Signed-off-by: Peter Bacsko <[email protected]>
1 parent 446c1b3 commit fde27e5

File tree

12 files changed

+420
-265
lines changed

12 files changed

+420
-265
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package objects
20+
21+
import "github.com/apache/yunikorn-core/pkg/locking"
22+
23+
// AppQueueMapping maintains a mapping between application IDs and their corresponding queues.
24+
type AppQueueMapping struct {
25+
byAppID map[string]*Queue
26+
locking.RWMutex
27+
}
28+
29+
func NewAppQueueMapping() *AppQueueMapping {
30+
return &AppQueueMapping{
31+
byAppID: make(map[string]*Queue),
32+
}
33+
}
34+
35+
func (aqm *AppQueueMapping) AddAppQueueMapping(appID string, queue *Queue) {
36+
aqm.Lock()
37+
defer aqm.Unlock()
38+
aqm.byAppID[appID] = queue
39+
}
40+
41+
func (aqm *AppQueueMapping) GetQueueByAppId(appID string) *Queue {
42+
aqm.RLock()
43+
defer aqm.RUnlock()
44+
return aqm.byAppID[appID]
45+
}
46+
47+
func (aqm *AppQueueMapping) RemoveAppQueueMapping(appID string) {
48+
aqm.Lock()
49+
defer aqm.Unlock()
50+
delete(aqm.byAppID, appID)
51+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package objects
20+
21+
import (
22+
"testing"
23+
24+
"gotest.tools/v3/assert"
25+
)
26+
27+
func TestNewAppQueueMapping(t *testing.T) {
28+
aqm := NewAppQueueMapping()
29+
assert.Assert(t, aqm != nil, "expected non-nil AppQueueMapping")
30+
assert.Equal(t, 0, len(aqm.byAppID), "expected empty byAppID map")
31+
}
32+
func TestAppQueueMappingOperations(t *testing.T) {
33+
aqm := NewAppQueueMapping()
34+
queue := &Queue{}
35+
appID := "app-1234"
36+
37+
// Test AddAppQueueMapping
38+
aqm.AddAppQueueMapping(appID, queue)
39+
assert.Equal(t, 1, len(aqm.byAppID), "expected 1 entry in byAppID map")
40+
41+
// Test FindQueueByAppID
42+
foundQueue := aqm.GetQueueByAppId(appID)
43+
assert.Equal(t, foundQueue, queue, "expected to find the correct queue for appID %s", appID)
44+
45+
// Test RemoveAppQueueMapping
46+
aqm.RemoveAppQueueMapping(appID)
47+
assert.Equal(t, 0, len(aqm.byAppID), "expected 0 entries in byAppID map after removal")
48+
}

pkg/scheduler/objects/application.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1395,7 +1395,7 @@ func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allo
13951395
zap.String("allocation name", ask.GetAllocationName()),
13961396
zap.Int("no.of victims", len(victims)))
13971397
for _, victim := range victims {
1398-
if victimQueue := sa.queue.FindQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
1398+
if victimQueue := sa.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
13991399
victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
14001400
}
14011401
victim.MarkPreempted()

pkg/scheduler/objects/application_test.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,7 +1040,7 @@ func TestUpdateAllocationResourcePending(t *testing.T) {
10401040
app := newApplication(appID1, "default", "root.a")
10411041
root, err := createRootQueue(nil)
10421042
assert.NilError(t, err, "failed to create root queue")
1043-
queue, err := createDynamicQueue(root, "test", false)
1043+
queue, err := createDynamicQueue(root, "test", false, nil)
10441044
assert.NilError(t, err, "failed to create test queue")
10451045
app.SetQueue(queue)
10461046

@@ -1088,7 +1088,7 @@ func TestUpdateAllocationResourceAllocated(t *testing.T) {
10881088
app := newApplication(appID1, "default", "root.a")
10891089
root, err := createRootQueue(nil)
10901090
assert.NilError(t, err, "failed to create root queue")
1091-
queue, err := createDynamicQueue(root, "test", false)
1091+
queue, err := createDynamicQueue(root, "test", false, nil)
10921092
assert.NilError(t, err, "failed to create test queue")
10931093
app.SetQueue(queue)
10941094

@@ -1138,7 +1138,7 @@ func TestQueueUpdate(t *testing.T) {
11381138

11391139
root, err := createRootQueue(nil)
11401140
assert.NilError(t, err, "failed to create root queue")
1141-
queue, err := createDynamicQueue(root, "test", false)
1141+
queue, err := createDynamicQueue(root, "test", false, nil)
11421142
assert.NilError(t, err, "failed to create test queue")
11431143
app.SetQueue(queue)
11441144
assert.Equal(t, app.GetQueuePath(), "root.test")
@@ -2195,19 +2195,21 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
21952195
getNode := func(nodeID string) *Node {
21962196
return nodeMap[nodeID]
21972197
}
2198+
appQueueMapping := NewAppQueueMapping()
21982199

21992200
rootQ, err := createRootQueue(map[string]string{"first": "20"})
22002201
assert.NilError(t, err)
2201-
parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"})
2202+
parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"}, appQueueMapping)
22022203
assert.NilError(t, err)
2203-
childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"})
2204+
childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"}, appQueueMapping)
22042205
assert.NilError(t, err)
2205-
childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"})
2206+
childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"}, appQueueMapping)
22062207
assert.NilError(t, err)
22072208

22082209
app1 := newApplication(appID1, "default", "root.parent.child1")
22092210
app1.SetQueue(childQ1)
2210-
childQ1.applications[appID1] = app1
2211+
childQ1.AddApplication(app1)
2212+
appQueueMapping.AddAppQueueMapping(appID1, childQ1)
22112213
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
22122214
err = app1.AddAllocationAsk(ask1)
22132215
assert.NilError(t, err)
@@ -2217,7 +2219,8 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
22172219

22182220
app2 := newApplication(appID2, "default", "root.parent.child2")
22192221
app2.SetQueue(childQ2)
2220-
childQ2.applications[appID2] = app2
2222+
childQ2.AddApplication(app2)
2223+
appQueueMapping.AddAppQueueMapping(appID2, childQ2)
22212224
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
22222225
ask3.allowPreemptOther = true
22232226
err = app2.AddAllocationAsk(ask3)
@@ -2276,20 +2279,23 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID
22762279
return nodeMap[nodeID]
22772280
}
22782281

2282+
appQueueMapping := NewAppQueueMapping()
2283+
22792284
rootQ, err := createRootQueue(map[string]string{"first": "40"})
22802285
assert.NilError(t, err)
2281-
parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"})
2286+
parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"}, appQueueMapping)
22822287
assert.NilError(t, err)
2283-
unlimitedQ, err := createManagedQueueGuaranteed(rootQ, "unlimited", false, nil, nil)
2288+
unlimitedQ, err := createManagedQueueGuaranteed(rootQ, "unlimited", false, nil, nil, appQueueMapping)
22842289
assert.NilError(t, err)
2285-
childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"})
2290+
childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"}, appQueueMapping)
22862291
assert.NilError(t, err)
2287-
childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"})
2292+
childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"}, appQueueMapping)
22882293
assert.NilError(t, err)
22892294

22902295
app0 := newApplication(appID0, "default", "root.unlimited")
22912296
app0.SetQueue(unlimitedQ)
2292-
unlimitedQ.applications[appID0] = app0
2297+
unlimitedQ.AddApplication(app0)
2298+
appQueueMapping.AddAppQueueMapping(appID0, unlimitedQ)
22932299
ask00 := newAllocationAsk("alloc0-0", appID0, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))
22942300
err = app0.AddAllocationAsk(ask00)
22952301
assert.NilError(t, err)
@@ -2299,7 +2305,8 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID
22992305

23002306
app1 := newApplication(appID1, "default", "root.parent.child1")
23012307
app1.SetQueue(childQ1)
2302-
childQ1.applications[appID1] = app1
2308+
childQ1.AddApplication(app1)
2309+
appQueueMapping.AddAppQueueMapping(appID1, childQ1)
23032310
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
23042311
err = app1.AddAllocationAsk(ask1)
23052312
assert.NilError(t, err)
@@ -2309,7 +2316,8 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID
23092316

23102317
app2 := newApplication(appID2, "default", "root.parent.child2")
23112318
app2.SetQueue(childQ2)
2312-
childQ2.applications[appID2] = app2
2319+
childQ2.AddApplication(app2)
2320+
appQueueMapping.AddAppQueueMapping(appID2, childQ2)
23132321
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
23142322
ask3.allowPreemptOther = true
23152323
err = app2.AddAllocationAsk(ask3)
@@ -2358,7 +2366,7 @@ func createPreemptNodeWithReservationsTestSetup(t *testing.T) (func() NodeIterat
23582366

23592367
app3 := newApplication(appID3, "default", "root.parent.child2")
23602368
app3.SetQueue(childQ2)
2361-
childQ2.applications[appID3] = app3
2369+
childQ2.AddApplication(app3)
23622370
ask4 := newAllocationAsk("alloc4", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
23632371
ask4.allowPreemptOther = true
23642372
ask4.priority = math.MaxInt32
@@ -3257,13 +3265,15 @@ func TestRequiredNodePreemption(t *testing.T) {
32573265
getNode := func(nodeID string) *Node {
32583266
return node
32593267
}
3268+
appQueueMapping := NewAppQueueMapping()
32603269

32613270
// set queue
32623271
rootQ, err := createRootQueue(map[string]string{"first": "20"})
32633272
assert.NilError(t, err)
3264-
childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
3273+
childQ, err := createManagedQueueWithAppQueueMapping(rootQ, "default", false, map[string]string{"first": "20"}, appQueueMapping)
32653274
assert.NilError(t, err)
32663275
app.SetQueue(childQ)
3276+
appQueueMapping.AddAppQueueMapping(app.ApplicationID, childQ)
32673277

32683278
// add an ask
32693279
mockEvents := mock.NewEventSystem()

pkg/scheduler/objects/preemption.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
649649

650650
// preempt the victims
651651
for _, victim := range finalVictims {
652-
if victimQueue := p.queue.FindQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
652+
if victimQueue := p.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
653653
victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
654654
victim.MarkPreempted()
655655
log.Log(log.SchedPreemption).Info("Preempting task",

0 commit comments

Comments
 (0)