Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ rules:
- get
- list
- watch
- apiGroups:
- cdi.kubevirt.io
resources:
- datavolumes
verbs:
- get
- list
- watch
- apiGroups:
- cluster.open-cluster-management.io
resources:
Expand Down Expand Up @@ -227,13 +235,24 @@ rules:
- patch
- update
- watch
- apiGroups:
- kubevirt.io
resources:
- virtualmachineinstances
verbs:
- get
- list
- watch
- apiGroups:
- kubevirt.io
resources:
- virtualmachines
verbs:
- delete
- get
- list
- patch
- watch
- apiGroups:
- multicluster.x-k8s.io
resources:
Expand Down
173 changes: 157 additions & 16 deletions internal/controller/util/vm_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@ package util
import (
"context"
"fmt"
"strings"

"github.com/go-logr/logr"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
virtv1 "kubevirt.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/ramendr/ramen/internal/controller/core"
)

// Constants used to recognize KubeVirt VirtualMachine owner references.
const (
	// KindVirtualMachine is the Kubernetes kind of a KubeVirt VM object.
	KindVirtualMachine = "VirtualMachine"
	// KubeVirtAPIVersionPrefix matches the kubevirt.io API group of an
	// ownerReference's apiVersion; the version follows the slash (e.g., v1).
	KubeVirtAPIVersionPrefix = "kubevirt.io/" // prefix match on group; version follows (e.g., v1)
)

