Skip to content

Commit 69279ef

Browse files
committed
Implement endpointslices predicates
This commit adds predicates to the endpointslices controller to state if an object should reconciled or not. Signed-off-by: Elis Lulja <[email protected]>
1 parent 35405b7 commit 69279ef

File tree

6 files changed

+527
-28
lines changed

6 files changed

+527
-28
lines changed

controllers/base.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"context"
2121
"fmt"
2222
"strings"
23+
"time"
2324

2425
"github.com/CloudNativeSDWAN/cnwan-operator/internal/types"
2526
sr "github.com/CloudNativeSDWAN/cnwan-operator/pkg/servregistry"
2627
"github.com/go-logr/logr"
2728
corev1 "k8s.io/api/core/v1"
29+
"k8s.io/api/discovery/v1beta1"
2830
"k8s.io/apimachinery/pkg/runtime"
2931
ktypes "k8s.io/apimachinery/pkg/types"
3032
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -128,6 +130,36 @@ func (b *BaseReconciler) filterAnnotations(annotations map[string]string) map[st
128130
return filtered
129131
}
130132

133+
func (b *BaseReconciler) shouldWatchEpSlice(epslice *v1beta1.EndpointSlice) bool {
134+
srvName, exists := epslice.Labels[v1beta1.LabelServiceName]
135+
if !exists {
136+
return false
137+
}
138+
139+
ctx, canc := context.WithTimeout(context.Background(), 30*time.Second)
140+
defer canc()
141+
142+
var srv corev1.Service
143+
if err := b.Get(ctx, ktypes.NamespacedName{Name: srvName, Namespace: epslice.Namespace}, &srv); err != nil {
144+
b.Log.Error(err, "error while getting service from endpointslice", "endpointslice", epslice.Name)
145+
return false
146+
}
147+
if !b.shouldWatchSrv(&srv) {
148+
return false
149+
}
150+
151+
enabled := srv.Labels[countPodsLabelKey]
152+
if strings.ToLower(enabled) != enableVal {
153+
return false
154+
}
155+
156+
if epslice.AddressType != v1beta1.AddressTypeIPv4 {
157+
return false
158+
}
159+
160+
return true
161+
}
162+
131163
// NamespaceReconciler returns a namespace reconciler starting from this
132164
// base reconciler.
133165
func (b *BaseReconciler) NamespaceReconciler() *NamespaceReconciler {

controllers/base_test.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/CloudNativeSDWAN/cnwan-operator/internal/types"
2424
"github.com/stretchr/testify/assert"
2525
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/api/discovery/v1beta1"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/runtime"
2829
ctrl "sigs.k8s.io/controller-runtime"
@@ -284,3 +285,191 @@ func TestShouldWatchSrv(t *testing.T) {
284285
}
285286
}
286287
}
288+
289+
func TestShouldWatchEpSlice(t *testing.T) {
290+
a := assert.New(t)
291+
cases := []struct {
292+
eps *v1beta1.EndpointSlice
293+
cli client.Client
294+
expRes bool
295+
}{
296+
{
297+
eps: &v1beta1.EndpointSlice{
298+
ObjectMeta: metav1.ObjectMeta{
299+
Namespace: "ns-name",
300+
Labels: map[string]string{},
301+
},
302+
},
303+
},
304+
{
305+
eps: &v1beta1.EndpointSlice{
306+
ObjectMeta: metav1.ObjectMeta{
307+
Namespace: "ns-name",
308+
Labels: map[string]string{
309+
v1beta1.LabelServiceName: "srv-name",
310+
},
311+
},
312+
},
313+
cli: func() client.Client {
314+
scheme := runtime.NewScheme()
315+
corev1.AddToScheme(scheme)
316+
return fakecli.NewFakeClientWithScheme(scheme)
317+
}(),
318+
},
319+
{
320+
eps: &v1beta1.EndpointSlice{
321+
ObjectMeta: metav1.ObjectMeta{
322+
Namespace: "ns-name",
323+
Labels: map[string]string{
324+
v1beta1.LabelServiceName: "srv-name",
325+
},
326+
},
327+
},
328+
cli: func() client.Client {
329+
scheme := runtime.NewScheme()
330+
corev1.AddToScheme(scheme)
331+
return fakecli.NewFakeClientWithScheme(scheme, &corev1.Namespace{
332+
ObjectMeta: metav1.ObjectMeta{
333+
Name: "ns-name",
334+
Labels: map[string]string{},
335+
},
336+
}, &corev1.Service{
337+
ObjectMeta: metav1.ObjectMeta{
338+
Name: "srv-name",
339+
Namespace: "ns-name",
340+
Labels: map[string]string{},
341+
},
342+
})
343+
}(),
344+
},
345+
{
346+
eps: &v1beta1.EndpointSlice{
347+
ObjectMeta: metav1.ObjectMeta{
348+
Namespace: "ns-name",
349+
Labels: map[string]string{
350+
v1beta1.LabelServiceName: "srv-name",
351+
},
352+
},
353+
},
354+
cli: func() client.Client {
355+
scheme := runtime.NewScheme()
356+
corev1.AddToScheme(scheme)
357+
return fakecli.NewFakeClientWithScheme(scheme, &corev1.Namespace{
358+
ObjectMeta: metav1.ObjectMeta{
359+
Name: "ns-name",
360+
Labels: map[string]string{string(types.AllowedKey): "whatever"},
361+
},
362+
}, &corev1.Service{
363+
ObjectMeta: metav1.ObjectMeta{
364+
Name: "srv-name",
365+
Namespace: "ns-name",
366+
Annotations: map[string]string{"yes": "no"},
367+
Labels: map[string]string{countPodsLabelKey: disableVal},
368+
},
369+
Spec: corev1.ServiceSpec{
370+
Type: corev1.ServiceTypeLoadBalancer,
371+
},
372+
Status: corev1.ServiceStatus{
373+
LoadBalancer: corev1.LoadBalancerStatus{
374+
Ingress: []corev1.LoadBalancerIngress{
375+
{IP: "10.10.10.10"},
376+
},
377+
},
378+
},
379+
})
380+
}(),
381+
},
382+
{
383+
eps: &v1beta1.EndpointSlice{
384+
ObjectMeta: metav1.ObjectMeta{
385+
Namespace: "ns-name",
386+
Labels: map[string]string{
387+
v1beta1.LabelServiceName: "srv-name",
388+
},
389+
},
390+
},
391+
cli: func() client.Client {
392+
scheme := runtime.NewScheme()
393+
corev1.AddToScheme(scheme)
394+
return fakecli.NewFakeClientWithScheme(scheme, &corev1.Namespace{
395+
ObjectMeta: metav1.ObjectMeta{
396+
Name: "ns-name",
397+
Labels: map[string]string{string(types.AllowedKey): "whatever"},
398+
},
399+
}, &corev1.Service{
400+
ObjectMeta: metav1.ObjectMeta{
401+
Name: "srv-name",
402+
Namespace: "ns-name",
403+
Annotations: map[string]string{"yes": "no"},
404+
Labels: map[string]string{countPodsLabelKey: enableVal},
405+
},
406+
Spec: corev1.ServiceSpec{
407+
Type: corev1.ServiceTypeLoadBalancer,
408+
},
409+
Status: corev1.ServiceStatus{
410+
LoadBalancer: corev1.LoadBalancerStatus{
411+
Ingress: []corev1.LoadBalancerIngress{
412+
{IP: "10.10.10.10"},
413+
},
414+
},
415+
},
416+
})
417+
}(),
418+
},
419+
{
420+
eps: &v1beta1.EndpointSlice{
421+
ObjectMeta: metav1.ObjectMeta{
422+
Namespace: "ns-name",
423+
Labels: map[string]string{
424+
v1beta1.LabelServiceName: "srv-name",
425+
},
426+
},
427+
AddressType: v1beta1.AddressTypeIPv4,
428+
},
429+
cli: func() client.Client {
430+
scheme := runtime.NewScheme()
431+
corev1.AddToScheme(scheme)
432+
return fakecli.NewFakeClientWithScheme(scheme, &corev1.Namespace{
433+
ObjectMeta: metav1.ObjectMeta{
434+
Name: "ns-name",
435+
Labels: map[string]string{string(types.AllowedKey): "whatever"},
436+
},
437+
}, &corev1.Service{
438+
ObjectMeta: metav1.ObjectMeta{
439+
Name: "srv-name",
440+
Namespace: "ns-name",
441+
Annotations: map[string]string{"yes": "no"},
442+
Labels: map[string]string{countPodsLabelKey: enableVal},
443+
},
444+
Spec: corev1.ServiceSpec{
445+
Type: corev1.ServiceTypeLoadBalancer,
446+
},
447+
Status: corev1.ServiceStatus{
448+
LoadBalancer: corev1.LoadBalancerStatus{
449+
Ingress: []corev1.LoadBalancerIngress{
450+
{IP: "10.10.10.10"},
451+
},
452+
},
453+
},
454+
})
455+
}(),
456+
expRes: true,
457+
},
458+
}
459+
failed := func(i int) {
460+
a.FailNow(fmt.Sprintf("case %d failed", i))
461+
}
462+
for i, currCase := range cases {
463+
b := &BaseReconciler{
464+
Client: currCase.cli,
465+
Log: ctrl.Log.WithName("test"),
466+
AllowedAnnotations: map[string]bool{"yes": true},
467+
CurrentNsPolicy: types.AllowList,
468+
}
469+
470+
res := b.shouldWatchEpSlice(currCase.eps)
471+
if !a.Equal(currCase.expRes, res) {
472+
failed(i)
473+
}
474+
}
475+
}

