Skip to content

Commit

Permalink
addressed comments, refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
Edwinhr716 committed Dec 23, 2024
1 parent 522f1b0 commit 9436aeb
Show file tree
Hide file tree
Showing 14 changed files with 422 additions and 631 deletions.
180 changes: 96 additions & 84 deletions pkg/controllers/leaderworkerset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"sigs.k8s.io/lws/pkg/utils"
controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
podutils "sigs.k8s.io/lws/pkg/utils/pod"
revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
)

Expand Down Expand Up @@ -99,13 +100,31 @@ func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Requ
log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
ctx = ctrl.LoggerInto(ctx, log)

partition, replicas, err := r.rollingUpdateParameters(ctx, lws)
leaderSts, err := r.getLeaderStatefulSet(ctx, lws)
if err != nil {
log.Error(err, "Fetching leader statefulset")
return ctrl.Result{}, err
}

if err := r.createControllerRevisionIfNonExist(ctx, leaderSts, lws); err != nil {
log.Error(err, "Creating controller revision")
return ctrl.Result{}, err
}

lwsUpdated, err := r.leaderWorkerSetUpdated(ctx, leaderSts, lws)
if err != nil {
log.Error(err, "Validating if LWS has been updated")
return ctrl.Result{}, err
}

templateHash := getLeaderWorkerTemplateHash(leaderSts, lws, lwsUpdated)
partition, replicas, err := r.rollingUpdateParameters(ctx, lws, leaderSts, lwsUpdated)
if err != nil {
log.Error(err, "Rolling partition error")
return ctrl.Result{}, err
}

if err := r.SSAWithStatefulset(ctx, lws, partition, replicas); err != nil {
if err := r.SSAWithStatefulset(ctx, lws, partition, replicas, templateHash); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -117,7 +136,7 @@ func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}

