Skip to content

Commit a4c30dc

Browse files
committed
make namespace parsing and creating informers pluggable
1 parent 4c9ac06 commit a4c30dc

File tree

8 files changed

+150
-50
lines changed

8 files changed

+150
-50
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package options
2+
3+
import (
4+
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
5+
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
6+
kubeinformers "k8s.io/client-go/informers"
7+
kubeclientset "k8s.io/client-go/kubernetes"
8+
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
9+
schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
10+
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
11+
volcanoinformers "volcano.sh/apis/pkg/client/informers/externalversions"
12+
)
13+
14+
type NamespaceParserFunc func(namespace string, kubeClient kubeclientset.Interface) ([]string, error)
15+
16+
type NamespaceOptions struct {
17+
Namespaces NamespaceParserFunc
18+
}
19+
20+
func DefaultNamespaceParser(namespace string, kubeClient kubeclientset.Interface) ([]string, error) {
21+
return []string{namespace}, nil
22+
}
23+
24+
type KubeInformerFunc func(namespaces []string, kubeClient kubeclientset.Interface) kubeinformers.SharedInformerFactory
25+
type MpiJobInformerFunc func(namespaces []string, mpiJobClient mpijobclientset.Interface) informers.SharedInformerFactory
26+
type VolcanoInformerFunc func(namespaces []string, volcanoClient volcanoclient.Interface) volcanoinformers.SharedInformerFactory
27+
type SchedulerPluginsInformerFunc func(namespaces []string, schedClient schedclientset.Interface) schedinformers.SharedInformerFactory
28+
29+
type InformerOptions struct {
30+
KubeInformer KubeInformerFunc
31+
MpiJobInformer MpiJobInformerFunc
32+
VolcanoInformer VolcanoInformerFunc
33+
SchedulerPluginsInformer SchedulerPluginsInformerFunc
34+
}
35+
type AdditionalOptions struct {
36+
NamespaceOptions
37+
InformerOptions
38+
}

cmd/mpi-operator/app/options/options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package options
1616

