Skip to content

Commit 0d896fd

Browse files
manimani
authored andcommitted
[YUNIKORN-3141] Evaluate Preemption possibilities through preconditions (#1038)
Closes: #1038 Signed-off-by: mani <[email protected]>
1 parent 6bd254c commit 0d896fd

File tree

4 files changed

+201
-24
lines changed

4 files changed

+201
-24
lines changed

pkg/scheduler/objects/application_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2238,29 +2238,29 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
22382238
assert.Assert(t, alloc2 != nil, "alloc2 expected")
22392239

22402240
// preemption max attempts exhausted
2241-
preemptionAttemptsRemaining = 0
2242-
result3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
2241+
maxAttemptsExhausted := 0
2242+
result3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &maxAttemptsExhausted, iterator, iterator, getNode)
22432243
assert.Assert(t, result3 == nil, "result3 not expected")
22442244
assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been preempted")
22452245
log := ask3.GetAllocationLog()
22462246
assert.Equal(t, log[0].Message, common.PreemptionMaxAttemptsExhausted)
2247-
assert.Equal(t, preemptionAttemptsRemaining, 0)
2247+
assert.Equal(t, maxAttemptsExhausted, 0)
22482248

2249-
preemptionAttemptsRemaining = 10
2249+
maxAttemptsDecrease := 10
22502250

22512251
// on first attempt, not enough time has passed
2252-
result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
2252+
result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &maxAttemptsDecrease, iterator, iterator, getNode)
22532253
assert.Assert(t, result3 == nil, "result3 not expected")
22542254
assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been preempted")
22552255
assertAllocationLog(t, ask3)
2256-
assert.Equal(t, preemptionAttemptsRemaining, 10)
2256+
assert.Equal(t, maxAttemptsDecrease, 10)
22572257

22582258
// pass the time and try again
22592259
ask3.createTime = ask3.createTime.Add(-30 * time.Second)
2260-
result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
2260+
result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &maxAttemptsDecrease, iterator, iterator, getNode)
22612261
assert.Assert(t, result3 != nil && result3.Request != nil && result3.ResultType == Reserved, "alloc3 should be a reservation")
22622262
assert.Assert(t, alloc2.IsPreempted(), "alloc2 should have been preempted")
2263-
assert.Equal(t, preemptionAttemptsRemaining, 9)
2263+
assert.Equal(t, maxAttemptsDecrease, 9)
22642264
}
22652265

