diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 515be2191a..1e68bf21e4 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -144,6 +144,14 @@ rules:
   - get
   - list
   - watch
+- apiGroups:
+  - cdi.kubevirt.io
+  resources:
+  - datavolumes
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - cluster.open-cluster-management.io
   resources:
@@ -227,13 +235,24 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - kubevirt.io
+  resources:
+  - virtualmachineinstances
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - kubevirt.io
   resources:
   - virtualmachines
   verbs:
+  - delete
   - get
   - list
+  - patch
+  - watch
 - apiGroups:
   - multicluster.x-k8s.io
   resources:
diff --git a/internal/controller/util/vm_util.go b/internal/controller/util/vm_util.go
index 1a6398640e..b76f29a42c 100644
--- a/internal/controller/util/vm_util.go
+++ b/internal/controller/util/vm_util.go
@@ -3,9 +3,11 @@ package util
 import (
 	"context"
 	"fmt"
+	"strings"
 
 	"github.com/go-logr/logr"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	virtv1 "kubevirt.io/api/core/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -13,6 +15,11 @@ import (
 	"github.com/ramendr/ramen/internal/controller/core"
 )
 
+const (
+	KindVirtualMachine       = "VirtualMachine"
+	KubeVirtAPIVersionPrefix = "kubevirt.io/" // prefix match on group; version follows (e.g., v1)
+)
+
 func ListVMsByLabelSelector(
 	ctx context.Context,
 	apiReader client.Reader,
@@ -57,35 +64,169 @@ func ListVMsByVMNamespace(
 	log logr.Logger,
 	vmNamespaceList []string,
 	vmList []string,
-) ([]string, error) {
-	var foundVMList []string
-
-	var notFoundErr error
-
+) []virtv1.VirtualMachine {
 	foundVM := &virtv1.VirtualMachine{}
+	foundVMList := make([]virtv1.VirtualMachine, 0, len(vmList))
 
 	for _, ns := range vmNamespaceList {
 		for _, vm := range vmList {
 			vmLookUp := types.NamespacedName{Namespace: ns, Name: vm}
 			if err := apiReader.Get(ctx, vmLookUp, foundVM); err != nil {
-				if !k8serrors.IsNotFound(err) {
-					return nil, err
-				}
+				continue
+			}
 
-				if notFoundErr == nil {
-					notFoundErr = err
-				}
+			foundVMList = append(foundVMList, *foundVM)
+		}
+	}
 
+	if len(foundVMList) > 0 {
+		return foundVMList
+	}
+
+	return nil
+}
+
+// IsVMDeletionInProgress returns true if any listed KubeVirt VM within the given protected namespaces is being deleted.
+// It skips VMs that cannot be fetched (likely already deleted) and checks all (namespace, name) pairs.
+func IsVMDeletionInProgress(ctx context.Context,
+	k8sclient client.Client,
+	vmList []string,
+	vmNamespaceList []string,
+	log logr.Logger,
+) bool {
+	log.Info("Checking if VirtualMachines are being deleted",
+		"vmCount", len(vmList),
+		"vmNames", vmList)
+
+	foundVM := &virtv1.VirtualMachine{}
+
+	for _, ns := range vmNamespaceList {
+		for _, vm := range vmList {
+			vmLookUp := types.NamespacedName{Namespace: ns, Name: vm}
+			if err := k8sclient.Get(ctx, vmLookUp, foundVM); err != nil {
+				// Continue with the remaining VMs, as the current one might already have been deleted
 				continue
 			}
 
-			foundVMList = append(foundVMList, foundVM.Name)
+			if !foundVM.GetDeletionTimestamp().IsZero() {
+				// Deletion of the VM has been requested
+				log.Info("VM deletion is in progress", "VM", vm)
+
+				return true
+			}
 		}
 	}
 
-	if len(foundVMList) > 0 {
-		return foundVMList, nil
+	return false
+}
+
+// DeleteVMs deletes the given KubeVirt VMs using foreground cascading deletion.
+// It stops on the first delete error and returns that error.
+func DeleteVMs(
+	ctx context.Context,
+	k8sclient client.Client,
+	foundVMs []virtv1.VirtualMachine,
+	vmList []string,
+	vmNamespaceList []string,
+	log logr.Logger,
+) error {
+	for _, vm := range foundVMs {
+		ns := vm.GetNamespace()
+
+		vmName := vm.GetName()
+
+		// Foreground deletion option
+		deleteOpts := &client.DeleteOptions{
+			GracePeriodSeconds: nil,
+			PropagationPolicy: func() *metav1.DeletionPropagation {
+				p := metav1.DeletePropagationForeground
+
+				return &p
+			}(),
+		}
+		if err := k8sclient.Delete(ctx, &vm, deleteOpts); err != nil {
+			log.Error(err, "Failed to delete VM", "namespace", ns, "name", vmName)
+
+			return fmt.Errorf("failed to delete VM %s/%s: %w", ns, vmName, err)
+		}
+
+		log.Info("Deleted VM successfully", "namespace", ns, "name", vmName)
+	}
+
+	return nil
+}
+
+// IsOwnedByVM walks the owner chain of obj and returns the owner's metadata object if a
+// KubeVirt VM owner is found (kind VirtualMachine, API group kubevirt.io).
+// It assumes all owners are in the same namespace as obj.
+// Typical KubeVirt ownership chains: PVC→DV→VM, PVC→VMI→VM, or virt-launcher Pod→VMI→VM.
+func IsOwnedByVM(
+	ctx context.Context,
+	c client.Client,
+	obj client.Object,
+	log logr.Logger,
+) (client.Object, error) {
+	owners := obj.GetOwnerReferences()
+	// Breadth-first/flat traversal to reduce cognitive complexity
+	type queued struct {
+		ns    string
+		owner metav1.OwnerReference
+	}
+
+	q := make([]queued, 0, len(owners))
+	for _, o := range owners {
+		q = append(q, queued{ns: obj.GetNamespace(), owner: o})
+	}
+
+	for len(q) > 0 {
+		cur := q[0]
+		q = q[1:]
+
+		// Try fetching only the owner's metadata
+		ownerMeta, err := fetchPartialMeta(ctx, c, cur.ns, cur.owner)
+		if err != nil {
+			log.Info("Failed to fetch owner", "gvk", cur.owner.APIVersion+"/"+cur.owner.Kind, "name", cur.owner.Name, "err", err)
+
+			continue
+		}
+
+		if ownerMeta.GetUID() != cur.owner.UID {
+			// UID mismatch; skip
+			continue
+		}
+
+		// If this owner is a KubeVirt VM, return it
+		if isKubeVirtVM(cur.owner) {
+			return ownerMeta, nil
+		}
+
+		// Otherwise, enqueue its parents (same-namespace assumption for the KubeVirt chain)
+		nestedOwners := ownerMeta.GetOwnerReferences()
+		for _, nestedOwner := range nestedOwners {
+			q = append(q, queued{ns: cur.ns, owner: nestedOwner})
+		}
+	}
+
+	return nil, fmt.Errorf("no VM owner found")
+}
+
+func isKubeVirtVM(o metav1.OwnerReference) bool {
+	return o.Kind == KindVirtualMachine && strings.HasPrefix(o.APIVersion, KubeVirtAPIVersionPrefix)
+}
+
+// fetchPartialMeta fetches only the metadata of the owner object.
+func fetchPartialMeta(
+	ctx context.Context,
+	c client.Client,
+	ns string,
+	o metav1.OwnerReference,
+) (*metav1.PartialObjectMetadata, error) {
+	objMeta := &metav1.PartialObjectMetadata{}
+	objMeta.SetGroupVersionKind(schema.FromAPIVersionAndKind(o.APIVersion, o.Kind))
+
+	if err := c.Get(ctx, client.ObjectKey{Namespace: ns, Name: o.Name}, objMeta); err != nil {
+		return nil, err
 	}
 
-	return nil, notFoundErr
+	return objMeta, nil
 }
diff --git a/internal/controller/volumereplicationgroup_controller.go b/internal/controller/volumereplicationgroup_controller.go
index 280416f455..a69d32ba26 100644
--- a/internal/controller/volumereplicationgroup_controller.go
+++ b/internal/controller/volumereplicationgroup_controller.go
@@ -10,6 +10,7 @@ import (
 	"maps"
 	"reflect"
 	"slices"
+	"strings"
 	"sync"
 	"time"
 
@@ -28,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/util/workqueue"
virtv1 "kubevirt.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -397,7 +399,9 @@ func filterPVC(reader client.Reader, pvc *corev1.PersistentVolumeClaim, log logr // +kubebuilder:rbac:groups="",resources=configmaps,verbs=list;watch // +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch // +kubebuilder:rbac:groups=core,resources=pods/exec,verbs=create -// +kubebuilder:rbac:groups="kubevirt.io",resources=virtualmachines,verbs=get;list +// +kubebuilder:rbac:groups="kubevirt.io",resources=virtualmachines,verbs=get;list;watch;delete +// +kubebuilder:rbac:groups="kubevirt.io",resources=virtualmachineinstances,verbs=get;list;watch +// +kubebuilder:rbac:groups="cdi.kubevirt.io",resources=datavolumes,verbs=get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -1660,6 +1664,8 @@ func (v *VRGInstance) processAsSecondary() ctrl.Result { func (v *VRGInstance) reconcileAsSecondary() ctrl.Result { result := ctrl.Result{} + + result.Requeue = v.ShouldCleanupForSecondary() result.Requeue = v.reconcileVolSyncAsSecondary() || result.Requeue result.Requeue = v.reconcileVolRepsAsSecondary() || result.Requeue @@ -1678,12 +1684,6 @@ func (v *VRGInstance) reconcileAsSecondary() ctrl.Result { v.instance.Status.Conditions = []metav1.Condition{} } - if !result.Requeue && v.isVMRecipeProtection() { - if err := v.validateVMsForStandaloneProtection(); err != nil { - result.Requeue = true - } - } - return result } @@ -1791,6 +1791,8 @@ func (v *VRGInstance) updateVRGStatus(result ctrl.Result) ctrl.Result { v.updateStatusState() + result.Requeue = v.instance.Status.State == ramendrv1alpha1.UnknownState + v.instance.Status.ObservedGeneration = v.instance.Generation if !reflect.DeepEqual(v.savedInstanceStatus, v.instance.Status) { @@ -2441,24 +2443,28 @@ func (v *VRGInstance) CheckForVMConflictOnSecondary() error { } func (v *VRGInstance) CheckForVMNameConflictOnSecondary(vmNamespaceList, vmList []string) error { - var foundVMs []string - - var err error - if foundVMs, err = util.ListVMsByVMNamespace(v.ctx, v.reconciler.APIReader, - v.log, vmNamespaceList, vmList); err != nil { - if !k8serrors.IsNotFound(err) { - return fmt.Errorf("failed to lookup virtualmachine resources, check rbacs") - } + var foundVMs []virtv1.VirtualMachine + if foundVMs = util.ListVMsByVMNamespace(v.ctx, v.reconciler.APIReader, + v.log, vmNamespaceList, vmList); len(foundVMs) == 0 { return nil } - v.log.Info(fmt.Sprintf("found conflicting VM[%v] on secondary", foundVMs)) + v.log.Info("found conflicting VMs on secondary", "foundVMs", vmNamesString(foundVMs)) return fmt.Errorf("protected VMs on the primary cluster share names with VMs on " + "the secondary site, which may impact failover or recovery") } +func vmNamesString(vms []virtv1.VirtualMachine) string { + names := make([]string, len(vms)) + for i := range vms { + names[i] = vms[i].Name + } + + return strings.Join(names, ", ") +} + func (v *VRGInstance) aggregateVRGNoClusterDataConflictCondition() *metav1.Condition { var vmResourceConflict, pvcResourceConflict bool diff --git a/internal/controller/vrg_volrep.go b/internal/controller/vrg_volrep.go index 435ae692c4..2d152ac71f 100644 --- a/internal/controller/vrg_volrep.go +++ b/internal/controller/vrg_volrep.go @@ -18,11 +18,13 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" 
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + virtv1 "kubevirt.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ramendrv1alpha1 "github.com/ramendr/ramen/api/v1alpha1" + recipecore "github.com/ramendr/ramen/internal/controller/core" rmnutil "github.com/ramendr/ramen/internal/controller/util" ) @@ -249,7 +251,8 @@ func (v *VRGInstance) isPVCReadyForSecondary(pvc *corev1.PersistentVolumeClaim, // If PVC is not being deleted, it is not ready for Secondary, unless action is failover if v.instance.Spec.Action != ramendrv1alpha1.VRGActionFailover && !rmnutil.ResourceIsDeleted(pvc) { - log.Info("VolumeReplication cannot become Secondary, as its PersistentVolumeClaim is not marked for deletion") + log.Info("VolumeReplication cannot become Secondary, as its PersistentVolumeClaim is not marked for deletion", + "pvc", pvc.Name) msg := "unable to transition to Secondary as PVC is not deleted" v.updatePVCDataReadyCondition(pvc.Namespace, pvc.Name, VRGConditionReasonProgressing, msg) @@ -2895,3 +2898,222 @@ func PruneAnnotations(annotations map[string]string) map[string]string { return result } + +// Checks and requeues reconciler of VM resource cleanup. +func (v *VRGInstance) ShouldCleanupForSecondary() bool { + if !v.isVMRecipeProtection() { + return false + } + + v.log.Info("Checking VM cleanup and cross-cluster resource conflicts", + "name", v.instance.GetName(), + "namespace", v.instance.GetNamespace(), + "recipeName", "vm-recipe") + + if v.isResourceConflict() { + v.log.Info("Skipping resource cleanup; waiting for conflict resolution") + + return false + } + + v.log.Info("VRG status observed", + "name", v.instance.GetName(), + "namespace", v.instance.GetNamespace(), + "replicationState", v.instance.Spec.ReplicationState, + "statusState", v.instance.Status.State, + "generation", v.instance.GetGeneration(), + "resourceVersion", v.instance.GetResourceVersion(), + ) + + if !v.IsDRActionInProgress() { + v.log.Info("Skip resource cleanup; reconcile as secondary") + + return false + } + + if v.ShouldCleanupVMForSecondary() { + v.log.Info("Requeuing until VM cleanup is complete") + + return true + } + + return false +} + +// Checks if there are conflicting protected resources across managed clusters +func (v *VRGInstance) isResourceConflict() bool { + if err := v.validateVMsForStandaloneProtection(); err != nil { + return true + } + + return false +} + +// 1. If VM resource cleanup is in progress, requeue for reconciliation +// 2. If protected VMs are not found in protected NS, consider VM cleanup complete +// 3. Validates if all protected VMs owns PVCs from the lis tof all the protected volumes directly or indirectly +// a) VM manifest has references to PVC through DataVolumeTemplate or DataVolume +// 4. for step 3(b) VM ownership is set on the respective PVCs +// 5. 
+func (v *VRGInstance) ShouldCleanupVMForSecondary() bool {
+	v.log.Info(
+		"DR action progressing",
+		"component", "VRGController",
+		"action", "reconcile",
+		"vrgName", v.instance.GetName(),
+		"namespace", v.instance.GetNamespace(),
+		"desiredState", v.instance.Spec.ReplicationState,
+		"currentState", v.instance.Status.State,
+	)
+
+	vmNamespaceList := v.instance.Spec.KubeObjectProtection.RecipeParameters[recipecore.ProtectedVMNamespace]
+	vmList := v.instance.Spec.KubeObjectProtection.RecipeParameters[recipecore.VMList]
+
+	foundVMs, deletionInProgress := v.skipVMCleanupVerificationCheck()
+	if deletionInProgress { // VM cleanup is in progress; requeue
+		return true
+	}
+
+	if len(foundVMs) == 0 { // VM cleanup complete
+		v.log.Info("VM cleanup completed")
+
+		return false
+	}
+
+	// Proceed to cleanup VMs
+
+	v.log.Info("Validate PVC ownerReferences")
+
+	if !v.ValidatePVCOwnershipOnVMs() {
+		return false
+	}
+
+	v.log.Info("Proceed to cleanup VM resources")
+	// Cleanup VM resources; requeue either way so that a failed delete is retried and a
+	// successful delete is verified on the next reconcile
+	if err := rmnutil.DeleteVMs(v.ctx, v.reconciler.Client, foundVMs, vmList, vmNamespaceList, v.log); err != nil {
+		v.log.Error(err, "Failed to delete VMs",
+			"vmList", vmList,
+		)
+	}
+
+	return true
+}
+
+// pvcProcessResult captures the decision for a single PVC.
+type pvcProcessResult struct {
+	vm    *metav1.PartialObjectMetadata
+	skip  bool // manual cleanup required; abort overall cleanup
+	retry bool // transient error; request requeue
+}
+
+// ValidatePVCOwnershipOnVMs ensures PVCs used by protected VMs carry VM OwnerReferences,
+// or decides to skip/retry cleanup based on the current state.
+func (v *VRGInstance) ValidatePVCOwnershipOnVMs() bool {
+	// TODO: VolSync PVCs are skipped for now; consider them later
+	protectedPVCs := make([]corev1.PersistentVolumeClaim, 0, len(v.volRepPVCs))
+	protectedPVCs = append(protectedPVCs, v.volRepPVCs...)
+
+	// Map PVC name → VM for patching later
+	pvcToVM := make(map[string]*metav1.PartialObjectMetadata)
+
+	var (
+		skipCleanup  bool
+		retryCleanup bool
+	)
+
+	for i := range protectedPVCs {
+		res := v.processPVC(&protectedPVCs[i])
+		if res.skip {
+			skipCleanup = true
+
+			continue
+		}
+
+		if res.retry {
+			retryCleanup = true
+
+			continue
+		}
+
+		if res.vm != nil {
+			pvcToVM[protectedPVCs[i].Name] = res.vm
+		}
+	}
+
+	// If any PVC cannot be associated → skip cleanup
+	if skipCleanup {
+		return false
+	}
+
+	// true if no retry needed, false if retry required
+	return !retryCleanup
+}
+
+// processPVC decides what to do with a single PVC:
+//   - If it already has OwnerReferences, validate that a protected VM is in the owner chain.
+//   - Otherwise the PVC cannot be associated with a protected VM and cleanup is skipped
+//     (manual cleanup required).
+//
+// Returns the owning VM or flags indicating skip/retry.
+func (v *VRGInstance) processPVC(
+	pvc *corev1.PersistentVolumeClaim,
+) pvcProcessResult {
+	log := logWithPvcName(v.log, pvc)
+
+	// If the PVC already has OwnerReferences, validate that a protected VM is in its owner chain
+	if len(pvc.OwnerReferences) > 0 {
+		vm, err := rmnutil.IsOwnedByVM(v.ctx, v.reconciler.Client, pvc, log)
+		if err != nil {
+			log.Error(err, "Skipping cleanup",
+				"pvc", pvc.Name,
+				"reason", "invalid ownerReferences",
+				"action", "manual cleanup required")
+
+			return pvcProcessResult{skip: true}
+		}
+
+		log.Info("Continue with cleanup",
+			"pvc", pvc.GetName(),
+			"ownedByVM", vm.GetName(),
+			"ownerReferences", pvc.OwnerReferences)
+
+		// Already valid; no patching needed
+		return pvcProcessResult{}
+	}
+
+	// A PVC without OwnerReferences cannot be associated with a protected VM; skip cleanup
+	return pvcProcessResult{skip: true}
+}
+
+// skipVMCleanupVerificationCheck returns the protected VMs that still exist, and true when VM
+// deletion is already in progress (in which case the ownerReferences check can be skipped).
+func (v *VRGInstance) skipVMCleanupVerificationCheck() ([]virtv1.VirtualMachine, bool) {
+	vmNamespaceList := v.instance.Spec.KubeObjectProtection.RecipeParameters[recipecore.ProtectedVMNamespace]
+	vmList := v.instance.Spec.KubeObjectProtection.RecipeParameters[recipecore.VMList]
+
+	var foundVMs []virtv1.VirtualMachine
+
+	if len(vmList) > 0 {
+		if rmnutil.IsVMDeletionInProgress(v.ctx, v.reconciler.Client, vmList, vmNamespaceList, v.log) {
+			v.log.Info("VM deletion is in progress, skipping ownerReferences check")
+
+			return nil, true
+		}
+
+		foundVMs = rmnutil.ListVMsByVMNamespace(v.ctx, v.reconciler.APIReader,
+			v.log, vmNamespaceList, vmList)
+		if len(foundVMs) == 0 {
+			v.log.Info(
+				"No VirtualMachines found for cleanup; deletion appears complete",
+				"vmList", vmList,
+				"namespaceList", vmNamespaceList,
+			)
+
+			return foundVMs, false
+		}
+	}
+
+	return foundVMs, false
+}
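Reviewer note, not part of the patch: a minimal sketch of how the new DeleteVMs helper could be exercised against a controller-runtime fake client. The test name, VM name, and namespace below are made up, and it assumes virtv1.AddToScheme registers the KubeVirt types the same way the operator's scheme setup does.

package util_test

import (
	"context"
	"testing"

	"github.com/go-logr/logr"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	virtv1 "kubevirt.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	rmnutil "github.com/ramendr/ramen/internal/controller/util"
)

// TestDeleteVMsForegroundSketch is illustrative only; the VM name and namespace are made up.
func TestDeleteVMsForegroundSketch(t *testing.T) {
	scheme := runtime.NewScheme()
	_ = virtv1.AddToScheme(scheme)

	vm := virtv1.VirtualMachine{
		ObjectMeta: metav1.ObjectMeta{Name: "vm-1", Namespace: "vm-ns"},
	}

	// Seed a fake client with the VM so the foreground delete has something to remove.
	fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&vm).Build()

	err := rmnutil.DeleteVMs(context.TODO(), fakeClient,
		[]virtv1.VirtualMachine{vm}, []string{"vm-1"}, []string{"vm-ns"}, logr.Discard())
	if err != nil {
		t.Fatalf("expected DeleteVMs to succeed: %v", err)
	}
}

Because DeleteVMs uses foreground propagation, on a real cluster the VM object lingers with the foregroundDeletion finalizer until its dependents (DataVolumes/PVCs owned by it) are gone; the fake client does not model that, so this sketch only checks that the delete call itself succeeds.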