err = r.updateStatus(ctx, lws)
err = r.updateStatus(ctx, lws, templateHash)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -193,34 +212,15 @@ func SetupIndexes(indexer client.FieldIndexer) error {
// - Otherwise, Replicas is equal to spec.Replicas
// - One exception here is when unready replicas of leaderWorkerSet is equal to MaxSurge,
// we should reclaim the extra replicas gradually to accommodate for the new replicas.
func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (int32, int32, error) {
func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, sts *appsv1.StatefulSet, leaderWorkerSetUpdated bool) (int32, int32, error) {
log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
ctx = ctrl.LoggerInto(ctx, log)
lwsReplicas := *lws.Spec.Replicas

// Case 1:
// If sts not created yet, all partitions should be updated,
// replicas should not change.
stsExists, sts, err := stsCreated(ctx, r.Client, lws)
if err != nil {
return 0, 0, err
}

if !stsExists {
return 0, lwsReplicas, nil
}

existingControllerRevisions, err := controllerutils.ExistingControllerRevisions(ctx, r.Client, lws)
if err != nil {
return 0, 0, err
}

if !existingControllerRevisions {
// Updating from version that did not support Controller Revision. Need to create one first before checking if template has been updated
log.V(2).Info(fmt.Sprintf("Creating new controller revision create/update operation for %+v ", lws))
if err := controllerutils.CreateLeaderWorkerSetRevision(ctx, r.Client, lws, sts.Labels[leaderworkerset.TemplateRevisionHashKey]); err != nil {
return 0, 0, nil
}
if sts == nil {
return 0, lwsReplicas, nil
}

Expand Down Expand Up @@ -248,11 +248,7 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,

// Case 2:
// Indicates a new rolling update here.
hasTemplateUdated, err := templateUpdated(ctx, r.Client, sts, lws)
if err != nil {
return 0, 0, err
}
if hasTemplateUdated {
if leaderWorkerSetUpdated {
// Processing scaling up/down first prior to rolling update.
return min(lwsReplicas, stsReplicas), wantReplicas(lwsReplicas), nil
}
Expand Down Expand Up @@ -297,29 +293,10 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,
return min(partition, utils.NonZeroValue(stsReplicas-int32(rollingStep)-continuousReadyReplicas)), wantReplicas(lwsUnreadyReplicas), nil
}

func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32) error {
func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, templateHash string) error {
log := ctrl.LoggerFrom(ctx)

// templateHash is not a reliable way to determine whether or not an lws object has been updated as seen in
// https://github.com/kubernetes-sigs/lws/issues/281
// If a leader sts already exists, but the template has not been updated, the templateHash of the leader is
// used to keep consistency in cases where two different templateHashes are calculated from the same LWS object
stsExists, sts, err := stsCreated(ctx, r.Client, lws)
if err != nil {
return err
}
templateHash := utils.LeaderWorkerTemplateHash(lws)
if stsExists {
templateUpdated, err := templateUpdated(ctx, r.Client, sts, lws)
if err != nil {
return err
}
if !templateUpdated {
templateHash = sts.Labels[leaderworkerset.TemplateRevisionHashKey]
}
}

if err = controllerutils.CreateLeaderWorkerSetRevision(ctx, r.Client, lws, templateHash); err != nil {
if err := revisionutils.CreateLeaderWorkerSetRevision(ctx, r.Client, lws, templateHash); err != nil {
log.Error(err, "Creating LWS Revision")
return err
}
Expand Down Expand Up @@ -359,7 +336,7 @@ func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws
}

// updates the condition of the leaderworkerset to either Progressing or Available.
func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (bool, error) {
func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, templateHash string) (bool, error) {
log := ctrl.LoggerFrom(ctx)
podSelector := client.MatchingLabels(map[string]string{
leaderworkerset.SetNameLabelKey: lws.Name,
Expand All @@ -373,7 +350,6 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l

updateStatus := false
readyCount, updatedCount, updatedNonBurstWorkerCount, currentNonBurstWorkerCount, updatedAndReadyCount := 0, 0, 0, 0, 0
templateHash := utils.LeaderWorkerTemplateHash(lws)
noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1

// Iterate through all leaderPods.
Expand Down Expand Up @@ -434,7 +410,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpgradeInProgress))
} else if updatedAndReadyCount == int(*lws.Spec.Replicas) {
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable))
controllerutils.TruncateHistory(ctx, r.Client, lws, templateHash)
revisionutils.TruncateHistory(ctx, r.Client, lws, templateHash)
} else {
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
}
Expand All @@ -448,7 +424,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
}

// Updates status and condition of LeaderWorkerSet and returns whether or not an update actually occurred.
func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {
func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, templateHash string) error {
updateStatus := false
log := ctrl.LoggerFrom(ctx)

Expand Down Expand Up @@ -484,7 +460,7 @@ func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leade
}

// check if an update is needed
updateConditions, err := r.updateConditions(ctx, lws)
updateConditions, err := r.updateConditions(ctx, lws, templateHash)
if err != nil {
return err
}
Expand Down Expand Up @@ -529,7 +505,7 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le
return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey])
}, stsList.Items, int(stsReplicas))

templateHash := utils.LeaderWorkerTemplateHash(lws)
templateHash := revisionutils.LeaderWorkerTemplateHash(lws)
// Once size==1, no worker statefulSets will be created.
noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
processReplica := func(index int32) (ready bool) {
Expand Down Expand Up @@ -571,6 +547,70 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le
return continuousReadyReplicas, lwsUnreadyReplicas, nil
}

// getLeaderStatefulSet fetches the leader StatefulSet that shares the
// LeaderWorkerSet's name and namespace. A nil StatefulSet together with a
// nil error means the StatefulSet has not been created yet.
func (r *LeaderWorkerSetReconciler) getLeaderStatefulSet(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.StatefulSet, error) {
	var sts appsv1.StatefulSet
	if err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &sts); err != nil {
		// Absence is not an error: callers treat a nil result as "not created yet".
		if apierrors.IsNotFound(err) {
			return nil, nil
		}
		return nil, err
	}
	return &sts, nil
}

