Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 supervisor: create ClusterModule per MachineDeployment and re-reconcile to update VirtualMachineSetResourcePolicy #48

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controllers/vmware/test/controllers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ var _ = Describe("Reconciliation tests", func() {
Eventually(func() error {
return k8sClient.Get(ctx, rpKey, resourcePolicy)
}, time.Second*30).Should(Succeed())
Expect(len(resourcePolicy.Spec.ClusterModuleGroups)).To(BeEquivalentTo(2))
Expect(len(resourcePolicy.Spec.ClusterModuleGroups)).To(BeEquivalentTo(1))

By("Create the CAPI Machine and wait for it to exist")
machineKey, machine := deployCAPIMachine(ns.Name, cluster, k8sClient)
Expand Down
29 changes: 29 additions & 0 deletions controllers/vmware/vspherecluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type ClusterReconciler struct {
// +kubebuilder:rbac:groups=netoperator.vmware.com,resources=networks,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;update;create;delete
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machinedeployments,verbs=get;list;watch

func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
log := ctrl.LoggerFrom(ctx)
Expand Down Expand Up @@ -370,6 +371,34 @@ func (r *ClusterReconciler) VSphereMachineToCluster(ctx context.Context, o clien
}}
}

// MachineDeploymentToCluster adds reconcile requests for a Cluster when one of its MachineDeployments has an event.
func (r *ClusterReconciler) MachineDeploymentToCluster(ctx context.Context, o client.Object) []reconcile.Request {
	log := ctrl.LoggerFrom(ctx)

	machineDeployment, ok := o.(*clusterv1.MachineDeployment)
	if !ok {
		log.Error(nil, fmt.Sprintf("Expected a MachineDeployment but got a %T", o))
		return nil
	}
	log = log.WithValues("MachineDeployment", klog.KObj(machineDeployment))
	ctx = ctrl.LoggerInto(ctx, log)

	vsphereCluster, err := util.GetVMwareVSphereClusterFromMachineDeployment(ctx, r.Client, machineDeployment)
	if err != nil {
		// Mapping failures are logged at a high verbosity and swallowed: returning an
		// error is not possible from a map function, and the watch will fire again.
		log.V(4).Error(err, "Failed to get VSphereCluster from MachineDeployment")
		return nil
	}

	// Can add further filters on Cluster state so that we don't keep reconciling Cluster
	log.V(6).Info("Triggering VSphereCluster reconcile from MachineDeployment")
	// Use []reconcile.Request to match the declared return type
	// (ctrl.Request is merely an alias for reconcile.Request).
	return []reconcile.Request{{
		NamespacedName: types.NamespacedName{
			Namespace: vsphereCluster.Namespace,
			Name:      vsphereCluster.Name,
		},
	}}
}

// ZoneToVSphereClusters adds reconcile requests for VSphereClusters when Zone has an event.
func (r *ClusterReconciler) ZoneToVSphereClusters(ctx context.Context, o client.Object) []reconcile.Request {
log := ctrl.LoggerFrom(ctx)
Expand Down
4 changes: 4 additions & 0 deletions controllers/vspherecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func AddClusterControllerToManager(ctx context.Context, controllerManagerCtx *ca
&vmwarev1.VSphereMachine{},
handler.EnqueueRequestsFromMapFunc(reconciler.VSphereMachineToCluster),
).
Watches(
&clusterv1.MachineDeployment{},
handler.EnqueueRequestsFromMapFunc(reconciler.MachineDeploymentToCluster),
).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, controllerManagerCtx.WatchFilterValue))

// Conditionally add a Watch for topologyv1.Zone when the feature gate is enabled
Expand Down
69 changes: 57 additions & 12 deletions pkg/services/vmoperator/resource_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ package vmoperator

