Skip to content

Commit 56347fa

Browse files
author
mani
committed
[YUNIKORN-3143] Victim ordering and selection
1 parent c8b6d5e commit 56347fa

File tree

8 files changed

+446
-262
lines changed

8 files changed

+446
-262
lines changed

pkg/log/logger.go

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,42 +54,43 @@ const (
5454

5555
// Defined loggers: when adding new loggers, ids must be sequential, and all must be added to the loggers slice in the same order
5656
var (
57-
Core = &LoggerHandle{id: 0, name: "core"}
58-
Test = &LoggerHandle{id: 1, name: "test"}
59-
Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
60-
Config = &LoggerHandle{id: 3, name: "core.config"}
61-
Entrypoint = &LoggerHandle{id: 4, name: "core.entrypoint"}
62-
Events = &LoggerHandle{id: 5, name: "core.events"}
63-
OpenTracing = &LoggerHandle{id: 6, name: "core.opentracing"}
64-
Resources = &LoggerHandle{id: 7, name: "core.resources"}
65-
REST = &LoggerHandle{id: 8, name: "core.rest"}
66-
RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
67-
RPC = &LoggerHandle{id: 10, name: "core.rpc"}
68-
Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
69-
Scheduler = &LoggerHandle{id: 12, name: "core.scheduler"}
70-
SchedAllocation = &LoggerHandle{id: 13, name: "core.scheduler.allocation"}
71-
SchedApplication = &LoggerHandle{id: 14, name: "core.scheduler.application"}
72-
SchedAppUsage = &LoggerHandle{id: 15, name: "core.scheduler.application.usage"}
73-
SchedContext = &LoggerHandle{id: 16, name: "core.scheduler.context"}
74-
SchedFSM = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
75-
SchedHealth = &LoggerHandle{id: 18, name: "core.scheduler.health"}
76-
SchedNode = &LoggerHandle{id: 19, name: "core.scheduler.node"}
77-
SchedPartition = &LoggerHandle{id: 20, name: "core.scheduler.partition"}
78-
SchedPreemption = &LoggerHandle{id: 21, name: "core.scheduler.preemption"}
79-
SchedQueue = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
80-
SchedReservation = &LoggerHandle{id: 23, name: "core.scheduler.reservation"}
81-
SchedUGM = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
82-
SchedNodesUsage = &LoggerHandle{id: 25, name: "core.scheduler.nodesusage"}
83-
Security = &LoggerHandle{id: 26, name: "core.security"}
84-
Utils = &LoggerHandle{id: 27, name: "core.utils"}
85-
Diagnostics = &LoggerHandle{id: 28, name: "core.diagnostics"}
57+
Core = &LoggerHandle{id: 0, name: "core"}
58+
Test = &LoggerHandle{id: 1, name: "test"}
59+
Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
60+
Config = &LoggerHandle{id: 3, name: "core.config"}
61+
Entrypoint = &LoggerHandle{id: 4, name: "core.entrypoint"}
62+
Events = &LoggerHandle{id: 5, name: "core.events"}
63+
OpenTracing = &LoggerHandle{id: 6, name: "core.opentracing"}
64+
Resources = &LoggerHandle{id: 7, name: "core.resources"}
65+
REST = &LoggerHandle{id: 8, name: "core.rest"}
66+
RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
67+
RPC = &LoggerHandle{id: 10, name: "core.rpc"}
68+
Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
69+
Scheduler = &LoggerHandle{id: 12, name: "core.scheduler"}
70+
SchedAllocation = &LoggerHandle{id: 13, name: "core.scheduler.allocation"}
71+
SchedApplication = &LoggerHandle{id: 14, name: "core.scheduler.application"}
72+
SchedAppUsage = &LoggerHandle{id: 15, name: "core.scheduler.application.usage"}
73+
SchedContext = &LoggerHandle{id: 16, name: "core.scheduler.context"}
74+
SchedFSM = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
75+
SchedHealth = &LoggerHandle{id: 18, name: "core.scheduler.health"}
76+
SchedNode = &LoggerHandle{id: 19, name: "core.scheduler.node"}
77+
SchedPartition = &LoggerHandle{id: 20, name: "core.scheduler.partition"}
78+
SchedPreemption = &LoggerHandle{id: 21, name: "core.scheduler.preemption"}
79+
SchedQueue = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
80+
SchedReservation = &LoggerHandle{id: 23, name: "core.scheduler.reservation"}
81+
SchedUGM = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
82+
SchedNodesUsage = &LoggerHandle{id: 25, name: "core.scheduler.nodesusage"}
83+
Security = &LoggerHandle{id: 26, name: "core.security"}
84+
Utils = &LoggerHandle{id: 27, name: "core.utils"}
85+
Diagnostics = &LoggerHandle{id: 28, name: "core.diagnostics"}
86+
ShedQuotaChangePreemption = &LoggerHandle{id: 29, name: "core.scheduler.preemption.quotachange"}
8687
)
8788

8889
// this tracks all the known logger handles, used to preallocate the real logger instances when configuration changes
8990
var loggers = []*LoggerHandle{
9091
Core, Test, Deprecation, Config, Entrypoint, Events, OpenTracing, Resources, REST, RMProxy, RPC, Metrics,
9192
Scheduler, SchedAllocation, SchedApplication, SchedAppUsage, SchedContext, SchedFSM, SchedHealth, SchedNode,
92-
SchedPartition, SchedPreemption, SchedQueue, SchedReservation, SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics,
93+
SchedPartition, SchedPreemption, SchedQueue, SchedReservation, SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics, ShedQuotaChangePreemption,
9394
}
9495

9596
// structure to hold all current logger configuration state

pkg/log/logger_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestLoggerIDs(t *testing.T) {
3939
_ = Log(Test)
4040

4141
// validate logger count
42-
assert.Equal(t, 29, len(loggers), "wrong logger count")
42+
assert.Equal(t, 30, len(loggers), "wrong logger count")
4343

4444
// validate that all loggers are populated and have sequential ids
4545
for i := 0; i < len(loggers); i++ {
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package objects
20+
21+
import (
22+
"sort"
23+
24+
"github.com/apache/yunikorn-core/pkg/common/resources"
25+
)
26+
27+
// SortAllocations Sort allocations based on the following criteria in the specified order:
28+
// 1. By type (regular pods, opted out pods, driver/owner pods),
29+
// 2. By priority (least priority ask placed first),
30+
// 3. By Create time or age of the ask (younger ask placed first),
31+
// 4. By resource (ask with lesser allocated resources placed first)
32+
func SortAllocations(allocations []*Allocation) {
33+
sort.SliceStable(allocations, func(i, j int) bool {
34+
l := allocations[i]
35+
r := allocations[j]
36+
37+
// sort based on the type
38+
lAskType := 1 // regular pod
39+
if l.IsOriginator() { // driver/owner pod
40+
lAskType = 3
41+
} else if !l.IsAllowPreemptSelf() { // opted out pod
42+
lAskType = 2
43+
}
44+
rAskType := 1
45+
if r.IsOriginator() {
46+
rAskType = 3
47+
} else if !r.IsAllowPreemptSelf() {
48+
rAskType = 2
49+
}
50+
if lAskType < rAskType {
51+
return true
52+
}
53+
if lAskType > rAskType {
54+
return false
55+
}
56+
57+
// sort based on the priority
58+
lPriority := l.GetPriority()
59+
rPriority := r.GetPriority()
60+
if lPriority < rPriority {
61+
return true
62+
}
63+
if lPriority > rPriority {
64+
return false
65+
}
66+
67+
// sort based on the age
68+
if !l.GetCreateTime().Equal(r.GetCreateTime()) {
69+
return l.GetCreateTime().After(r.GetCreateTime())
70+
}
71+
72+
// sort based on the allocated resource
73+
lResource := l.GetAllocatedResource()
74+
rResource := r.GetAllocatedResource()
75+
if !resources.Equals(lResource, rResource) {
76+
delta := resources.Sub(lResource, rResource)
77+
return !resources.StrictlyGreaterThanZero(delta)
78+
}
79+
return true
80+
})
81+
}
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package objects
20+
21+
import (
22+
"testing"
23+
"time"
24+
25+
"gotest.tools/v3/assert"
26+
27+
"github.com/apache/yunikorn-core/pkg/common/resources"
28+
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
29+
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
30+
)
31+
32+
func createAllocationAsk(allocationKey string, app string, allowPreemption bool, isOriginator bool, priority int32, res *resources.Resource) *Allocation {
33+
tags := map[string]string{}
34+
siAsk := &si.Allocation{
35+
AllocationKey: allocationKey,
36+
ApplicationID: app,
37+
PartitionName: "default",
38+
Priority: priority,
39+
ResourcePerAlloc: res.ToProto(),
40+
Originator: isOriginator,
41+
PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf: allowPreemption, AllowPreemptOther: true},
42+
AllocationTags: tags,
43+
}
44+
ask := NewAllocationFromSI(siAsk)
45+
return ask
46+
}
47+
48+
func createAllocation(allocationKey string, app string, nodeID string, allowPreemption bool, isOriginator bool, priority int32, requiredNode bool, res *resources.Resource) *Allocation {
49+
tags := map[string]string{}
50+
siAsk := &si.Allocation{
51+
AllocationKey: allocationKey,
52+
ApplicationID: app,
53+
PartitionName: "default",
54+
Priority: priority,
55+
ResourcePerAlloc: res.ToProto(),
56+
Originator: isOriginator,
57+
PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf: allowPreemption, AllowPreemptOther: true},
58+
AllocationTags: tags,
59+
NodeID: nodeID,
60+
}
61+
if requiredNode {
62+
tags[siCommon.DomainYuniKorn+siCommon.KeyRequiredNode] = nodeID
63+
}
64+
ask := NewAllocationFromSI(siAsk)
65+
return ask
66+
}
67+
68+
func prepareAllocationAsks(t *testing.T, node *Node) []*Allocation {
69+
createTime := time.Now()
70+
71+
result := make([]*Allocation, 0)
72+
73+
// regular pods
74+
alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false, 10, false,
75+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
76+
assert.Assert(t, node.TryAddAllocation(alloc1))
77+
result = append(result, alloc1)
78+
79+
alloc2 := createAllocation("ask2", "app1", node.NodeID, true, false, 10, false,
80+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
81+
alloc2.createTime = createTime
82+
assert.Assert(t, node.TryAddAllocation(alloc2))
83+
result = append(result, alloc2)
84+
85+
alloc3 := createAllocation("ask3", "app1", node.NodeID, true, false, 15, false,
86+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
87+
assert.Assert(t, node.TryAddAllocation(alloc3))
88+
result = append(result, alloc3)
89+
90+
alloc4 := createAllocation("ask4", "app1", node.NodeID, true, false, 10, false,
91+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
92+
alloc4.createTime = createTime
93+
assert.Assert(t, node.TryAddAllocation(alloc4))
94+
result = append(result, alloc4)
95+
96+
alloc5 := createAllocation("ask5", "app1", node.NodeID, true, false, 5, false,
97+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
98+
assert.Assert(t, node.TryAddAllocation(alloc5))
99+
result = append(result, alloc5)
100+
101+
// opted out pods
102+
alloc6 := createAllocation("ask6", "app1", node.NodeID, false, false, 10, false,
103+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
104+
assert.Assert(t, node.TryAddAllocation(alloc6))
105+
result = append(result, alloc6)
106+
107+
alloc7 := createAllocation("ask7", "app1", node.NodeID, false, false, 10, false,
108+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
109+
alloc7.createTime = createTime
110+
assert.Assert(t, node.TryAddAllocation(alloc7))
111+
result = append(result, alloc7)
112+
113+
alloc8 := createAllocation("ask8", "app1", node.NodeID, false, false, 15, false,
114+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
115+
assert.Assert(t, node.TryAddAllocation(alloc8))
116+
result = append(result, alloc8)
117+
118+
// driver/owner pods
119+
alloc9 := createAllocation("ask9", "app1", node.NodeID, false, true, 10, false,
120+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
121+
alloc9.createTime = createTime
122+
assert.Assert(t, node.TryAddAllocation(alloc9))
123+
result = append(result, alloc9)
124+
125+
alloc10 := createAllocation("ask10", "app1", node.NodeID, true, true, 5, false,
126+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
127+
assert.Assert(t, node.TryAddAllocation(alloc10))
128+
result = append(result, alloc10)
129+
130+
return result
131+
}
132+
133+
func removeAllocationAsks(node *Node, asks []*Allocation) {
134+
for _, ask := range asks {
135+
node.RemoveAllocation(ask.GetAllocationKey())
136+
}
137+
}
138+
139+
func assignAllocationsToQueue(allocations []*Allocation, queue *Queue) {
140+
for _, allocation := range allocations {
141+
var app *Application
142+
var ok bool
143+
if _, ok = queue.applications[allocation.applicationID]; !ok {
144+
app = newApplication(allocation.applicationID, "default", queue.QueuePath)
145+
app.SetQueue(queue)
146+
queue.applications[allocation.applicationID] = app
147+
} else {
148+
app = queue.applications[allocation.applicationID]
149+
}
150+
app.AddAllocation(allocation)
151+
}
152+
}
153+
154+
func removeAllocationFromQueue(queue *Queue) {
155+
queue.applications = make(map[string]*Application)
156+
}
157+
158+
// regular pods
159+
// ask1: pri - 10, create time - 1, res - 10
160+
// ask2: pri - 10, create time - 2, res - 8
161+
// ask3: pri - 15, create time - 3, res - 10
162+
// ask4: pri - 10, create time - 2, res - 5
163+
// ask5: pri - 5, create time - 4, res - 5
164+
165+
// opted out pods
166+
// ask6: pri - 10, create time - 5, res - 10
167+
// ask7: pri - 10, create time - 2, res - 8
168+
// ask8: pri - 15, create time - 6, res - 10
169+
170+
// driver/owner pods
171+
// ask9: pri - 10, create time - 2, res - 5
172+
// ask10: pri - 5, create time - 4, res - 5
173+
174+
// original asks order: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
175+
// expected sorted asks o/p: 5, 1, 4, 2, 3, 6, 7, 8, 10, 9
176+
func TestSortAllocations(t *testing.T) {
177+
node := NewNode(&si.NodeInfo{
178+
NodeID: "node",
179+
Attributes: nil,
180+
SchedulableResource: &si.Resource{
181+
Resources: map[string]*si.Quantity{"first": {Value: 100}},
182+
},
183+
})
184+
185+
asks := prepareAllocationAsks(t, node)
186+
SortAllocations(asks)
187+
sortedAsks := asks
188+
189+
// assert regular pods
190+
assert.Equal(t, sortedAsks[0].GetAllocationKey(), "ask5")
191+
assert.Equal(t, sortedAsks[1].GetAllocationKey(), "ask1")
192+
assert.Equal(t, sortedAsks[2].GetAllocationKey(), "ask4")
193+
assert.Equal(t, sortedAsks[3].GetAllocationKey(), "ask2")
194+
assert.Equal(t, sortedAsks[4].GetAllocationKey(), "ask3")
195+
196+
// assert opted out pods
197+
assert.Equal(t, sortedAsks[5].GetAllocationKey(), "ask6")
198+
assert.Equal(t, sortedAsks[6].GetAllocationKey(), "ask7")
199+
assert.Equal(t, sortedAsks[7].GetAllocationKey(), "ask8")
200+
201+
// assert driver/owner pods
202+
assert.Equal(t, sortedAsks[8].GetAllocationKey(), "ask10")
203+
assert.Equal(t, sortedAsks[9].GetAllocationKey(), "ask9")
204+
205+
removeAllocationAsks(node, asks)
206+
}

0 commit comments

Comments
 (0)