// createControllerRevisionIfNonExist creates a ControllerRevision when the
// leader StatefulSet exists but no revisions have been recorded yet. This
// happens when upgrading from a version that did not support controller
// revisions.
func (r *LeaderWorkerSetReconciler) createControllerRevisionIfNonExist(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet) error {
	// Nothing to do before the leader StatefulSet has been created.
	if sts == nil {
		return nil
	}

	exists, err := revisionutils.ExistingControllerRevisions(ctx, r.Client, lws)
	if err != nil || exists {
		// err is nil when revisions already exist, so this covers both exits.
		return err
	}

	// Seed the revision history with the hash already stamped on the leader
	// StatefulSet's labels.
	return revisionutils.CreateLeaderWorkerSetRevision(ctx, r.Client, lws, sts.Labels[leaderworkerset.TemplateRevisionHashKey])
}

// leaderWorkerSetUpdated reports whether the live LeaderWorkerSet's
// leader/worker templates differ from the revision recorded under the leader
// StatefulSet's template-hash label. It returns false when the StatefulSet
// does not exist yet.
func (r *LeaderWorkerSetReconciler) leaderWorkerSetUpdated(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet) (bool, error) {
	if sts == nil {
		return false, nil
	}

	stsHash := sts.Labels[leaderworkerset.TemplateRevisionHashKey]
	revision, err := revisionutils.GetLeaderWorkerSetRevisionFromTemplateHash(ctx, r.Client, lws, stsHash)
	if err != nil {
		return false, err
	}

	// Reconstruct the LWS as it looked when the revision was taken, then
	// compare its templates against the live object.
	baseline, err := revisionutils.ApplyRevision(lws, revision)
	if err != nil {
		return false, err
	}
	return !revisionutils.EqualLeaderWorkerTemplates(baseline, lws), nil
}

// getLeaderWorkerTemplateHash returns the template hash to stamp on the leader
// StatefulSet.
//
// templateHash is not a reliable way to determine whether or not an lws object
// has been updated, as seen in https://github.com/kubernetes-sigs/lws/issues/281.
// If a leader StatefulSet already exists and the template has not been updated,
// the leader's existing hash is reused to keep consistency in cases where two
// different hashes could be computed from the same LWS object.
func getLeaderWorkerTemplateHash(sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, leaderWorkerSetUpdated bool) string {
	// Collapse the original nested ifs into one guard: reuse the existing hash
	// only when the StatefulSet exists and the template is unchanged.
	if sts != nil && !leaderWorkerSetUpdated {
		return sts.Labels[leaderworkerset.TemplateRevisionHashKey]
	}
	return revisionutils.LeaderWorkerTemplateHash(lws)
}

// constructLeaderStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, templateHash string) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
var podTemplateSpec corev1.PodTemplateSpec
Expand Down Expand Up @@ -721,31 +761,3 @@ func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Cond

return false
}

// templateUpdated reports whether the live lws object's leader/worker
// templates differ from the revision stored for the leader StatefulSet's
// template-hash label.
func templateUpdated(ctx context.Context, k8sClient client.Client, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet) (bool, error) {
	log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
	ctx = ctrl.LoggerInto(ctx, log)

	hash := sts.Labels[leaderworkerset.TemplateRevisionHashKey]
	revision, err := controllerutils.GetLeaderWorkerSetRevisionFromTemplateHash(ctx, k8sClient, lws, hash)
	if err != nil {
		return false, err
	}

	// Rebuild the LWS as recorded in the revision and compare templates with
	// the current object.
	baseline, err := controllerutils.ApplyRevision(lws, revision)
	if err != nil {
		return false, err
	}
	return !utils.EqualLeaderWorkerTemplates(baseline, lws), nil
}

// stsCreated looks up the leader StatefulSet named after lws. The boolean
// distinguishes "not found" (false, nil error) from other Get failures.
func stsCreated(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet) (bool, *appsv1.StatefulSet, error) {
	var sts appsv1.StatefulSet
	key := types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}
	if err := k8sClient.Get(ctx, key, &sts); err != nil {
		if apierrors.IsNotFound(err) {
			return false, nil, nil
		}
		return false, nil, err
	}
	return true, &sts, nil
}
7 changes: 4 additions & 3 deletions pkg/controllers/leaderworkerset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ import (
"k8s.io/utils/ptr"

leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
"sigs.k8s.io/lws/pkg/utils"

revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
testutils "sigs.k8s.io/lws/test/testutils"
)