import (
"context"
"sort"

"github.com/pkg/errors"
vmoprv1 "github.com/vmware-tanzu/vm-operator/api/v1alpha2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

Expand All @@ -37,15 +40,31 @@ type RPService struct {
// ReconcileResourcePolicy ensures that a VirtualMachineSetResourcePolicy exists for the cluster
// Returns the name of a policy if it exists, otherwise returns an error.
func (s *RPService) ReconcileResourcePolicy(ctx context.Context, clusterCtx *vmware.ClusterContext) (string, error) {
resourcePolicy, err := s.getVirtualMachineSetResourcePolicy(ctx, clusterCtx)
clusterModuleGroups, err := getClusterModuleGroups(ctx, s.Client, clusterCtx.Cluster)
if err != nil {
return "", err
}

resourcePolicy, err := getVirtualMachineSetResourcePolicy(ctx, s.Client, clusterCtx.Cluster)
if err != nil {
if !apierrors.IsNotFound(err) {
return "", errors.Errorf("unexpected error in getting the Resource policy: %+v", err)
}
resourcePolicy, err = s.createVirtualMachineSetResourcePolicy(ctx, clusterCtx)
resourcePolicy, err = s.createVirtualMachineSetResourcePolicy(ctx, clusterCtx, clusterModuleGroups)
if err != nil {
return "", errors.Errorf("failed to create Resource Policy: %+v", err)
}
return resourcePolicy.Name, nil
}

// Ensure .spec.clusterModuleGroups is up to date.
helper, err := patch.NewHelper(resourcePolicy, s.Client)
if err != nil {
return "", err
}
resourcePolicy.Spec.ClusterModuleGroups = clusterModuleGroups
if err := helper.Patch(ctx, resourcePolicy); err != nil {
return "", err
}

return resourcePolicy.Name, nil
Expand All @@ -60,29 +79,26 @@ func (s *RPService) newVirtualMachineSetResourcePolicy(clusterCtx *vmware.Cluste
}
}

func (s *RPService) getVirtualMachineSetResourcePolicy(ctx context.Context, clusterCtx *vmware.ClusterContext) (*vmoprv1.VirtualMachineSetResourcePolicy, error) {
func getVirtualMachineSetResourcePolicy(ctx context.Context, ctrlClient client.Client, cluster *clusterv1.Cluster) (*vmoprv1.VirtualMachineSetResourcePolicy, error) {
vmResourcePolicy := &vmoprv1.VirtualMachineSetResourcePolicy{}
vmResourcePolicyName := client.ObjectKey{
Namespace: clusterCtx.Cluster.Namespace,
Name: clusterCtx.Cluster.Name,
Namespace: cluster.Namespace,
Name: cluster.Name,
}
err := s.Client.Get(ctx, vmResourcePolicyName, vmResourcePolicy)
err := ctrlClient.Get(ctx, vmResourcePolicyName, vmResourcePolicy)
return vmResourcePolicy, err
}

func (s *RPService) createVirtualMachineSetResourcePolicy(ctx context.Context, clusterCtx *vmware.ClusterContext) (*vmoprv1.VirtualMachineSetResourcePolicy, error) {
func (s *RPService) createVirtualMachineSetResourcePolicy(ctx context.Context, clusterCtx *vmware.ClusterContext, clusterModuleGroups []string) (*vmoprv1.VirtualMachineSetResourcePolicy, error) {
vmResourcePolicy := s.newVirtualMachineSetResourcePolicy(clusterCtx)

_, err := ctrlutil.CreateOrPatch(ctx, s.Client, vmResourcePolicy, func() error {
vmResourcePolicy.Spec = vmoprv1.VirtualMachineSetResourcePolicySpec{
ResourcePool: vmoprv1.ResourcePoolSpec{
Name: clusterCtx.Cluster.Name,
},
Folder: clusterCtx.Cluster.Name,
ClusterModuleGroups: []string{
ControlPlaneVMClusterModuleGroupName,
getMachineDeploymentNameForCluster(clusterCtx.Cluster),
},
Folder: clusterCtx.Cluster.Name,
ClusterModuleGroups: clusterModuleGroups,
}
// Ensure that the VirtualMachineSetResourcePolicy is owned by the VSphereCluster
if err := ctrlutil.SetOwnerReference(
Expand All @@ -106,3 +122,32 @@ func (s *RPService) createVirtualMachineSetResourcePolicy(ctx context.Context, c
}
return vmResourcePolicy, nil
}

// getClusterModuleGroups computes the desired cluster module group names for the
// cluster: the control-plane group plus one group per live MachineDeployment.
func getClusterModuleGroups(ctx context.Context, ctrlClient client.Client, cluster *clusterv1.Cluster) ([]string, error) {
	mdNames, err := getMachineDeploymentNamesForCluster(ctx, ctrlClient, cluster)
	if err != nil {
		return nil, err
	}

	groups := make([]string, 0, len(mdNames)+1)
	groups = append(groups, ControlPlaneVMClusterModuleGroupName)
	groups = append(groups, mdNames...)

	// sort elements to have deterministic output.
	sort.Strings(groups)

	return groups, nil
}

// checkClusterModuleGroup returns nil when the cluster's
// VirtualMachineSetResourcePolicy already reports the given cluster module
// group in its status, and an error otherwise.
func checkClusterModuleGroup(ctx context.Context, ctrlClient client.Client, cluster *clusterv1.Cluster, clusterModuleGroupName string) error {
	resourcePolicy, err := getVirtualMachineSetResourcePolicy(ctx, ctrlClient, cluster)
	if err != nil {
		return err
	}

	found := false
	for _, module := range resourcePolicy.Status.ClusterModules {
		if module.GroupName == clusterModuleGroupName {
			found = true
			break
		}
	}
	if !found {
		return errors.Errorf("VirtualMachineSetResourcePolicy's .status.clusterModules does not yet contain %s", clusterModuleGroupName)
	}

	return nil
}
2 changes: 1 addition & 1 deletion pkg/services/vmoperator/resource_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestRPService(t *testing.T) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(name).To(Equal(clusterName))

resourcePolicy, err := rpService.getVirtualMachineSetResourcePolicy(ctx, clusterCtx)
resourcePolicy, err := getVirtualMachineSetResourcePolicy(ctx, controllerCtx.Client, clusterCtx.Cluster)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(resourcePolicy.Spec.ResourcePool.Name).To(Equal(clusterName))
g.Expect(resourcePolicy.Spec.Folder).To(Equal(clusterName))
Expand Down
42 changes: 35 additions & 7 deletions pkg/services/vmoperator/vmopmachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,9 @@ func (v *VmopMachineService) reconcileVMOperatorVM(ctx context.Context, supervis
// Assign the VM's labels.
vmOperatorVM.Labels = getVMLabels(supervisorMachineCtx, vmOperatorVM.Labels)

addResourcePolicyAnnotations(supervisorMachineCtx, vmOperatorVM)
if err := addResourcePolicyAnnotations(ctx, v.Client, supervisorMachineCtx, vmOperatorVM); err != nil {
return err
}

if err := v.addVolumes(ctx, supervisorMachineCtx, vmOperatorVM); err != nil {
return err
Expand Down Expand Up @@ -580,7 +582,7 @@ func (v *VmopMachineService) getVirtualMachinesInCluster(ctx context.Context, su

// Helper function to add annotations to indicate which tag vm-operator should add as well as which clusterModule VM
// should be associated.
func addResourcePolicyAnnotations(supervisorMachineCtx *vmware.SupervisorMachineContext, vm *vmoprv1.VirtualMachine) {
func addResourcePolicyAnnotations(ctx context.Context, ctrlClient client.Client, supervisorMachineCtx *vmware.SupervisorMachineContext, vm *vmoprv1.VirtualMachine) error {
annotations := vm.ObjectMeta.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
Expand All @@ -591,10 +593,17 @@ func addResourcePolicyAnnotations(supervisorMachineCtx *vmware.SupervisorMachine
annotations[ClusterModuleNameAnnotationKey] = ControlPlaneVMClusterModuleGroupName
} else {
annotations[ProviderTagsAnnotationKey] = WorkerVMVMAntiAffinityTagValue
annotations[ClusterModuleNameAnnotationKey] = getMachineDeploymentNameForCluster(supervisorMachineCtx.Cluster)
clusterModuleName := getMachineDeploymentNameForMachine(supervisorMachineCtx.Machine)

if err := checkClusterModuleGroup(ctx, ctrlClient, supervisorMachineCtx.Cluster, clusterModuleName); err != nil {
return err
}

annotations[ClusterModuleNameAnnotationKey] = clusterModuleName
}

vm.ObjectMeta.SetAnnotations(annotations)
return nil
}

func volumeName(machine *vmwarev1.VSphereMachine, volume vmwarev1.VSphereMachineVolume) string {
Expand Down Expand Up @@ -742,8 +751,27 @@ func getTopologyLabels(supervisorMachineCtx *vmware.SupervisorMachineContext) ma
return nil
}

// getMachineDeploymentName returns the MachineDeployment name for a Cluster.
// This is also the name used by VSphereMachineTemplate and KubeadmConfigTemplate.
func getMachineDeploymentNameForCluster(cluster *clusterv1.Cluster) string {
return fmt.Sprintf("%s-workers-0", cluster.Name)
// getMachineDeploymentNameForMachine returns the name of the MachineDeployment
// owning the Machine, read from the MachineDeployment name label. It returns
// the empty string when the label is not set.
func getMachineDeploymentNameForMachine(machine *clusterv1.Machine) string {
	// A missing map key yields the zero value "", which matches the explicit
	// ok-check-and-fallback this replaces.
	return machine.Labels[clusterv1.MachineDeploymentNameLabel]
}

// getMachineDeploymentNamesForCluster returns the names of all MachineDeployments
// belonging to the given Cluster (matched via the cluster-name label) that are
// not being deleted.
func getMachineDeploymentNamesForCluster(ctx context.Context, ctrlClient client.Client, cluster *clusterv1.Cluster) ([]string, error) {
	mdList := &clusterv1.MachineDeploymentList{}
	if err := ctrlClient.List(
		ctx, mdList,
		client.InNamespace(cluster.GetNamespace()),
		client.MatchingLabels{clusterv1.ClusterNameLabel: cluster.GetName()}); err != nil {
		// Wrap (not Wrapf): there are no format arguments.
		return nil, errors.Wrap(err, "failed to list MachineDeployment objects")
	}

	mdNames := make([]string, 0, len(mdList.Items))
	for _, md := range mdList.Items {
		// Skip MachineDeployments in deletion so their cluster modules are
		// dropped from the resource policy.
		if md.DeletionTimestamp.IsZero() {
			mdNames = append(mdNames, md.Name)
		}
	}
	return mdNames, nil
}
30 changes: 30 additions & 0 deletions pkg/util/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,36 @@ func GetVSphereClusterFromVMwareMachine(ctx context.Context, c client.Client, ma
return vsphereCluster, err
}

// GetVMwareVSphereClusterFromMachineDeployment gets the vmware.infrastructure.cluster.x-k8s.io.VSphereCluster resource for the given MachineDeployment.
func GetVMwareVSphereClusterFromMachineDeployment(ctx context.Context, c client.Client, machineDeployment *clusterv1.MachineDeployment) (*vmwarev1.VSphereCluster, error) {
	clusterName := machineDeployment.Labels[clusterv1.ClusterNameLabel]
	if clusterName == "" {
		return nil, errors.Errorf("error getting VSphereCluster name from MachineDeployment %s/%s",
			machineDeployment.Namespace, machineDeployment.Name)
	}

	// Resolve the owning CAPI Cluster first; it carries the infrastructure reference.
	cluster := &clusterv1.Cluster{}
	clusterKey := apitypes.NamespacedName{
		Namespace: machineDeployment.Namespace,
		Name:      clusterName,
	}
	if err := c.Get(ctx, clusterKey, cluster); err != nil {
		return nil, err
	}

	if cluster.Spec.InfrastructureRef == nil {
		return nil, errors.Errorf("error getting VSphereCluster name from MachineDeployment %s/%s: Cluster.spec.infrastructureRef not yet set",
			machineDeployment.Namespace, machineDeployment.Name)
	}

	vsphereCluster := &vmwarev1.VSphereCluster{}
	err := c.Get(ctx, apitypes.NamespacedName{
		Namespace: machineDeployment.Namespace,
		Name:      cluster.Spec.InfrastructureRef.Name,
	}, vsphereCluster)
	return vsphereCluster, err
}

// GetVSphereClusterFromVSphereMachine gets the infrastructure.cluster.x-k8s.io.VSphereCluster resource for the given VSphereMachine.
func GetVSphereClusterFromVSphereMachine(ctx context.Context, c client.Client, machine *infrav1.VSphereMachine) (*infrav1.VSphereCluster, error) {
clusterName := machine.Labels[clusterv1.ClusterNameLabel]
Expand Down