@@ -32,7 +32,6 @@ import (
3232 "sigs.k8s.io/controller-runtime/pkg/cluster"
3333 "sigs.k8s.io/controller-runtime/pkg/handler"
3434 ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
35- internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
3635 logf "sigs.k8s.io/controller-runtime/pkg/log"
3736 "sigs.k8s.io/controller-runtime/pkg/predicate"
3837 "sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -82,7 +81,7 @@ type Controller struct {
8281 startWatches []* watchDescription
8382
8483 // clusterAwareWatches maintains a list of cluster aware sources, handlers, and predicates to start when the controller is started.
85- clusterAwareWatches []* watchDescription
84+ clusterAwareWatches []* deepcopyableWatchDescription
8685
8786 // clustersByName is used to manage the fleet of clusters.
8887 clustersByName map [string ]* clusterDescription
@@ -124,37 +123,13 @@ type watchDescription struct {
124123 predicates []predicate.Predicate
125124}
126125
127- func (w * watchDescription ) IsClusterAware () bool {
128- if _ , ok := w .src .(cluster.AwareDeepCopy [* internal.Kind ]); ! ok {
129- if _ , ok := w .src .(cluster.AwareDeepCopy [source.Source ]); ! ok {
130- return false
131- }
132- }
133- if _ , ok := w .handler .(cluster.AwareDeepCopy [handler.EventHandler ]); ! ok {
134- return false
135- }
136- return true
137- }
138-
139- func (w * watchDescription ) DeepCopyFor (c cluster.Cluster ) * watchDescription {
140- copy := & watchDescription {
141- predicates : w .predicates ,
142- }
143- if clusterAwareSource , ok := w .src .(cluster.AwareDeepCopy [* internal.Kind ]); ok {
144- copy .src = clusterAwareSource .DeepCopyFor (c )
145- } else if clusterAwareSource , ok := w .src .(cluster.AwareDeepCopy [source.Source ]); ok {
146- copy .src = clusterAwareSource .DeepCopyFor (c )
147- } else {
148- return nil
149- }
150-
151- if clusterAwareHandler , ok := w .handler .(cluster.AwareDeepCopy [handler.EventHandler ]); ok {
152- copy .handler = clusterAwareHandler .DeepCopyFor (c )
153- } else {
154- return nil
155- }
156-
157- return copy
126+ // deepcopyableWatchDescription contains all the information necessary to start
127+ // a watch. In addition to watchDescription it also contains the DeepCopyFor
128+ // method to adapt it to a different cluster.
129+ type deepcopyableWatchDescription struct {
130+ src source.DeepCopyableSyncingSource
131+ handler handler.DeepCopyableEventHandler
132+ predicates []predicate.Predicate
158133}
159134
160135// Reconcile implements reconcile.Reconciler.
@@ -182,14 +157,22 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
182157 c .mu .Lock ()
183158 defer c .mu .Unlock ()
184159
185- watchDesc := & watchDescription {src : src , handler : evthdler , predicates : prct }
186-
187160 // If the source is cluster aware, store it in a separate list.
188- _ , forceDefaultClsuter := src .(ClusterAwareSource )
189- if c .WatchProviderClusters && ! forceDefaultClsuter {
190- if ! watchDesc .IsClusterAware () {
191- return fmt .Errorf ("source %s is not cluster aware, but WatchProviderClusters is true" , src )
161+ var forceDefaultCluster bool
162+ if src , ok := src .(ClusterAwareSource ); ok {
163+ forceDefaultCluster = src .ForceDefaultCluster ()
164+ }
165+ if c .WatchProviderClusters && ! forceDefaultCluster {
166+ src , ok := src .(source.DeepCopyableSyncingSource )
167+ if ! ok {
168+ return fmt .Errorf ("source %T is not cluster aware, but WatchProviderClusters is true" , src )
192169 }
170+ evthdler , ok := evthdler .(handler.DeepCopyableEventHandler )
171+ if ! ok {
172+ return fmt .Errorf ("handler %T is not cluster aware, but WatchProviderClusters is true" , evthdler )
173+ }
174+
175+ watchDesc := & deepcopyableWatchDescription {src : src , handler : evthdler , predicates : prct }
193176 c .clusterAwareWatches = append (c .clusterAwareWatches , watchDesc )
194177
195178 // If the watch is cluster aware, start it for all the clusters
@@ -208,7 +191,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
208191 //
209192 // These watches are going to be held on the controller struct until the manager or user calls Start(...).
210193 if ! c .Started {
211- c .startWatches = append (c .startWatches , watchDesc )
194+ c .startWatches = append (c .startWatches , & watchDescription { src : src , handler : evthdler , predicates : prct } )
212195 return nil
213196 }
214197
@@ -260,8 +243,8 @@ func (c *Controller) Disengage(ctx context.Context, cluster cluster.Cluster) err
260243 return nil
261244}
262245
263- func (c * Controller ) startClusterAwareWatchLocked (cldesc * clusterDescription , watchDesc * watchDescription ) error {
264- watch := watchDesc .DeepCopyFor (cldesc )
246+ func (c * Controller ) startClusterAwareWatchLocked (cldesc * clusterDescription , watchDesc * deepcopyableWatchDescription ) error {
247+ watch := & deepcopyableWatchDescription { src : watchDesc .src . DeepCopyFor (cldesc . Cluster ), handler : watchDesc . handler . DeepCopyFor ( cldesc . Cluster ), predicates : watchDesc . predicates }
265248 if watch == nil {
266249 return nil
267250 }
0 commit comments