controllers/endpointslice_controller.go

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,37 @@ package controllers
1818

1919
import (
2020
"context"
21+
"sync"
2122

23+
"k8s.io/api/discovery/v1beta1"
2224
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
25+
ktypes "k8s.io/apimachinery/pkg/types"
2326
ctrl "sigs.k8s.io/controller-runtime"
27+
"sigs.k8s.io/controller-runtime/pkg/event"
28+
"sigs.k8s.io/controller-runtime/pkg/predicate"
2429
)
2530

31+
const (
32+
countPodsLabelKey string = "operator.cnwan.io/pods-count"
33+
enableVal string = "enabled"
34+
disableVal string = "disabled"
35+
)
36+
37+
type epsData struct {
38+
count int
39+
srv string
40+
}
41+
2642
// EndpointSliceReconciler reconciles a EndpointSlice object
2743
type EndpointSliceReconciler struct {
2844
*BaseReconciler
45+
46+
lock sync.Mutex
47+
epsDataActions map[string]*epsData
48+
srvCounts map[string]map[string]int
2949
}
3050

3151
// +kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslice,verbs=get;list;watch;create;update;patch;delete
32-
// +kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslice/status,verbs=get;update;patch
3352

3453
// Reconcile keeps track counts in the endpointslice length
3554
func (r *EndpointSliceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
@@ -43,7 +62,77 @@ func (r *EndpointSliceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
4362

4463
// SetupWithManager sets this reconciler with the manager.
4564
func (r *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
65+
r.epsDataActions = map[string]*epsData{}
66+
r.srvCounts = map[string]map[string]int{}
67+
predicates := predicate.Funcs{
68+
CreateFunc: r.createPredicate,
69+
UpdateFunc: r.updatePredicate,
70+
DeleteFunc: r.deletePredicate,
71+
}
72+
4673
return ctrl.NewControllerManagedBy(mgr).
4774
For(&discoveryv1beta1.EndpointSlice{}).
75+
WithEventFilter(predicates).
4876
Complete(r)
4977
}
78+
79+
func (r *EndpointSliceReconciler) createPredicate(ev event.CreateEvent) bool {
80+
epslice, ok := ev.Object.(*v1beta1.EndpointSlice)
81+
if !ok {
82+
return false
83+
}
84+
85+
if !r.shouldWatchEpSlice(epslice) {
86+
return false
87+
}
88+
89+
namespacedName := ktypes.NamespacedName{Namespace: epslice.Namespace, Name: epslice.Name}.String()
90+
epCount := 0
91+
for _, ep := range epslice.Endpoints {
92+
if ep.Conditions.Ready != nil && !*ep.Conditions.Ready {
93+
r.Log.WithValues("name", namespacedName).Info("found some in not ready", "len", len(ep.Addresses))
94+
continue
95+
}
96+
epCount += len(ep.Addresses)
97+
}
98+
99+
r.Log.WithValues("name", namespacedName).Info("calculated len for this epslice", "len", epCount)
100+
101+
r.lock.Lock()
102+
defer r.lock.Unlock()
103+
r.epsDataActions[namespacedName] = &epsData{
104+
srv: epslice.Labels[v1beta1.LabelServiceName],
105+
count: epCount,
106+
}
107+
108+
return true
109+
}
110+
111+
func (r *EndpointSliceReconciler) updatePredicate(ev event.UpdateEvent) bool {
112+
evNew := event.CreateEvent{
113+
Meta: ev.MetaNew,
114+
Object: ev.ObjectNew,
115+
}
116+
return r.createPredicate(evNew)
117+
}
118+
119+
func (r *EndpointSliceReconciler) deletePredicate(ev event.DeleteEvent) bool {
120+
epslice, ok := ev.Object.(*v1beta1.EndpointSlice)
121+
if !ok {
122+
return false
123+
}
124+
125+
if !r.shouldWatchEpSlice(epslice) {
126+
return false
127+
}
128+
129+
namespacedName := ktypes.NamespacedName{Namespace: epslice.Namespace, Name: epslice.Name}.String()
130+
r.lock.Lock()
131+
defer r.lock.Unlock()
132+
r.epsDataActions[namespacedName] = &epsData{
133+
srv: epslice.Labels[v1beta1.LabelServiceName],
134+
count: 0,
135+
}
136+
137+
return true
138+
}

0 commit comments

Comments
 (0)