func ListVMsByLabelSelector(
ctx context.Context,
apiReader client.Reader,
Expand Down Expand Up @@ -57,35 +64,169 @@ func ListVMsByVMNamespace(
log logr.Logger,
vmNamespaceList []string,
vmList []string,
) ([]string, error) {
var foundVMList []string

var notFoundErr error

) []virtv1.VirtualMachine {
foundVM := &virtv1.VirtualMachine{}

foundVMList := make([]virtv1.VirtualMachine, 0, len(vmList))
for _, ns := range vmNamespaceList {
for _, vm := range vmList {
vmLookUp := types.NamespacedName{Namespace: ns, Name: vm}
if err := apiReader.Get(ctx, vmLookUp, foundVM); err != nil {
if !k8serrors.IsNotFound(err) {
return nil, err
}
continue
}

if notFoundErr == nil {
notFoundErr = err
}
foundVMList = append(foundVMList, *foundVM)
}
}

if len(foundVMList) > 0 {
return foundVMList
}

return nil
}

// IsVMDeletionInProgress returns true if any listed KubeVirt VM within the given protected NS is in deletion state.
// Skips VMs that cannot be fetched (likely already deleted); checks all (namespace, name) pairs.
func IsVMDeletionInProgress(ctx context.Context,
k8sclient client.Client,
vmList []string,
vmNamespaceList []string,
log logr.Logger,
) bool {
log.Info("Checking if VirtualMachines are being deleted",
"vmCount", len(vmList),
"vmNames", vmList)

foundVM := &virtv1.VirtualMachine{}

for _, ns := range vmNamespaceList {
for _, vm := range vmList {
vmLookUp := types.NamespacedName{Namespace: ns, Name: vm}
if err := k8sclient.Get(ctx, vmLookUp, foundVM); err != nil {
// Continuing with remaining list of VMs as the current one might already have been deleted
continue
}

foundVMList = append(foundVMList, foundVM.Name)
if !foundVM.GetDeletionTimestamp().IsZero() {
// Deletion of vm has been requested
log.Info("VM deletion is in progress", "VM", vm)

return true
}
}
}

if len(foundVMList) > 0 {
return foundVMList, nil
return false
}

// DeleteVMs deletes the given KubeVirt VMs across the provided namespaces.
// Stops on the first get/delete error and returns it;
func DeleteVMs(
ctx context.Context,
k8sclient client.Client,
foundVMs []virtv1.VirtualMachine,
vmList []string,
vmNamespaceList []string,
log logr.Logger,
) error {
for _, vm := range foundVMs {
ns := vm.GetNamespace()

vmName := vm.GetName()

// Foreground deletion option
deleteOpts := &client.DeleteOptions{
GracePeriodSeconds: nil,
PropagationPolicy: func() *metav1.DeletionPropagation {
p := metav1.DeletePropagationForeground

return &p
}(),
}
if err := k8sclient.Delete(ctx, &vm, deleteOpts); err != nil {
log.Error(err, "Failed to delete VM", "namespace", ns, "name", vmName)

return fmt.Errorf("failed to delete VM %s/%s: %w", ns, vmName, err)
}

log.Info("Deleted VMs successfully", "from namespace", ns, "VM name", vmName)
}

return nil
}

// IsOwnedByVM walks the owner chain and returns the VM metadata object if found.
// It prefers KubeVirt VM owners (kind=VirtualMachine, apigroup starts with kubevirt.io/)
// Assuming all the owners are from same namespace
// Typical KubeVirt ownership depth (PVC→DV→VM or PVC→VMI→VM or virt-launcher-pod->VMI->VM)
func IsOwnedByVM(
ctx context.Context,
c client.Client,
obj client.Object,
log logr.Logger,
) (client.Object, error) {
owners := obj.GetOwnerReferences()
// Breadth-first/flat traversal to reduce cognitive complexity
type queued struct {
ns string
owner metav1.OwnerReference
}

q := make([]queued, 0, len(owners))
for _, o := range owners {
q = append(q, queued{ns: obj.GetNamespace(), owner: o})
}

for len(q) > 0 {
cur := q[0]
q = q[1:]

// Try fetching only the owner's metadata
ownerMeta, err := fetchPartialMeta(ctx, c, cur.ns, cur.owner)
if err != nil {
log.Info("Failed to fetch owner", "gvk", cur.owner.APIVersion+"/"+cur.owner.Kind, "name", cur.owner.Name, "err", err)

continue
}

if ownerMeta.GetUID() != cur.owner.UID {
// UID mismatch; skip
continue
}

// If this owner is a KubeVirt VM, return it
if isKubeVirtVM(cur.owner) {
return ownerMeta, nil
}

// Otherwise, enqueue its parents (same namespace assumption for KubeVirt chain)
nestedOwners := ownerMeta.GetOwnerReferences()
for _, nestedOwner := range nestedOwners {
q = append(q, queued{ns: cur.ns, owner: nestedOwner})
}
}

return nil, fmt.Errorf("no VM owner found")
}

func isKubeVirtVM(o metav1.OwnerReference) bool {
return o.Kind == KindVirtualMachine && strings.HasPrefix(o.APIVersion, KubeVirtAPIVersionPrefix)
}

// Fetch only metadata of the owner
func fetchPartialMeta(
ctx context.Context,
c client.Client,
ns string,
o metav1.OwnerReference,
) (*metav1.PartialObjectMetadata, error) {
objMeta := &metav1.PartialObjectMetadata{}
objMeta.SetGroupVersionKind(schema.FromAPIVersionAndKind(o.APIVersion, o.Kind))

if err := c.Get(ctx, client.ObjectKey{Namespace: ns, Name: o.Name}, objMeta); err != nil {
return nil, err
}

return nil, notFoundErr
return objMeta, nil
}
38 changes: 22 additions & 16 deletions internal/controller/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"maps"
"reflect"
"slices"
"strings"
"sync"
"time"

Expand All @@ -28,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
virtv1 "kubevirt.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -397,7 +399,9 @@ func filterPVC(reader client.Reader, pvc *corev1.PersistentVolumeClaim, log logr
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=list;watch
// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=pods/exec,verbs=create
// +kubebuilder:rbac:groups="kubevirt.io",resources=virtualmachines,verbs=get;list
// +kubebuilder:rbac:groups="kubevirt.io",resources=virtualmachines,verbs=get;list;watch;delete
// +kubebuilder:rbac:groups="kubevirt.io",resources=virtualmachineinstances,verbs=get;list;watch
// +kubebuilder:rbac:groups="cdi.kubevirt.io",resources=datavolumes,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down Expand Up @@ -1660,6 +1664,8 @@ func (v *VRGInstance) processAsSecondary() ctrl.Result {

func (v *VRGInstance) reconcileAsSecondary() ctrl.Result {
result := ctrl.Result{}

result.Requeue = v.ShouldCleanupForSecondary()
result.Requeue = v.reconcileVolSyncAsSecondary() || result.Requeue
result.Requeue = v.reconcileVolRepsAsSecondary() || result.Requeue

Expand All @@ -1678,12 +1684,6 @@ func (v *VRGInstance) reconcileAsSecondary() ctrl.Result {
v.instance.Status.Conditions = []metav1.Condition{}
}

if !result.Requeue && v.isVMRecipeProtection() {
if err := v.validateVMsForStandaloneProtection(); err != nil {
result.Requeue = true
}
}

return result
}

Expand Down Expand Up @@ -1791,6 +1791,8 @@ func (v *VRGInstance) updateVRGStatus(result ctrl.Result) ctrl.Result {

v.updateStatusState()

result.Requeue = v.instance.Status.State == ramendrv1alpha1.UnknownState

v.instance.Status.ObservedGeneration = v.instance.Generation

if !reflect.DeepEqual(v.savedInstanceStatus, v.instance.Status) {
Expand Down Expand Up @@ -2441,24 +2443,28 @@ func (v *VRGInstance) CheckForVMConflictOnSecondary() error {
}

func (v *VRGInstance) CheckForVMNameConflictOnSecondary(vmNamespaceList, vmList []string) error {
var foundVMs []string

var err error
if foundVMs, err = util.ListVMsByVMNamespace(v.ctx, v.reconciler.APIReader,
v.log, vmNamespaceList, vmList); err != nil {
if !k8serrors.IsNotFound(err) {
return fmt.Errorf("failed to lookup virtualmachine resources, check rbacs")
}
var foundVMs []virtv1.VirtualMachine

if foundVMs = util.ListVMsByVMNamespace(v.ctx, v.reconciler.APIReader,
v.log, vmNamespaceList, vmList); len(foundVMs) == 0 {
return nil
}

v.log.Info(fmt.Sprintf("found conflicting VM[%v] on secondary", foundVMs))
v.log.Info("found conflicting VMs on secondary", "foundVMs", vmNamesString(foundVMs))

return fmt.Errorf("protected VMs on the primary cluster share names with VMs on " +
"the secondary site, which may impact failover or recovery")
}

func vmNamesString(vms []virtv1.VirtualMachine) string {
names := make([]string, len(vms))
for i := range vms {
names[i] = vms[i].Name
}

return strings.Join(names, ", ")
}

func (v *VRGInstance) aggregateVRGNoClusterDataConflictCondition() *metav1.Condition {
var vmResourceConflict, pvcResourceConflict bool

Expand Down
Loading
Loading