Skip to content

Commit 7c05c32

Browse files
authored
Feat: support sharding (#44)
Signed-off-by: Yin Da <[email protected]>
1 parent ac89a08 commit 7c05c32

File tree

14 files changed

+692
-4
lines changed

14 files changed

+692
-4
lines changed

controller/sharding/cache.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
Copyright 2023 The KubeVela Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package sharding
18+
19+
import (
20+
"k8s.io/apimachinery/pkg/labels"
21+
"k8s.io/apimachinery/pkg/runtime"
22+
"sigs.k8s.io/controller-runtime/pkg/cache"
23+
"sigs.k8s.io/controller-runtime/pkg/client"
24+
)
25+
26+
// BuildCache add shard-id label selector for given typed object
27+
func BuildCache(scheme *runtime.Scheme, shardingObjects ...client.Object) cache.NewCacheFunc {
28+
opts := cache.Options{
29+
Scheme: scheme,
30+
SelectorsByObject: map[client.Object]cache.ObjectSelector{},
31+
}
32+
if EnableSharding {
33+
ls := labels.SelectorFromSet(map[string]string{LabelKubeVelaScheduledShardID: ShardID})
34+
for _, obj := range shardingObjects {
35+
opts.SelectorsByObject[obj] = cache.ObjectSelector{Label: ls}
36+
}
37+
}
38+
return cache.BuilderWithOptions(opts)
39+
}

controller/sharding/flags.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
Copyright 2023 The KubeVela Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package sharding
18+
19+
import (
20+
"github.com/spf13/pflag"
21+
)
22+
23+
// AddFlags add sharding flags
24+
func AddFlags(fs *pflag.FlagSet) {
25+
AddControllerFlags(fs)
26+
AddSchedulerFlags(fs)
27+
}
28+
29+
// AddControllerFlags add sharding controller flags.
//
// It binds the package-level EnableSharding switch and ShardID identity,
// keeping their current values as the flag defaults.
func AddControllerFlags(fs *pflag.FlagSet) {
	fs.BoolVar(&EnableSharding, "enable-sharding", EnableSharding, "When sharding enabled, the controller will run as master (shard-id=master) or slave mode (shard-id is any non-empty string except master). Refer to https://github.com/kubevela/kubevela/blob/master/design/vela-core/sharding.md for details.")
	fs.StringVar(&ShardID, "shard-id", ShardID, "The id for sharding.")
}
34+
35+
// AddSchedulerFlags add sharding scheduler flags.
//
// SchedulableShards selects the static scheduler when non-empty; an empty
// value switches to dynamic shard discovery, whose resync interval is
// controlled by DynamicDiscoverySchedulerResyncPeriod.
func AddSchedulerFlags(fs *pflag.FlagSet) {
	fs.StringSliceVar(&SchedulableShards, "schedulable-shards", SchedulableShards, "The shard ids that are available for scheduling. If empty, dynamic discovery will be used.")
	fs.DurationVar(&DynamicDiscoverySchedulerResyncPeriod, "sharding-slave-discovery-resync-period", DynamicDiscoverySchedulerResyncPeriod, "The resync period for default dynamic discovery scheduler.")
}

controller/sharding/scheduler.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
Copyright 2023 The KubeVela Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package sharding
18+
19+
import (
20+
"context"
21+
"math/rand"
22+
"sort"
23+
"strings"
24+
"sync"
25+
"sync/atomic"
26+
"time"
27+
28+
corev1 "k8s.io/api/core/v1"
29+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/labels"
31+
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/selection"
33+
"k8s.io/client-go/tools/cache"
34+
"k8s.io/klog/v2"
35+
"k8s.io/kubectl/pkg/util/podutils"
36+
"sigs.k8s.io/controller-runtime/pkg/client"
37+
38+
"github.com/kubevela/pkg/util/k8s"
39+
"github.com/kubevela/pkg/util/maps"
40+
velaruntime "github.com/kubevela/pkg/util/runtime"
41+
"github.com/kubevela/pkg/util/singleton"
42+
"github.com/kubevela/pkg/util/slices"
43+
)
44+
45+
// Scheduler schedule shard-id for object.
type Scheduler interface {
	// Start runs the scheduler's background work (if any) until the given
	// context is done.
	Start(context.Context)
	// Schedule assigns a shard id to the object when it does not already
	// carry one; it reports whether a shard id was newly set.
	Schedule(client.Object) bool
}
50+
51+
var _ Scheduler = (*staticScheduler)(nil)
52+
53+
// NewStaticScheduler create a scheduler that do not make update but only use predefined shards for allocate
54+
func NewStaticScheduler(shards []string) Scheduler {
55+
return &staticScheduler{shards: shards}
56+
}
57+
58+
// staticScheduler picks shards from a fixed set configured at construction.
type staticScheduler struct {
	// shards is the fixed list of candidate shard ids to choose from.
	shards []string
}
61+
62+
// Start .
63+
func (in *staticScheduler) Start(ctx context.Context) {
64+
klog.Infof("staticScheduler started, shards: [%s]", strings.Join(in.shards, ", "))
65+
}
66+
67+
// Schedule the target object to a random shard
68+
func (in *staticScheduler) Schedule(o client.Object) bool {
69+
if _, scheduled := GetScheduledShardID(o); !scheduled {
70+
if len(in.shards) > 0 {
71+
// nolint
72+
sid := in.shards[rand.Intn(len(in.shards))]
73+
klog.Infof("staticScheduler schedule %s %s/%s to shard[%s]", o.GetObjectKind().GroupVersionKind().Kind, o.GetNamespace(), o.GetName(), sid)
74+
SetScheduledShardID(o, sid)
75+
return true
76+
}
77+
klog.Infof("staticDiscoveryScheduler no schedulable shard found for %s %s/%s", o.GetObjectKind().GroupVersionKind().Kind, o.GetNamespace(), o.GetName())
78+
}
79+
return false
80+
}
81+
82+
var _ Scheduler = (*dynamicDiscoveryScheduler)(nil)
83+
84+
// NewDynamicDiscoveryScheduler create a scheduler that allow dynamic discovery for available shards
85+
func NewDynamicDiscoveryScheduler(name string, resyncPeriod time.Duration) Scheduler {
86+
return &dynamicDiscoveryScheduler{
87+
name: name,
88+
resyncPeriod: resyncPeriod,
89+
candidates: map[string]map[string]bool{},
90+
}
91+
}
92+
93+
// dynamicDiscoveryScheduler discovers schedulable shards by watching the
// shard pods running in the controller's namespace.
type dynamicDiscoveryScheduler struct {
	// mu guards candidates.
	mu sync.RWMutex

	// name is the app.kubernetes.io/name label value the watched pods must carry.
	name string
	// resyncPeriod is used both as the informer resync interval and as the
	// period of the candidate-map rebuild loop (disabled when <= 0).
	resyncPeriod time.Duration
	// candidates maps shard id -> pod name -> pod readiness.
	candidates map[string]map[string]bool
	// roundRobinIndex drives round-robin selection across available shards.
	roundRobinIndex atomic.Uint32

	// store and informer are the pod watch machinery, initialized in Start.
	store    cache.Store
	informer cache.Controller
}
104+
105+
func (in *dynamicDiscoveryScheduler) _registerPod(obj interface{}) {
106+
if pod, ok := obj.(*corev1.Pod); ok {
107+
id := pod.GetLabels()[LabelKubeVelaShardID]
108+
healthy := podutils.IsPodReady(pod)
109+
klog.Infof("dynamicDiscoveryScheduler register pod %s/%s (id: %s) with health status: %t", pod.Namespace, pod.Name, id, healthy)
110+
in.mu.Lock()
111+
defer in.mu.Unlock()
112+
if _, exist := in.candidates[id]; !exist {
113+
in.candidates[id] = map[string]bool{}
114+
}
115+
in.candidates[id][pod.Name] = healthy
116+
}
117+
}
118+
119+
func (in *dynamicDiscoveryScheduler) _unregisterPod(obj interface{}) {
120+
if pod, ok := obj.(*corev1.Pod); ok {
121+
id := pod.GetLabels()[LabelKubeVelaShardID]
122+
klog.Infof("dynamicDiscoveryScheduler unregister pod %s/%s", pod.Namespace, pod.Name)
123+
in.mu.Lock()
124+
defer in.mu.Unlock()
125+
if _, exist := in.candidates[id]; exist {
126+
delete(in.candidates[id], pod.Name)
127+
if len(in.candidates[id]) == 0 {
128+
delete(in.candidates, id)
129+
}
130+
}
131+
}
132+
}
133+
134+
// resync periodically rebuilds the candidate map from the informer's store so
// the scheduler's view of available shards converges with what the store
// currently holds. Runs until stopCh is closed.
func (in *dynamicDiscoveryScheduler) resync(stopCh <-chan struct{}) {
	ticker := time.NewTicker(in.resyncPeriod)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			// The map is cleared under the lock and then repopulated pod by
			// pod (each _registerPod re-acquires the lock).
			// NOTE(review): a concurrent Schedule call during this window may
			// briefly observe fewer (or zero) available shards — confirm this
			// transient is acceptable.
			in.mu.Lock()
			in.candidates = map[string]map[string]bool{}
			in.mu.Unlock()
			for _, obj := range in.store.List() {
				in._registerPod(obj)
			}
			available := in.availableShards()
			klog.Infof("dynamicDiscoveryScheduler resync finished, available shards: [%s]", strings.Join(available, ", "))
		}
	}
}
154+
155+
// Start run scheduler to watch pods and automatic register
156+
func (in *dynamicDiscoveryScheduler) Start(ctx context.Context) {
157+
klog.Infof("dynamicDiscoveryScheduler staring, watching pods in %s", k8s.GetRuntimeNamespace())
158+
cli := singleton.StaticClient.Get().CoreV1().RESTClient()
159+
lw := cache.NewFilteredListWatchFromClient(cli, "pods", k8s.GetRuntimeNamespace(), func(options *metav1.ListOptions) {
160+
ls := labels.NewSelector()
161+
ls = ls.Add(*velaruntime.Must(labels.NewRequirement(LabelKubeVelaShardID, selection.Exists, nil)))
162+
ls = ls.Add(*velaruntime.Must(labels.NewRequirement("app.kubernetes.io/name", selection.Equals, []string{in.name})))
163+
options.LabelSelector = ls.String()
164+
})
165+
in.store, in.informer = cache.NewInformer(lw, &corev1.Pod{}, in.resyncPeriod, cache.ResourceEventHandlerFuncs{
166+
AddFunc: in._registerPod,
167+
UpdateFunc: func(oldObj, newObj interface{}) {
168+
if k8s.GetLabel(oldObj.(runtime.Object), LabelKubeVelaShardID) != k8s.GetLabel(newObj.(runtime.Object), LabelKubeVelaShardID) {
169+
in._unregisterPod(oldObj)
170+
}
171+
in._registerPod(newObj)
172+
},
173+
DeleteFunc: in._unregisterPod,
174+
})
175+
stopCh := ctx.Done()
176+
if stopCh == nil {
177+
stopCh = make(chan struct{})
178+
}
179+
if in.resyncPeriod > 0 {
180+
go in.resync(stopCh)
181+
}
182+
klog.Infof("dynamicDiscoveryScheduler started")
183+
in.informer.Run(stopCh)
184+
}
185+
186+
func (in *dynamicDiscoveryScheduler) availableShards() []string {
187+
in.mu.RLock()
188+
defer in.mu.RUnlock()
189+
var available []string
190+
for id, pods := range in.candidates {
191+
if slices.Any(maps.Values(pods), func(x bool) bool { return x }) {
192+
available = append(available, id)
193+
}
194+
}
195+
return available
196+
}
197+
198+
func (in *dynamicDiscoveryScheduler) schedule() (string, bool) {
199+
available := in.availableShards()
200+
if len(available) == 0 {
201+
return "", false
202+
}
203+
sort.Strings(available)
204+
idx := in.roundRobinIndex.Add(1) % uint32(len(available))
205+
return available[idx], true
206+
}
207+
208+
// Schedule get available shard-id for application
209+
func (in *dynamicDiscoveryScheduler) Schedule(o client.Object) bool {
210+
if _, scheduled := GetScheduledShardID(o); !scheduled {
211+
if sid, ok := in.schedule(); ok {
212+
klog.Infof("dynamicDiscoveryScheduler schedule %s %s/%s to shard[%s]", o.GetObjectKind().GroupVersionKind().Kind, o.GetNamespace(), o.GetName(), sid)
213+
SetScheduledShardID(o, sid)
214+
return true
215+
}
216+
klog.Infof("dynamicDiscoveryScheduler no schedulable shard found for %s %s/%s", o.GetObjectKind().GroupVersionKind().Kind, o.GetNamespace(), o.GetName())
217+
}
218+
return false
219+
}

0 commit comments

Comments
 (0)