func TestLeaderStatefulSetApplyConfig(t *testing.T) {
hash1 := utils.LeaderWorkerTemplateHash(testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
hash1 := revisionutils.LeaderWorkerTemplateHash(testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
LeaderTemplateSpec(testutils.MakeLeaderPodSpec()).
WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj())
hash2 := utils.LeaderWorkerTemplateHash(testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
hash2 := revisionutils.LeaderWorkerTemplateHash(testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj())

tests := []struct {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
acceleratorutils "sigs.k8s.io/lws/pkg/utils/accelerators"
controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
podutils "sigs.k8s.io/lws/pkg/utils/pod"
revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
)

Expand Down Expand Up @@ -118,7 +119,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
log.V(2).Info("defer the creation of the worker statefulset because leader pod is not ready.")
return ctrl.Result{}, nil
}
currentRevision, err := controllerutils.GetLeaderWorkerSetRevisionFromTemplateHash(ctx, r.Client, &leaderWorkerSet, pod.Labels[leaderworkerset.TemplateRevisionHashKey])
currentRevision, err := revisionutils.GetLeaderWorkerSetRevisionFromTemplateHash(ctx, r.Client, &leaderWorkerSet, pod.Labels[leaderworkerset.TemplateRevisionHashKey])
if err != nil {
log.Error(err, "Getting lws revisions")
return ctrl.Result{}, err
Expand Down Expand Up @@ -264,7 +265,7 @@ func setControllerReferenceWithStatefulSet(owner metav1.Object, sts *appsapplyv1

// constructWorkerStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws leaderworkerset.LeaderWorkerSet, currentRevision *appsv1.ControllerRevision) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
currentLws, err := controllerutils.ApplyRevision(&lws, currentRevision)
currentLws, err := revisionutils.ApplyRevision(&lws, currentRevision)
if err != nil {
return nil, err
}
Expand Down
19 changes: 14 additions & 5 deletions pkg/controllers/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,39 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/utils/ptr"
leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
"sigs.k8s.io/lws/pkg/history"
"sigs.k8s.io/lws/pkg/utils"
revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
testutils "sigs.k8s.io/lws/test/testutils"
)

func TestConstructWorkerStatefulSetApplyConfiguration(t *testing.T) {
parentKind := appsv1.SchemeGroupVersion.WithKind("LeaderWorkerSet")
lws := testutils.BuildBasicLeaderWorkerSet("test-sample", "default").Replica(1).WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Size(1).Obj()
updateTemplateHash := utils.LeaderWorkerTemplateHash(lws)
updateRevision, err := history.NewControllerRevision(lws, parentKind, lws.Labels, testutils.RawLWSTemplate(lws), 1)
updateTemplateHash := revisionutils.LeaderWorkerTemplateHash(lws)
patch, err := revisionutils.GetPatch(lws)
if err != nil {
t.Fatal(err)
}
updateRevision, err := history.NewControllerRevision(lws, parentKind, lws.Labels, runtime.RawExtension{Raw: patch}, 1)
if err != nil {
t.Fatal(err)
}
lws.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Name = "worker"
currentRevision, err := history.NewControllerRevision(lws, parentKind, lws.Labels, testutils.RawLWSTemplate(lws), 2)
patch, err = revisionutils.GetPatch(lws)
if err != nil {
t.Fatal(err)
}
currentRevision, err := history.NewControllerRevision(lws, parentKind, lws.Labels, runtime.RawExtension{Raw: patch}, 2)
if err != nil {
t.Fatal(err)
}
currentTemplateHash := utils.LeaderWorkerTemplateHash(lws)
currentTemplateHash := revisionutils.LeaderWorkerTemplateHash(lws)

tests := []struct {
name string
Expand Down
Loading

0 comments on commit 9436aeb

Please sign in to comment.