From b60494654cfcfc2c43fa4db2c42253e78c88ffe2 Mon Sep 17 00:00:00 2001 From: Marvin Beckers Date: Mon, 6 Jan 2025 16:58:55 +0100 Subject: [PATCH] Add reconcile.ClusterAwareRequest and matching EventHandlers On-behalf-of: SAP Signed-off-by: Marvin Beckers --- examples/fleet-namespace/cluster.go | 15 --- examples/fleet-namespace/eventhandler.go | 124 --------------------- examples/fleet-namespace/main.go | 8 +- examples/fleet/README.md | 9 ++ examples/fleet/cluster.go | 35 ------ examples/fleet/eventhandler.go | 125 --------------------- examples/fleet/main.go | 73 ++++++++----- pkg/builder/controller.go | 35 ++++-- pkg/handler/enqueue_cluster.go | 131 +++++++++++++++++++++++ pkg/handler/enqueue_owner_cluster.go | 112 +++++++++++++++++++ pkg/reconcile/reconcile.go | 9 ++ 11 files changed, 335 insertions(+), 341 deletions(-) delete mode 100644 examples/fleet-namespace/eventhandler.go create mode 100644 examples/fleet/README.md delete mode 100644 examples/fleet/cluster.go delete mode 100644 examples/fleet/eventhandler.go create mode 100644 pkg/handler/enqueue_cluster.go create mode 100644 pkg/handler/enqueue_owner_cluster.go diff --git a/examples/fleet-namespace/cluster.go b/examples/fleet-namespace/cluster.go index fce755cfe9..ce012cfeff 100644 --- a/examples/fleet-namespace/cluster.go +++ b/examples/fleet-namespace/cluster.go @@ -19,27 +19,12 @@ package main import ( "context" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" - "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -type clusterRequest struct { - reconcile.Request - ClusterName string -} - -// String returns the general purpose string representation. -func (cr *clusterRequest) String() string { - if cr.ClusterName == "" { - return cr.NamespacedName.String() - } - return "cluster://" + cr.ClusterName + string(types.Separator) + cr.NamespacedName.String() -} - type NamespacedCluster struct { clusterName string cluster.Cluster diff --git a/examples/fleet-namespace/eventhandler.go b/examples/fleet-namespace/eventhandler.go deleted file mode 100644 index bb687b5e27..0000000000 --- a/examples/fleet-namespace/eventhandler.go +++ /dev/null @@ -1,124 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "context" - "reflect" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -var _ handler.TypedEventHandler[client.Object, clusterRequest] = &EnqueueClusterRequestForObject{} - -type EnqueueClusterRequestForObject = TypedEnqueueClusterRequestForObject[client.Object] - -type TypedEnqueueClusterRequestForObject[object client.Object] struct { - clusterName string -} - -// Create implements EventHandler. -func (e *TypedEnqueueClusterRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[clusterRequest]) { - if isNil(evt.Object) { - return - } - - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}}) -} - -// Update implements EventHandler. -func (e *TypedEnqueueClusterRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[clusterRequest]) { - switch { - case !isNil(evt.ObjectNew): - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.ObjectNew.GetName(), - Namespace: evt.ObjectNew.GetNamespace(), - }}}) - case !isNil(evt.ObjectOld): - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.ObjectOld.GetName(), - Namespace: evt.ObjectOld.GetNamespace(), - }}}) - } -} - -// Delete implements EventHandler. -func (e *TypedEnqueueClusterRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[clusterRequest]) { - if isNil(evt.Object) { - return - } - - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}}) -} - -// Generic implements EventHandler. -func (e *TypedEnqueueClusterRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[clusterRequest]) { - if isNil(evt.Object) { - return - } - - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}}) -} - -func (e *TypedEnqueueClusterRequestForObject[T]) DeepCopyFor(c cluster.Cluster) handler.TypedDeepCopyableEventHandler[T, clusterRequest] { - if e == nil { - return nil - } - - out := new(TypedEnqueueClusterRequestForObject[T]) - *out = *e - out.clusterName = c.Name() - - return out -} - -func isNil(arg any) bool { - if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr || - v.Kind() == reflect.Interface || - v.Kind() == reflect.Slice || - v.Kind() == reflect.Map || - v.Kind() == reflect.Chan || - v.Kind() == reflect.Func) && v.IsNil()) { - return true - } - return false -} diff --git a/examples/fleet-namespace/main.go b/examples/fleet-namespace/main.go index cbbce465b8..e82d10d563 100644 --- a/examples/fleet-namespace/main.go +++ b/examples/fleet-namespace/main.go @@ -120,11 +120,11 @@ func main() { os.Exit(1) } - if err := builder.TypedControllerManagedBy[clusterRequest](mgr). + if err := builder.TypedControllerManagedBy[reconcile.ClusterAwareRequest](mgr). Named("fleet-ns-configmap-controller"). - Watches(&corev1.ConfigMap{}, &EnqueueClusterRequestForObject{}). - Complete(reconcile.TypedFunc[clusterRequest]( - func(ctx context.Context, req clusterRequest) (ctrl.Result, error) { + For(&corev1.ConfigMap{}). + Complete(reconcile.TypedFunc[reconcile.ClusterAwareRequest]( + func(ctx context.Context, req reconcile.ClusterAwareRequest) (ctrl.Result, error) { log := log.FromContext(ctx).WithValues("cluster", req.ClusterName) cl, err := mgr.GetCluster(ctx, req.ClusterName) diff --git a/examples/fleet/README.md b/examples/fleet/README.md new file mode 100644 index 0000000000..d28ba25f54 --- /dev/null +++ b/examples/fleet/README.md @@ -0,0 +1,9 @@ +# examples/fleet + +The `fleet` example is a basic implementation of a multi-cluster-capable controller. As a cluster provider it uses `kind`: Every local kind cluster with a `fleet-` name prefix will be picked up as dynamic cluster if multi-cluster support is enabled in `fleet`. + +`fleet` can switch between multi-cluster mode and single-cluster mode via the `--enable-cluster-provider` flag (defaults to `true`) to demonstrate the seamless switch of using the same code base as a "normal" single-cluster controller and as a multi-cluster controller as easy as plugging in a (third-party) cluster provider. To run `fleet` against the current `KUBECONFIG` in single-cluster mode, just run: + +```sh +$ USE_EXISTING_CLUSTER=true go run . --enable-cluster-provider=false +``` diff --git a/examples/fleet/cluster.go b/examples/fleet/cluster.go deleted file mode 100644 index 48ef56f46d..0000000000 --- a/examples/fleet/cluster.go +++ /dev/null @@ -1,35 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -type clusterRequest struct { - reconcile.Request - ClusterName string -} - -// String returns the general purpose string representation. -func (cr *clusterRequest) String() string { - if cr.ClusterName == "" { - return cr.NamespacedName.String() - } - return "cluster://" + cr.ClusterName + string(types.Separator) + cr.NamespacedName.String() -} diff --git a/examples/fleet/eventhandler.go b/examples/fleet/eventhandler.go deleted file mode 100644 index 0e05c5e17b..0000000000 --- a/examples/fleet/eventhandler.go +++ /dev/null @@ -1,125 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "context" - "reflect" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -var _ handler.TypedDeepCopyableEventHandler[client.Object, clusterRequest] = &EnqueueClusterRequestForObject{} - -type EnqueueClusterRequestForObject = TypedEnqueueClusterRequestForObject[client.Object] - -type TypedEnqueueClusterRequestForObject[object client.Object] struct { - clusterName string -} - -// Create implements EventHandler. -func (e *TypedEnqueueClusterRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[clusterRequest]) { - if isNil(evt.Object) { - return - } - - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}}) -} - -// Update implements EventHandler. -func (e *TypedEnqueueClusterRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[clusterRequest]) { - - switch { - case !isNil(evt.ObjectNew): - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.ObjectNew.GetName(), - Namespace: evt.ObjectNew.GetNamespace(), - }}}) - case !isNil(evt.ObjectOld): - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.ObjectOld.GetName(), - Namespace: evt.ObjectOld.GetNamespace(), - }}}) - } -} - -// Delete implements EventHandler. -func (e *TypedEnqueueClusterRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[clusterRequest]) { - if isNil(evt.Object) { - return - } - - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}}) -} - -// Generic implements EventHandler. -func (e *TypedEnqueueClusterRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[clusterRequest]) { - if isNil(evt.Object) { - return - } - - q.Add(clusterRequest{ - ClusterName: e.clusterName, - Request: reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}}) -} - -func (e *TypedEnqueueClusterRequestForObject[T]) DeepCopyFor(c cluster.Cluster) handler.TypedDeepCopyableEventHandler[T, clusterRequest] { - if e == nil { - return nil - } - - out := new(TypedEnqueueClusterRequestForObject[T]) - *out = *e - out.clusterName = c.Name() - - return out -} - -func isNil(arg any) bool { - if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr || - v.Kind() == reflect.Interface || - v.Kind() == reflect.Slice || - v.Kind() == reflect.Map || - v.Kind() == reflect.Chan || - v.Kind() == reflect.Func) && v.IsNil()) { - return true - } - return false -} diff --git a/examples/fleet/main.go b/examples/fleet/main.go index 0504908e6d..9fc788bf82 100644 --- a/examples/fleet/main.go +++ b/examples/fleet/main.go @@ -25,6 +25,8 @@ import ( "time" "github.com/go-logr/logr" + "github.com/spf13/pflag" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" @@ -48,6 +50,14 @@ func main() { ctx := signals.SetupSignalHandler() entryLog := log.Log.WithName("entrypoint") + var ( + enableClusterProvider bool + provider *KindClusterProvider + ) + + pflag.BoolVar(&enableClusterProvider, "enable-cluster-provider", true, "Enables experimental kind cluster provider") + pflag.Parse() + testEnv := &envtest.Environment{} cfg, err := testEnv.Start() if err != nil { @@ -66,25 +76,28 @@ func main() { // Setup a Manager, note that this not yet engages clusters, only makes them available. entryLog.Info("Setting up manager") - provider := &KindClusterProvider{ - log: log.Log.WithName("kind-cluster-provider"), - clusters: map[string]cluster.Cluster{}, - cancelFns: map[string]context.CancelFunc{}, + opts := manager.Options{} + + if enableClusterProvider { + provider = &KindClusterProvider{ + log: log.Log.WithName("kind-cluster-provider"), + clusters: map[string]cluster.Cluster{}, + cancelFns: map[string]context.CancelFunc{}, + } + opts.ExperimentalClusterProvider = provider } - mgr, err := manager.New( - cfg, - manager.Options{ExperimentalClusterProvider: provider}, - ) + mgr, err := manager.New(cfg, opts) if err != nil { entryLog.Error(err, "unable to set up overall controller manager") os.Exit(1) } - if err := builder.TypedControllerManagedBy[clusterRequest](mgr). - Named("fleet-pod-controller"). - Watches(&corev1.Pod{}, &EnqueueClusterRequestForObject{}). - Complete(reconcile.TypedFunc[clusterRequest]( - func(ctx context.Context, req clusterRequest) (ctrl.Result, error) { + if err := builder.TypedControllerManagedBy[reconcile.ClusterAwareRequest](mgr). + Named("fleet-controller"). + For(&appsv1.ReplicaSet{}). + Owns(&corev1.Pod{}). + Complete(reconcile.TypedFunc[reconcile.ClusterAwareRequest]( + func(ctx context.Context, req reconcile.ClusterAwareRequest) (ctrl.Result, error) { log := log.FromContext(ctx).WithValues("cluster", req.ClusterName) cl, err := mgr.GetCluster(ctx, req.ClusterName) @@ -93,27 +106,27 @@ func main() { } client := cl.GetClient() - // Retrieve the pod from the cluster. - pod := &corev1.Pod{} - if err := client.Get(ctx, req.NamespacedName, pod); err != nil { + // Retrieve the ReplicaSet from the cluster. + replicaSet := &appsv1.ReplicaSet{} + if err := client.Get(ctx, req.NamespacedName, replicaSet); err != nil { if !apierrors.IsNotFound(err) { return reconcile.Result{}, err } - // Pod was deleted. + // ReplicaSet was deleted. return reconcile.Result{}, nil } - // If the pod is being deleted, we can skip it. - if pod.DeletionTimestamp != nil { + // If the ReplicaSet is being deleted, we can skip it. + if replicaSet.DeletionTimestamp != nil { return reconcile.Result{}, nil } - log.Info("Reconciling pod", "ns", pod.GetNamespace(), "name", pod.Name, "uuid", pod.UID) + log.Info("Reconciling ReplicaSet", "ns", replicaSet.GetNamespace(), "name", replicaSet.Name, "uuid", replicaSet.UID) // Print any annotations that start with fleet. - for k, v := range pod.Labels { + for k, v := range replicaSet.Labels { if strings.HasPrefix(k, "fleet-") { - log.Info("Detected fleet label!", "pod", pod.Name, "key", k, "value", v) + log.Info("Detected fleet label!", "pod", replicaSet.Name, "key", k, "value", v) } } @@ -124,13 +137,15 @@ func main() { os.Exit(1) } - entryLog.Info("Starting provider") - go func() { - if err := provider.Run(ctx, mgr); err != nil { - entryLog.Error(err, "unable to run provider") - os.Exit(1) - } - }() + if enableClusterProvider && provider != nil { + entryLog.Info("Starting provider") + go func() { + if err := provider.Run(ctx, mgr); err != nil { + entryLog.Error(err, "unable to run provider") + os.Exit(1) + } + }() + } entryLog.Info("Starting manager") if err := mgr.Start(ctx); err != nil { diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 4a634d4e0e..d07214219e 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -326,12 +326,16 @@ func (cc *typedClusterWatcher[request]) Watch(ctx context.Context, cl cluster.Cl return err } - if reflect.TypeFor[request]() != reflect.TypeOf(reconcile.Request{}) { - return fmt.Errorf("For() can only be used with reconcile.Request, got %T", *new(request)) + var hdler handler.TypedEventHandler[client.Object, request] + switch reflect.TypeFor[request]() { + case reflect.TypeOf(reconcile.Request{}): + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}))) + case reflect.TypeOf(reconcile.ClusterAwareRequest{}): + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueClusterAwareRequestForObject{ClusterName: cl.Name()}))) + default: + return fmt.Errorf("For() can only be used with reconcile.Request or reconcile.ClusterAwareRequest, got %T", *new(request)) } - var hdler handler.TypedEventHandler[client.Object, request] - reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}))) allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...) allPredicates = append(allPredicates, cc.forInput.predicates...) src := &ctxBoundedSyncingSource[request]{ctx: ctx, src: source.TypedKind(cl.GetCache(), obj, hdler, allPredicates...)} @@ -351,11 +355,24 @@ func (cc *typedClusterWatcher[request]) Watch(ctx context.Context, cl cluster.Cl } var hdler handler.TypedEventHandler[client.Object, request] - reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner( - cl.GetScheme(), cl.GetRESTMapper(), - cc.forInput.object, - opts..., - )))) + switch reflect.TypeFor[request]() { + case reflect.TypeOf(reconcile.Request{}): + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner( + cl.GetScheme(), cl.GetRESTMapper(), + cc.forInput.object, + opts..., + )))) + case reflect.TypeOf(reconcile.ClusterAwareRequest{}): + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueClusterAwareRequestForOwner( + cl.GetScheme(), cl.GetRESTMapper(), + cc.forInput.object, + cl, + opts..., + )))) + default: + return fmt.Errorf("Owns() can only be used with reconcile.Request or reconcile.ClusterAwareRequest, got %T", *new(request)) + } + allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) src := &ctxBoundedSyncingSource[request]{ctx: ctx, src: source.TypedKind(cl.GetCache(), obj, hdler, allPredicates...)} diff --git a/pkg/handler/enqueue_cluster.go b/pkg/handler/enqueue_cluster.go new file mode 100644 index 0000000000..7aab1267d2 --- /dev/null +++ b/pkg/handler/enqueue_cluster.go @@ -0,0 +1,131 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "context" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/event" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +var clusterenqueueLog = logf.RuntimeLog.WithName("eventhandler").WithName("EnqueueClusterAwareRequestForObject") + +var _ TypedEventHandler[client.Object, reconcile.ClusterAwareRequest] = &EnqueueClusterAwareRequestForObject{} +var _ TypedDeepCopyableEventHandler[client.Object, reconcile.ClusterAwareRequest] = &EnqueueClusterAwareRequestForObject{} + +// EnqueueClusterAwareRequestForObject enqueues a ClusterAwareRequest containing the Name, Namespace and ClusterName of the object that is the source of the Event. +// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueClusterAwareRequestForObject should be used by multi-cluster +// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. +type EnqueueClusterAwareRequestForObject = TypedEnqueueClusterAwareRequestForObject[client.Object] + +// TypedEnqueueClusterAwareRequestForObject enqueues a ClusterAwareRequest containing the Name, Namespace and ClusterName of the object that is the source of the Event. +// (e.g. the created / deleted / updated objects Name and Namespace). handler.TypedEnqueueClusterAwareRequestForObject should be used by multi-cluster +// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. +// +// TypedEnqueueClusterAwareRequestForObject is experimental and subject to future change. +type TypedEnqueueClusterAwareRequestForObject[object client.Object] struct { + ClusterName string +} + +// Create implements EventHandler. +func (e *TypedEnqueueClusterAwareRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.ClusterAwareRequest]) { + if isNil(evt.Object) { + clusterenqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) + return + } + q.Add(reconcile.ClusterAwareRequest{ + Request: reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.Object.GetName(), + Namespace: evt.Object.GetNamespace(), + }}, + ClusterName: e.ClusterName, + }) +} + +// Update implements EventHandler. +func (e *TypedEnqueueClusterAwareRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.ClusterAwareRequest]) { + switch { + case !isNil(evt.ObjectNew): + q.Add(reconcile.ClusterAwareRequest{ + Request: reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.ObjectNew.GetName(), + Namespace: evt.ObjectNew.GetNamespace(), + }}, + ClusterName: e.ClusterName, + }) + case !isNil(evt.ObjectOld): + q.Add( + reconcile.ClusterAwareRequest{ + Request: reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.ObjectOld.GetName(), + Namespace: evt.ObjectOld.GetNamespace(), + }}, + ClusterName: e.ClusterName, + }) + default: + clusterenqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt) + } +} + +// Delete implements EventHandler. +func (e *TypedEnqueueClusterAwareRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.ClusterAwareRequest]) { + if isNil(evt.Object) { + clusterenqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) + return + } + q.Add(reconcile.ClusterAwareRequest{ + Request: reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.Object.GetName(), + Namespace: evt.Object.GetNamespace(), + }}, + ClusterName: e.ClusterName, + }) +} + +// Generic implements EventHandler. +func (e *TypedEnqueueClusterAwareRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.ClusterAwareRequest]) { + if isNil(evt.Object) { + clusterenqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) + return + } + q.Add(reconcile.ClusterAwareRequest{ + Request: reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.Object.GetName(), + Namespace: evt.Object.GetNamespace(), + }}, + ClusterName: e.ClusterName, + }) +} + +// DeepCopyFor implements TypedDeepCopyableEventHandler. +func (e *TypedEnqueueClusterAwareRequestForObject[T]) DeepCopyFor(c cluster.Cluster) TypedDeepCopyableEventHandler[T, reconcile.ClusterAwareRequest] { + if e == nil { + return nil + } + + out := new(TypedEnqueueClusterAwareRequestForObject[T]) + *out = *e + out.ClusterName = c.Name() + + return out +} diff --git a/pkg/handler/enqueue_owner_cluster.go b/pkg/handler/enqueue_owner_cluster.go new file mode 100644 index 0000000000..0d90f97811 --- /dev/null +++ b/pkg/handler/enqueue_owner_cluster.go @@ -0,0 +1,112 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "context" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +var _ TypedEventHandler[client.Object, reconcile.ClusterAwareRequest] = &enqueueClusterAwareRequestForOwner[client.Object]{} + +// EnqueueClusterAwareRequestForOwner enqueues ClusterAwareRequests for the Owners of an object. E.g. the object that created +// the object that was the source of the Event. +// +// If a ReplicaSet creates Pods, users may reconcile the ReplicaSet in response to Pod Events using: +// +// - a source.Kind Source with Type of Pod. +// +// - a handler.enqueueClusterAwareRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true. +func EnqueueClusterAwareRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, cl cluster.Cluster, opts ...OwnerOption) TypedEventHandler[client.Object, reconcile.ClusterAwareRequest] { + return TypedEnqueueClusterAwareRequestForOwner[client.Object](scheme, mapper, ownerType, cl, opts...) +} + +// TypedEnqueueClusterAwareRequestForOwner enqueues ClusterAwareRequests for the Owners of an object. E.g. the object that created +// the object that was the source of the Event. +// +// If a ReplicaSet creates Pods, users may reconcile the ReplicaSet in response to Pod Events using: +// +// - a source.Kind Source with Type of Pod. +// +// - a handler.typedEnqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true. +// +// TypedEnqueueClusterAwareRequestForOwner is experimental and subject to future change. +func TypedEnqueueClusterAwareRequestForOwner[object client.Object](scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, cl cluster.Cluster, opts ...OwnerOption) TypedEventHandler[object, reconcile.ClusterAwareRequest] { + e := &enqueueClusterAwareRequestForOwner[object]{ + enqueueRequestForOwner: enqueueRequestForOwner[object]{ + ownerType: ownerType, + mapper: mapper, + }, + clusterName: cl.Name(), + } + if err := e.parseOwnerTypeGroupKind(scheme); err != nil { + panic(err) + } + for _, opt := range opts { + opt(e) + } + return e +} + +type enqueueClusterAwareRequestForOwner[object client.Object] struct { + enqueueRequestForOwner[object] + clusterName string +} + +// Create implements TypedEventHandler. +func (e *enqueueClusterAwareRequestForOwner[object]) Create(ctx context.Context, evt event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[reconcile.ClusterAwareRequest]) { + reqs := map[reconcile.Request]empty{} + e.getOwnerReconcileRequest(evt.Object, reqs) + for req := range reqs { + q.Add(reconcile.ClusterAwareRequest{Request: req, ClusterName: e.clusterName}) + } +} + +// Update implements TypedEventHandler. +func (e *enqueueClusterAwareRequestForOwner[object]) Update(ctx context.Context, evt event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[reconcile.ClusterAwareRequest]) { + reqs := map[reconcile.Request]empty{} + e.getOwnerReconcileRequest(evt.ObjectOld, reqs) + e.getOwnerReconcileRequest(evt.ObjectNew, reqs) + for req := range reqs { + q.Add(reconcile.ClusterAwareRequest{Request: req, ClusterName: e.clusterName}) + } +} + +// Delete implements TypedEventHandler. +func (e *enqueueClusterAwareRequestForOwner[object]) Delete(ctx context.Context, evt event.TypedDeleteEvent[object], q workqueue.TypedRateLimitingInterface[reconcile.ClusterAwareRequest]) { + reqs := map[reconcile.Request]empty{} + e.getOwnerReconcileRequest(evt.Object, reqs) + for req := range reqs { + q.Add(reconcile.ClusterAwareRequest{Request: req, ClusterName: e.clusterName}) + } +} + +// Generic implements TypedEventHandler. +func (e *enqueueClusterAwareRequestForOwner[object]) Generic(ctx context.Context, evt event.TypedGenericEvent[object], q workqueue.TypedRateLimitingInterface[reconcile.ClusterAwareRequest]) { + reqs := map[reconcile.Request]empty{} + e.getOwnerReconcileRequest(evt.Object, reqs) + for req := range reqs { + q.Add(reconcile.ClusterAwareRequest{Request: req, ClusterName: e.clusterName}) + } +} diff --git a/pkg/reconcile/reconcile.go b/pkg/reconcile/reconcile.go index ee63f681cc..d213c6eec7 100644 --- a/pkg/reconcile/reconcile.go +++ b/pkg/reconcile/reconcile.go @@ -52,6 +52,15 @@ type Request struct { types.NamespacedName } +// ClusterAwareRequest contains the information necessary to reconcile a Kubernetes object in a multi-cluster +// controller. This includes the information of a normal `Request` (Name and Namespace) plus a ClusterName +// that can then be used to call the `Get` function of the `cluster.Provider` interface to fetch the correct +// cluster for a request. +type ClusterAwareRequest struct { + Request + ClusterName string +} + /* Reconciler implements a Kubernetes API for a specific Resource by Creating, Updating or Deleting Kubernetes objects, or by making changes to systems external to the cluster (e.g. cloudproviders, github, etc).