22662266
func TestTryAllocatePreemptNode(t *testing.T) {

pkg/scheduler/objects/queue.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,24 @@ type Queue struct {
7474
// The queue properties should be treated as immutable the value is a merge of the
7575
// parent properties with the config for this queue only manipulated during creation
7676
// of the queue or via a queue configuration update.
77-
properties map[string]string
78-
adminACL security.ACL // admin ACL
79-
submitACL security.ACL // submit ACL
80-
maxResource *resources.Resource // When not set, max = nil
81-
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
82-
isLeaf bool // this is a leaf queue or not (i.e. parent)
83-
isManaged bool // queue is part of the config, not auto created
84-
stateMachine *fsm.FSM // the state of the queue for scheduling
85-
stateTime time.Time // last time the state was updated (needed for cleanup)
86-
maxRunningApps uint64
87-
runningApps uint64
88-
allocatingAcceptedApps map[string]bool
89-
template *template.Template
90-
queueEvents *schedEvt.QueueEvents
91-
appQueueMapping *AppQueueMapping // appID mapping to queues
92-
quotaChangePreemptionDelay uint64
77+
properties map[string]string
78+
adminACL security.ACL // admin ACL
79+
submitACL security.ACL // submit ACL
80+
maxResource *resources.Resource // When not set, max = nil
81+
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
82+
isLeaf bool // this is a leaf queue or not (i.e. parent)
83+
isManaged bool // queue is part of the config, not auto created
84+
stateMachine *fsm.FSM // the state of the queue for scheduling
85+
stateTime time.Time // last time the state was updated (needed for cleanup)
86+
maxRunningApps uint64
87+
runningApps uint64
88+
allocatingAcceptedApps map[string]bool
89+
template *template.Template
90+
queueEvents *schedEvt.QueueEvents
91+
appQueueMapping *AppQueueMapping // appID mapping to queues
92+
quotaChangePreemptionDelay uint64
93+
hasTriggerredQuotaChangePreemption bool
94+
isQuotaChangePreemptionRunning bool
9395

9496
locking.RWMutex
9597
}
@@ -2076,3 +2078,27 @@ func (sq *Queue) recalculatePriority() int32 {
20762078
sq.currentPriority = curr
20772079
return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
20782080
}
2081+
2082+
func (sq *Queue) MarkTriggerredQuotaChangePreemption() {
2083+
sq.Lock()
2084+
defer sq.Unlock()
2085+
sq.hasTriggerredQuotaChangePreemption = true
2086+
}
2087+
2088+
func (sq *Queue) HasTriggerredQuotaChangePreemption() bool {
2089+
sq.RLock()
2090+
defer sq.RUnlock()
2091+
return sq.hasTriggerredQuotaChangePreemption
2092+
}
2093+
2094+
func (sq *Queue) MarkQuotaChangePreemptionRunning() {
2095+
sq.Lock()
2096+
defer sq.Unlock()
2097+
sq.isQuotaChangePreemptionRunning = true
2098+
}
2099+
2100+
func (sq *Queue) IsQuotaChangePreemptionRunning() bool {
2101+
sq.RLock()
2102+
defer sq.RUnlock()
2103+
return sq.isQuotaChangePreemptionRunning
2104+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package objects
20+
21+
type QuotaChangePreemptionContext struct {
22+
queue *Queue
23+
}
24+
25+
func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
26+
preemptor := &QuotaChangePreemptionContext{
27+
queue: queue,
28+
}
29+
return preemptor
30+
}
31+
32+
func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
33+
if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() || qcp.queue.HasTriggerredQuotaChangePreemption() || qcp.queue.IsQuotaChangePreemptionRunning() {
34+
return false
35+
}
36+
if qcp.queue.GetMaxResource().StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource()) {
37+
return false
38+
}
39+
return true
40+
}
41+
42+
func (qcp *QuotaChangePreemptionContext) tryPreemption() {
43+
// quota change preemption has started, so mark the flag
44+
qcp.queue.MarkQuotaChangePreemptionRunning()
45+
46+
// Preemption logic goes here
47+
48+
// quota change preemption has really evicted victims, so mark the flag
49+
qcp.queue.MarkTriggerredQuotaChangePreemption()
50+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package objects
20+
21+
import (
22+
"testing"
23+
24+
"gotest.tools/v3/assert"
25+
26+
"github.com/apache/yunikorn-core/pkg/common/configs"
27+
"github.com/apache/yunikorn-core/pkg/common/resources"
28+
)
29+
30+
func TestQuotaChangeCheckPreconditions(t *testing.T) {
31+
parentConfig := configs.QueueConfig{
32+
Name: "parent",
33+
Parent: true,
34+
Resources: configs.Resources{
35+
Max: map[string]string{"memory": "1000"},
36+
},
37+
}
38+
parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
39+
assert.NilError(t, err)
40+
41+
leafRes := configs.Resources{
42+
Max: map[string]string{"memory": "1000"},
43+
}
44+
leaf, err := NewConfiguredQueue(configs.QueueConfig{
45+
Name: "leaf",
46+
Resources: leafRes,
47+
}, parent, false, nil)
48+
assert.NilError(t, err)
49+
50+
dynamicLeaf, err := NewConfiguredQueue(configs.QueueConfig{
51+
Name: "dynamic-leaf",
52+
Resources: leafRes,
53+
}, parent, false, nil)
54+
assert.NilError(t, err)
55+
dynamicLeaf.isManaged = false
56+
57+
alreadyPreemptionRunning, err := NewConfiguredQueue(configs.QueueConfig{
58+
Name: "leaf-already-preemption-running",
59+
Resources: leafRes,
60+
}, parent, false, nil)
61+
assert.NilError(t, err)
62+
alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning()
63+
64+
alreadyTriggeredPreemption, err := NewConfiguredQueue(configs.QueueConfig{
65+
Name: "leaf-already-triggerred-running",
66+
Resources: leafRes,
67+
}, parent, false, nil)
68+
assert.NilError(t, err)
69+
alreadyTriggeredPreemption.MarkTriggerredQuotaChangePreemption()
70+
71+
usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
72+
Name: "leaf-usage-exceeded-max",
73+
Resources: leafRes,
74+
}, parent, false, nil)
75+
assert.NilError(t, err)
76+
usageExceededMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000})
77+
78+
testCases := []struct {
79+
name string
80+
queue *Queue
81+
preconditionResult bool
82+
}{
83+
{"parent queue", parent, false},
84+
{"leaf queue", leaf, false},
85+
{"dynamic leaf queue", dynamicLeaf, false},
86+
{"leaf queue, already preemption process started or running", alreadyPreemptionRunning, false},
87+
{"leaf queue, already triggerred preemption", alreadyTriggeredPreemption, false},
88+
{"leaf queue, usage exceeded max resources", usageExceededMaxQueue, true},
89+
}
90+
for _, tc := range testCases {
91+
t.Run(tc.name, func(t *testing.T) {
92+
preemptor := NewQuotaChangePreemptor(tc.queue)
93+
assert.Equal(t, preemptor.CheckPreconditions(), tc.preconditionResult)
94+
if tc.preconditionResult {
95+
preemptor.tryPreemption()
96+
assert.Equal(t, tc.queue.HasTriggerredQuotaChangePreemption(), true)
97+
assert.Equal(t, tc.queue.IsQuotaChangePreemptionRunning(), true)
98+
}
99+
})
100+
}
101+
}

0 commit comments

Comments
 (0)