@@ -27,7 +27,6 @@ import (
2727	corev1 "k8s.io/api/core/v1" 
2828	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
2929	"k8s.io/apimachinery/pkg/util/runtime" 
30- 	"k8s.io/apimachinery/pkg/watch" 
3130	"k8s.io/client-go/rest" 
3231	toolscache "k8s.io/client-go/tools/cache" 
3332	"k8s.io/klog/v2" 
@@ -99,10 +98,9 @@ func main() {
9998		entryLog .Error (err , "unable to set up provider" )
10099		os .Exit (1 )
101100	}
102- 	provider  :=  & NamespacedClusterProvider { Cluster :  cl } 
101+ 	provider  :=  NewNamespacedClusterProvider ( cl ) 
103102
104- 	// Setup a cluster-aware Manager, watching the clusters (= namespaces) through 
105- 	// the cluster provider. 
103+ 	// Setup a cluster-aware Manager, with the provider to lookup clusters. 
106104	entryLog .Info ("Setting up cluster-aware manager" )
107105	mgr , err  :=  manager .New (cfg , manager.Options {
108106		NewCache : func (config  * rest.Config , opts  cache.Options ) (cache.Cache , error ) {
@@ -142,9 +140,15 @@ func main() {
142140	}
143141
144142	entryLog .Info ("Starting provider" )
143+ 	if  err  :=  provider .Start (ctx , mgr ); err  !=  nil  { // does not block 
144+ 		entryLog .Error (err , "unable to start provider" )
145+ 		os .Exit (1 )
146+ 	}
147+ 
148+ 	entryLog .Info ("Starting cluster" )
145149	g , ctx  :=  errgroup .WithContext (ctx )
146150	g .Go (func () error  {
147- 		if  err  :=  ignoreCanceled (provider .Start (ctx )); err  !=  nil  {
151+ 		if  err  :=  ignoreCanceled (cl .Start (ctx )); err  !=  nil  {
148152			return  fmt .Errorf ("failed to start provider: %w" , err )
149153		}
150154		return  nil 
@@ -169,80 +173,93 @@ func main() {
169173// to "default" and vice versa, simulating a multi-cluster setup. It uses one 
170174// informer to watch objects for all namespaces. 
171175type  NamespacedClusterProvider  struct  {
172- 	cluster.Cluster 
173- }
176+ 	cluster  cluster.Cluster 
174177
175- func  (p  * NamespacedClusterProvider ) Get (ctx  context.Context , clusterName  string , opts  ... cluster.Option ) (cluster.Cluster , error ) {
176- 	ns  :=  & corev1.Namespace {}
177- 	if  err  :=  p .Cluster .GetCache ().Get (ctx , client.ObjectKey {Name : clusterName }, ns ); err  !=  nil  {
178- 		return  nil , err 
179- 	}
178+ 	mgr  manager.Manager 
180179
181- 	return  & NamespacedCluster {clusterName : clusterName , Cluster : p .Cluster }, nil 
180+ 	lock       sync.RWMutex 
181+ 	clusters   map [string ]cluster.Cluster 
182+ 	cancelFns  map [string ]context.CancelFunc 
182183}
183184
184- func  (p  * NamespacedClusterProvider ) List (ctx  context.Context ) ([]string , error ) {
185- 	nss  :=  & corev1.NamespaceList {}
186- 	if  err  :=  p .Cluster .GetCache ().List (ctx , nss ); err  !=  nil  {
187- 		return  nil , err 
188- 	}
189- 
190- 	res  :=  make ([]string , 0 , len (nss .Items ))
191- 	for  _ , ns  :=  range  nss .Items  {
192- 		res  =  append (res , ns .Name )
185+ func  NewNamespacedClusterProvider (cl  cluster.Cluster ) * NamespacedClusterProvider  {
186+ 	return  & NamespacedClusterProvider {
187+ 		cluster :   cl ,
188+ 		clusters :  map [string ]cluster.Cluster {},
189+ 		cancelFns : map [string ]context.CancelFunc {},
193190	}
194- 	return  res , nil 
195191}
196192
197- func  (p  * NamespacedClusterProvider ) Watch (ctx  context.Context ) (cluster. Watcher ,  error )  {
198- 	inf , err  :=  p .Cluster .GetCache ().GetInformer (ctx , & corev1.Namespace {})
193+ func  (p  * NamespacedClusterProvider ) Start (ctx  context.Context ,  mgr  manager. Manager )  error  {
194+ 	nsInf , err  :=  p .cluster .GetCache ().GetInformer (ctx , & corev1.Namespace {})
199195	if  err  !=  nil  {
200- 		return  nil ,  err 
196+ 		return  err 
201197	}
202- 	return  & NamespaceWatcher {inf : inf , ch : make (chan  cluster.WatchEvent )}, nil 
203- }
204198
205- type  NamespaceWatcher  struct  {
206- 	inf   cache.Informer 
207- 	init  sync.Once 
208- 	ch    chan  cluster.WatchEvent 
209- 	reg   toolscache.ResourceEventHandlerRegistration 
210- }
199+ 	if  _ , err  :=  nsInf .AddEventHandler (toolscache.ResourceEventHandlerFuncs {
200+ 		AddFunc : func (obj  interface {}) {
201+ 			ns  :=  obj .(* corev1.Namespace )
202+ 
203+ 			p .lock .RLock ()
204+ 			if  _ , ok  :=  p .clusters [ns .Name ]; ok  {
205+ 				defer  p .lock .RUnlock ()
206+ 				return 
207+ 			}
208+ 
209+ 			// create new cluster 
210+ 			p .lock .Lock ()
211+ 			clusterCtx , cancel  :=  context .WithCancel (ctx )
212+ 			cl  :=  & NamespacedCluster {clusterName : ns .Name , Cluster : p .cluster }
213+ 			p .clusters [ns .Name ] =  cl 
214+ 			p .cancelFns [ns .Name ] =  cancel 
215+ 			p .lock .Unlock ()
216+ 
217+ 			if  err  :=  mgr .Engage (clusterCtx , cl ); err  !=  nil  {
218+ 				runtime .HandleError (fmt .Errorf ("failed to engage manager with cluster %q: %w" , ns .Name , err ))
219+ 
220+ 				// cleanup 
221+ 				p .lock .Lock ()
222+ 				delete (p .clusters , ns .Name )
223+ 				delete (p .cancelFns , ns .Name )
224+ 				p .lock .Unlock ()
225+ 			}
226+ 		},
227+ 		DeleteFunc : func (obj  interface {}) {
228+ 			ns  :=  obj .(* corev1.Namespace )
229+ 
230+ 			p .lock .RLock ()
231+ 			cl , ok  :=  p .clusters [ns .Name ]
232+ 			if  ! ok  {
233+ 				p .lock .RUnlock ()
234+ 				return 
235+ 			}
236+ 			p .lock .RUnlock ()
237+ 
238+ 			if  err  :=  mgr .Disengage (ctx , cl ); err  !=  nil  {
239+ 				runtime .HandleError (fmt .Errorf ("failed to disengage manager with cluster %q: %w" , ns .Name , err ))
240+ 			}
211241
212- func  (w  * NamespaceWatcher ) Stop () {
213- 	if  w .reg  !=  nil  {
214- 		_  =  w .inf .RemoveEventHandler (w .reg )
242+ 			// stop and forget 
243+ 			p .lock .Lock ()
244+ 			p .cancelFns [ns.Name ]()
245+ 			delete (p .clusters , ns .Name )
246+ 			delete (p .cancelFns , ns .Name )
247+ 			p .lock .Unlock ()
248+ 		},
249+ 	}); err  !=  nil  {
250+ 		return  err 
215251	}
216- 	close (w .ch )
252+ 
253+ 	return  nil 
217254}
218255
219- func  (w  * NamespaceWatcher ) ResultChan () <- chan  cluster.WatchEvent  {
220- 	w .init .Do (func () {
221- 		w .reg , _  =  w .inf .AddEventHandler (toolscache.ResourceEventHandlerFuncs {
222- 			AddFunc : func (obj  interface {}) {
223- 				ns  :=  obj .(* corev1.Namespace )
224- 				w .ch  <-  cluster.WatchEvent {
225- 					Type :        watch .Added ,
226- 					ClusterName : ns .Name ,
227- 				}
228- 			},
229- 			DeleteFunc : func (obj  interface {}) {
230- 				ns  :=  obj .(* corev1.Namespace )
231- 				w .ch  <-  cluster.WatchEvent {
232- 					Type :        watch .Deleted ,
233- 					ClusterName : ns .Name ,
234- 				}
235- 			},
236- 			UpdateFunc : func (oldObj , newObj  interface {}) {
237- 				ns  :=  newObj .(* corev1.Namespace )
238- 				w .ch  <-  cluster.WatchEvent {
239- 					Type :        watch .Modified ,
240- 					ClusterName : ns .Name ,
241- 				}
242- 			},
243- 		})
244- 	})
245- 	return  w .ch 
256+ func  (p  * NamespacedClusterProvider ) Get (ctx  context.Context , clusterName  string ) (cluster.Cluster , error ) {
257+ 	p .lock .RLock ()
258+ 	defer  p .lock .RUnlock ()
259+ 	if  cl , ok  :=  p .clusters [clusterName ]; ok  {
260+ 		return  cl , nil 
261+ 	}
262+ 	return  nil , fmt .Errorf ("cluster %s not found" , clusterName )
246263}
247264
248265func  ignoreCanceled (err  error ) error  {
0 commit comments