1717
import (
1818
"flag"
19+
"github.com/kubeflow/mpi-operator/pkg/informers"
1920
"os"
2021

2122
"github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
@@ -38,11 +39,21 @@ type ServerOption struct {
3839
LockNamespace string
3940
QPS int
4041
Burst int
42+
43+
NamespaceOptions
44+
InformerOptions
4145
}
4246

4347
// NewServerOption creates a new CMServer with a default config.
4448
func NewServerOption() *ServerOption {
4549
s := ServerOption{}
50+
51+
s.Namespaces = DefaultNamespaceParser
52+
s.KubeInformer = informers.DefaultKubeInformer
53+
s.MpiJobInformer = informers.DefaultMpiJobInformer
54+
s.VolcanoInformer = informers.DefaultVolcanoInformer
55+
s.SchedulerPluginsInformer = informers.DefaultSchedulerPluginsInformer
56+
4657
return &s
4758
}
4859

cmd/mpi-operator/app/server.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"k8s.io/apimachinery/pkg/util/uuid"
3030
kubeapiserver "k8s.io/apiserver/pkg/server"
3131
"k8s.io/apiserver/pkg/server/healthz"
32-
kubeinformers "k8s.io/client-go/informers"
3332
kubeclientset "k8s.io/client-go/kubernetes"
3433
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
3534
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -45,7 +44,6 @@ import (
4544
"github.com/kubeflow/mpi-operator/cmd/mpi-operator/app/options"
4645
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
4746
kubeflowscheme "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
48-
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
4947
controllersv1 "github.com/kubeflow/mpi-operator/pkg/controller"
5048
"github.com/kubeflow/mpi-operator/pkg/version"
5149
)
@@ -82,13 +80,6 @@ func Run(opt *options.ServerOption) error {
8280
version.PrintVersionAndExit(apiVersion)
8381
}
8482

85-
namespace := opt.Namespace
86-
if namespace == corev1.NamespaceAll {
87-
klog.Info("Using cluster scoped operator")
88-
} else {
89-
klog.Infof("Scoping operator to namespace %s", namespace)
90-
}
91-
9283
// To help debugging, immediately log version.
9384
klog.Infof("%+v", version.Info(apiVersion))
9485

@@ -118,9 +109,23 @@ func Run(opt *options.ServerOption) error {
118109
if err != nil {
119110
return err
120111
}
121-
if !checkCRDExists(mpiJobClientSet, namespace) {
122-
klog.Info("CRD doesn't exist. Exiting")
123-
os.Exit(1)
112+
113+
namespaces, err := opt.Namespaces(opt.Namespace, kubeClient)
114+
if err != nil {
115+
return err
116+
}
117+
118+
if namespaces[0] == corev1.NamespaceAll {
119+
klog.Info("Using cluster scoped operator")
120+
} else {
121+
klog.Infof("Scoping operator to namespace %s", namespaces)
122+
}
123+
124+
for _, namespace := range namespaces {
125+
if !checkCRDExists(mpiJobClientSet, namespace) {
126+
klog.Info("CRD doesn't exist. Exiting")
127+
os.Exit(1)
128+
}
124129
}
125130

126131
// Add mpi-job-controller types to the default Kubernetes Scheme so Events
@@ -132,14 +137,8 @@ func Run(opt *options.ServerOption) error {
132137

133138
// Set leader election start function.
134139
run := func(ctx context.Context) {
135-
var kubeInformerFactoryOpts []kubeinformers.SharedInformerOption
136-
var kubeflowInformerFactoryOpts []informers.SharedInformerOption
137-
if namespace != metav1.NamespaceAll {
138-
kubeInformerFactoryOpts = append(kubeInformerFactoryOpts, kubeinformers.WithNamespace(namespace))
139-
kubeflowInformerFactoryOpts = append(kubeflowInformerFactoryOpts, informers.WithNamespace(namespace))
140-
}
141-
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...)
142-
kubeflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, kubeflowInformerFactoryOpts...)
140+
kubeInformerFactory := opt.KubeInformer(namespaces, kubeClient)
141+
mpiJobInformerFactory := opt.MpiJobInformer(namespaces, mpiJobClientSet)
143142

144143
controller, err := controllersv1.NewMPIJobController(
145144
kubeClient,
@@ -152,14 +151,15 @@ func Run(opt *options.ServerOption) error {
152151
kubeInformerFactory.Batch().V1().Jobs(),
153152
kubeInformerFactory.Core().V1().Pods(),
154153
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
155-
kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(),
156-
namespace, opt.GangSchedulingName)
154+
mpiJobInformerFactory.Kubeflow().V2beta1().MPIJobs(),
155+
opt.VolcanoInformer, opt.SchedulerPluginsInformer,
156+
namespaces, opt.GangSchedulingName)
157157
if err != nil {
158158
klog.Fatalf("Failed to setup the controller")
159159
}
160160

161161
go kubeInformerFactory.Start(ctx.Done())
162-
go kubeflowInformerFactory.Start(ctx.Done())
162+
go mpiJobInformerFactory.Start(ctx.Done())
163163
if controller.PodGroupCtrl != nil {
164164
controller.PodGroupCtrl.StartInformerFactory(ctx.Done())
165165
}

pkg/controller/mpi_job_controller.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,13 @@ func NewMPIJobController(
272272
podInformer coreinformers.PodInformer,
273273
priorityClassInformer schedulinginformers.PriorityClassInformer,
274274
mpiJobInformer informers.MPIJobInformer,
275-
namespace, gangSchedulingName string) (*MPIJobController, error) {
275+
volcanoInformerFunc options.VolcanoInformerFunc, schedulerPluginsInformerFunc options.SchedulerPluginsInformerFunc,
276+
namespaces []string, gangSchedulingName string) (*MPIJobController, error) {
276277
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient,
277278
configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer,
278-
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName)
279+
priorityClassInformer, mpiJobInformer, &clock.RealClock{},
280+
volcanoInformerFunc, schedulerPluginsInformerFunc,
281+
namespaces, gangSchedulingName)
279282
}
280283

281284
// NewMPIJobControllerWithClock returns a new MPIJob controller.
@@ -292,7 +295,8 @@ func NewMPIJobControllerWithClock(
292295
priorityClassInformer schedulinginformers.PriorityClassInformer,
293296
mpiJobInformer informers.MPIJobInformer,
294297
clock clock.WithTicker,
295-
namespace, gangSchedulingName string) (*MPIJobController, error) {
298+
volcanoInformer options.VolcanoInformerFunc, schedulerPluginsInformer options.SchedulerPluginsInformerFunc,
299+
namespaces []string, gangSchedulingName string) (*MPIJobController, error) {
296300

297301
// Create event broadcaster.
298302
klog.V(4).Info("Creating event broadcaster")
@@ -311,10 +315,12 @@ func NewMPIJobControllerWithClock(
311315
priorityClassLister = priorityClassInformer.Lister()
312316
priorityClassSynced = priorityClassInformer.Informer().HasSynced
313317
if gangSchedulingName == options.GangSchedulerVolcano {
314-
podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace, priorityClassLister)
318+
volcanoInformer := volcanoInformer(namespaces, volcanoClient)
319+
podGroupCtrl = NewVolcanoCtrl(volcanoClient, volcanoInformer, priorityClassLister)
315320
} else if len(gangSchedulingName) != 0 {
316321
// Use scheduler-plugins as a default gang-scheduler.
317-
podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)
322+
pgInformer := schedulerPluginsInformer(namespaces, schedClient)
323+
podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, pgInformer, gangSchedulingName, priorityClassLister)
318324
}
319325
if podGroupCtrl != nil {
320326
podGroupSynced = podGroupCtrl.PodGroupSharedIndexInformer().HasSynced

pkg/controller/mpi_job_controller_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package controller
1616

1717
import (
1818
"fmt"
19+
"github.com/kubeflow/mpi-operator/pkg/informers"
1920
"reflect"
2021
"testing"
2122
"time"
@@ -46,7 +47,7 @@ import (
4647
kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
4748
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/fake"
4849
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
49-
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
50+
mpijobinformers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
5051
)
5152

5253
var (
@@ -86,6 +87,8 @@ type fixture struct {
8687
objects []runtime.Object
8788

8889
gangSchedulingName string
90+
91+
namespaces []string
8992
}
9093

9194
func newFixture(t *testing.T, gangSchedulingName string) *fixture {
@@ -94,6 +97,7 @@ func newFixture(t *testing.T, gangSchedulingName string) *fixture {
9497
f.objects = []runtime.Object{}
9598
f.kubeObjects = []runtime.Object{}
9699
f.gangSchedulingName = gangSchedulingName
100+
f.namespaces = []string{metav1.NamespaceAll}
97101
return f
98102
}
99103

@@ -155,11 +159,12 @@ func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.T
155159
return mpiJob
156160
}
157161

158-
func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
162+
func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, mpijobinformers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
159163
f.client = fake.NewSimpleClientset(f.objects...)
160164
f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...)
161-
i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
162-
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriodFunc())
165+
166+
i := informers.DefaultMpiJobInformer(f.namespaces, f.client)
167+
k8sI := informers.DefaultKubeInformer(f.namespaces, f.kubeClient)
163168

164169
c, err := NewMPIJobControllerWithClock(
165170
f.kubeClient,
@@ -174,7 +179,8 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info
174179
k8sI.Scheduling().V1().PriorityClasses(),
175180
i.Kubeflow().V2beta1().MPIJobs(),
176181
clock,
177-
metav1.NamespaceAll,
182+
informers.DefaultVolcanoInformer, informers.DefaultSchedulerPluginsInformer,
183+
f.namespaces,
178184
f.gangSchedulingName,
179185
)
180186
if err != nil {

pkg/controller/podgroup.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,7 @@ type VolcanoCtrl struct {
7373
schedulerName string
7474
}
7575

76-
func NewVolcanoCtrl(c volcanoclient.Interface, watchNamespace string, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl {
77-
var informerFactoryOpts []volcanoinformers.SharedInformerOption
78-
if watchNamespace != metav1.NamespaceAll {
79-
informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(watchNamespace))
80-
}
81-
informerFactory := volcanoinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
76+
func NewVolcanoCtrl(c volcanoclient.Interface, informerFactory volcanoinformers.SharedInformerFactory, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl {
8277
return &VolcanoCtrl{
8378
Client: c,
8479
InformerFactory: informerFactory,
@@ -204,14 +199,9 @@ type SchedulerPluginsCtrl struct {
204199

205200
func NewSchedulerPluginsCtrl(
206201
c schedclientset.Interface,
207-
watchNamespace, schedulerName string,
202+
pgInformerFactory schedinformers.SharedInformerFactory, schedulerName string,
208203
pcLister schedulinglisters.PriorityClassLister,
209204
) *SchedulerPluginsCtrl {
210-
var informerFactoryOpts []schedinformers.SharedInformerOption
211-
if watchNamespace != metav1.NamespaceAll {
212-
informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(watchNamespace))
213-
}
214-
pgInformerFactory := schedinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
215205
return &SchedulerPluginsCtrl{
216206
Client: c,
217207
InformerFactory: pgInformerFactory,

pkg/informers/informers.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package informers
2+
3+
import (
4+
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
5+
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
6+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7+
kubeinformers "k8s.io/client-go/informers"
8+
kubeclientset "k8s.io/client-go/kubernetes"
9+
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
10+
schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
11+
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
12+
volcanoinformers "volcano.sh/apis/pkg/client/informers/externalversions"
13+
)
14+
15+
func DefaultKubeInformer(namespaces []string, kubeClient kubeclientset.Interface) kubeinformers.SharedInformerFactory {
16+
var kubeInformerFactoryOpts []kubeinformers.SharedInformerOption
17+
if namespaces[0] != metav1.NamespaceAll {
18+
kubeInformerFactoryOpts = append(kubeInformerFactoryOpts, kubeinformers.WithNamespace(namespaces[0]))
19+
}
20+
21+
return kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...)
22+
}
23+
24+
func DefaultMpiJobInformer(namespaces []string, mpiJobClient mpijobclientset.Interface) informers.SharedInformerFactory {
25+
var kubeflowInformerFactoryOpts []informers.SharedInformerOption
26+
if namespaces[0] != metav1.NamespaceAll {
27+
kubeflowInformerFactoryOpts = append(kubeflowInformerFactoryOpts, informers.WithNamespace(namespaces[0]))
28+
}
29+
30+
return informers.NewSharedInformerFactoryWithOptions(mpiJobClient, 0, kubeflowInformerFactoryOpts...)
31+
}
32+
33+
func DefaultVolcanoInformer(namespaces []string, volcanoClient volcanoclient.Interface) volcanoinformers.SharedInformerFactory {
34+
var informerFactoryOpts []volcanoinformers.SharedInformerOption
35+
if namespaces[0] != metav1.NamespaceAll {
36+
informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(namespaces[0]))
37+
}
38+
return volcanoinformers.NewSharedInformerFactoryWithOptions(volcanoClient, 0, informerFactoryOpts...)
39+
}
40+
41+
func DefaultSchedulerPluginsInformer(namespaces []string, schedClient schedclientset.Interface) schedinformers.SharedInformerFactory {
42+
var informerFactoryOpts []schedinformers.SharedInformerOption
43+
if namespaces[0] != metav1.NamespaceAll {
44+
informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(namespaces[0]))
45+
}
46+
return schedinformers.NewSharedInformerFactoryWithOptions(schedClient, 0, informerFactoryOpts...)
47+
}

test/integration/mpi_job_controller_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package integration
1717
import (
1818
"context"
1919
"fmt"
20+
"github.com/kubeflow/mpi-operator/pkg/informers"
2021
"testing"
2122
"time"
2223

@@ -29,7 +30,6 @@ import (
2930
"k8s.io/apimachinery/pkg/labels"
3031
"k8s.io/apimachinery/pkg/util/runtime"
3132
"k8s.io/apimachinery/pkg/util/wait"
32-
kubeinformers "k8s.io/client-go/informers"
3333
"k8s.io/client-go/kubernetes"
3434
"k8s.io/client-go/tools/reference"
3535
"k8s.io/utils/pointer"
@@ -41,7 +41,6 @@ import (
4141
kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
4242
clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
4343
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
44-
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
4544
"github.com/kubeflow/mpi-operator/pkg/controller"
4645
)
4746

@@ -828,8 +827,10 @@ func startController(
828827
mpiClient clientset.Interface,
829828
gangSchedulerCfg *gangSchedulerConfig,
830829
) {
831-
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kClient, 0)
832-
mpiInformerFactory := informers.NewSharedInformerFactory(mpiClient, 0)
830+
namespaces := []string{metav1.NamespaceAll}
831+
832+
kubeInformerFactory := informers.DefaultKubeInformer(namespaces, kClient)
833+
mpiInformerFactory := informers.DefaultMpiJobInformer(namespaces, mpiClient)
833834
var (
834835
volcanoClient volcanoclient.Interface
835836
schedClient schedclientset.Interface
@@ -855,7 +856,8 @@ func startController(
855856
kubeInformerFactory.Core().V1().Pods(),
856857
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
857858
mpiInformerFactory.Kubeflow().V2beta1().MPIJobs(),
858-
metav1.NamespaceAll, schedulerName,
859+
informers.DefaultVolcanoInformer, informers.DefaultSchedulerPluginsInformer,
860+
namespaces, schedulerName,
859861
)
860862
if err != nil {
861863
panic(err)

0 commit comments

Comments
 (0)