Skip to content

Commit 43636af

Browse files
manimani
authored andcommitted
[YUNIKORN-3141] Evaluate Preemption possibilities through preconditions
1 parent 6bd254c commit 43636af

File tree

3 files changed

+193
-16
lines changed

3 files changed

+193
-16
lines changed

pkg/scheduler/objects/queue.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,24 @@ type Queue struct {
7474
// The queue properties should be treated as immutable the value is a merge of the
7575
// parent properties with the config for this queue only manipulated during creation
7676
// of the queue or via a queue configuration update.
77-
properties map[string]string
78-
adminACL security.ACL // admin ACL
79-
submitACL security.ACL // submit ACL
80-
maxResource *resources.Resource // When not set, max = nil
81-
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
82-
isLeaf bool // this is a leaf queue or not (i.e. parent)
83-
isManaged bool // queue is part of the config, not auto created
84-
stateMachine *fsm.FSM // the state of the queue for scheduling
85-
stateTime time.Time // last time the state was updated (needed for cleanup)
86-
maxRunningApps uint64
87-
runningApps uint64
88-
allocatingAcceptedApps map[string]bool
89-
template *template.Template
90-
queueEvents *schedEvt.QueueEvents
91-
appQueueMapping *AppQueueMapping // appID mapping to queues
92-
quotaChangePreemptionDelay uint64
77+
properties map[string]string
78+
adminACL security.ACL // admin ACL
79+
submitACL security.ACL // submit ACL
80+
maxResource *resources.Resource // When not set, max = nil
81+
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
82+
isLeaf bool // this is a leaf queue or not (i.e. parent)
83+
isManaged bool // queue is part of the config, not auto created
84+
stateMachine *fsm.FSM // the state of the queue for scheduling
85+
stateTime time.Time // last time the state was updated (needed for cleanup)
86+
maxRunningApps uint64
87+
runningApps uint64
88+
allocatingAcceptedApps map[string]bool
89+
template *template.Template
90+
queueEvents *schedEvt.QueueEvents
91+
appQueueMapping *AppQueueMapping // appID mapping to queues
92+
quotaChangePreemptionDelay uint64
93+
hasTriggerredQuotaChangePreemption bool
94+
isQuotaChangePreemptionRunning bool
9395

9496
locking.RWMutex
9597
}
@@ -2076,3 +2078,27 @@ func (sq *Queue) recalculatePriority() int32 {
20762078
sq.currentPriority = curr
20772079
return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
20782080
}
2081+
2082+
func (sq *Queue) MarkTriggerredQuotaChangePreemption() {
2083+
sq.Lock()
2084+
defer sq.Unlock()
2085+
sq.hasTriggerredQuotaChangePreemption = true
2086+
}
2087+
2088+
func (sq *Queue) HasTriggerredQuotaChangePreemption() bool {
2089+
sq.RLock()
2090+
defer sq.RUnlock()
2091+
return sq.hasTriggerredQuotaChangePreemption
2092+
}
2093+
2094+
func (sq *Queue) MarkQuotaChangePreemptionRunning() {
2095+
sq.Lock()
2096+
defer sq.Unlock()
2097+
sq.isQuotaChangePreemptionRunning = true
2098+
}
2099+
2100+
func (sq *Queue) IsQuotaChangePreemptionRunning() bool {
2101+
sq.RLock()
2102+
defer sq.RUnlock()
2103+
return sq.isQuotaChangePreemptionRunning
2104+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package objects
20+
21+
type QuotaChangePreemptionContext struct {
22+
queue *Queue
23+
}
24+
25+
func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
26+
preemptor := &QuotaChangePreemptionContext{
27+
queue: queue,
28+
}
29+
return preemptor
30+
}
31+
32+
func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
33+
if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() || qcp.queue.HasTriggerredQuotaChangePreemption() || qcp.queue.IsQuotaChangePreemptionRunning() {
34+
return false
35+
}
36+
if qcp.queue.GetMaxResource().StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource()) {
37+
return false
38+
}
39+
return true
40+
}
41+
42+
func (qcp *QuotaChangePreemptionContext) tryPreemption() {
43+
// quota change preemption has started, so mark the flag
44+
qcp.queue.MarkQuotaChangePreemptionRunning()
45+
46+
// Preemption logic goes here
47+
48+
// quota change preemption has really evicted victims, so mark the flag
49+
qcp.queue.MarkTriggerredQuotaChangePreemption()
50+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package objects
20+
21+
import (
22+
"testing"
23+
24+
"gotest.tools/v3/assert"
25+
26+
"github.com/apache/yunikorn-core/pkg/common/configs"
27+
"github.com/apache/yunikorn-core/pkg/common/resources"
28+
)
29+
30+
func TestQuotaChangeCheckPreconditions(t *testing.T) {
31+
parentConfig := configs.QueueConfig{
32+
Name: "parent",
33+
Parent: true,
34+
Resources: configs.Resources{
35+
Max: map[string]string{"memory": "1000"},
36+
},
37+
}
38+
parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
39+
assert.NilError(t, err)
40+
41+
leafRes := configs.Resources{
42+
Max: map[string]string{"memory": "1000"},
43+
}
44+
leaf, err := NewConfiguredQueue(configs.QueueConfig{
45+
Name: "leaf",
46+
Resources: leafRes,
47+
}, parent, false, nil)
48+
assert.NilError(t, err)
49+
50+
dynamicLeaf, err := NewConfiguredQueue(configs.QueueConfig{
51+
Name: "dynamic-leaf",
52+
Resources: leafRes,
53+
}, parent, false, nil)
54+
assert.NilError(t, err)
55+
dynamicLeaf.isManaged = false
56+
57+
alreadyPreemptionRunning, err := NewConfiguredQueue(configs.QueueConfig{
58+
Name: "leaf-already-preemption-running",
59+
Resources: leafRes,
60+
}, parent, false, nil)
61+
assert.NilError(t, err)
62+
alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning()
63+
64+
alreadyTriggeredPreemption, err := NewConfiguredQueue(configs.QueueConfig{
65+
Name: "leaf-already-triggerred-running",
66+
Resources: leafRes,
67+
}, parent, false, nil)
68+
assert.NilError(t, err)
69+
alreadyTriggeredPreemption.MarkTriggerredQuotaChangePreemption()
70+
71+
usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
72+
Name: "leaf-usage-exceeded-max",
73+
Resources: leafRes,
74+
}, parent, false, nil)
75+
assert.NilError(t, err)
76+
usageExceededMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000})
77+
78+
testCases := []struct {
79+
name string
80+
queue *Queue
81+
preconditionResult bool
82+
}{
83+
{"parent queue", parent, false},
84+
{"leaf queue", leaf, false},
85+
{"dynamic leaf queue", dynamicLeaf, false},
86+
{"leaf queue, already preemption process started or running", alreadyPreemptionRunning, false},
87+
{"leaf queue, already triggerred preemption", alreadyTriggeredPreemption, false},
88+
{"leaf queue, usage exceeded max resources", usageExceededMaxQueue, true},
89+
}
90+
for _, tc := range testCases {
91+
t.Run(tc.name, func(t *testing.T) {
92+
preemptor := NewQuotaChangePreemptor(tc.queue)
93+
assert.Equal(t, preemptor.CheckPreconditions(), tc.preconditionResult)
94+
if tc.preconditionResult {
95+
preemptor.tryPreemption()
96+
assert.Equal(t, tc.queue.HasTriggerredQuotaChangePreemption(), true)
97+
assert.Equal(t, tc.queue.IsQuotaChangePreemptionRunning(), true)
98+
}
99+
})
100+
}
101+
}

0 commit comments

Comments
 (0)