Skip to content

Commit

Permalink
changed revision logic based on updated design
Browse files Browse the repository at this point in the history
  • Loading branch information
Edwinhr716 committed Dec 23, 2024
1 parent 1f5adfa commit 1b978cf
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 122 deletions.
107 changes: 82 additions & 25 deletions pkg/controllers/leaderworkerset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,6 @@ func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Requ
log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
ctx = ctrl.LoggerInto(ctx, log)

currentRevision, updateRevision, collisionCount, err := controllerutils.GetLeaderWorkerSetRevisions(ctx, r.Client, lws)
if err != nil {
log.Error(err, "Getting lws revisions")
return ctrl.Result{}, err
}
lws.Status.CurrentRevision = currentRevision.Name
lws.Status.UpdateRevision = updateRevision.Name
lws.Status.CollisionCount = new(int32)
lws.Status.CollisionCount = &collisionCount

partition, replicas, err := r.rollingUpdateParameters(ctx, lws)
if err != nil {
log.Error(err, "Rolling partition error")
Expand Down Expand Up @@ -204,19 +194,31 @@ func SetupIndexes(indexer client.FieldIndexer) error {
// - One exception here is when unready replicas of leaderWorkerSet is equal to MaxSurge,
// we should reclaim the extra replicas gradually to accommodate for the new replicas.
func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (int32, int32, error) {
log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
ctx = ctrl.LoggerInto(ctx, log)
lwsReplicas := *lws.Spec.Replicas

sts := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts)
stsExists, sts, err := stsCreated(ctx, r.Client, lws)
if err != nil {
return 0, 0, err
}

if !stsExists {
return 0, lwsReplicas, nil
}

existingControllerRevisions, err := controllerutils.ExistingControllerRevisions(ctx, r.Client, lws)
if err != nil {
// Case 1:
// If sts not created yet, all partitions should be updated,
// replicas should not change.
if apierrors.IsNotFound(err) {
return 0, lwsReplicas, nil
}
return 0, 0, err
}
if !existingControllerRevisions {
	// Updating from a version that did not support ControllerRevisions.
	// Create one first before checking whether the template has been updated.
	log.V(2).Info(fmt.Sprintf("Creating new controller revision create/update operation for %+v ", lws))
	if err := controllerutils.CreateLeaderWorkerSetRevision(ctx, r.Client, lws, sts.Labels[leaderworkerset.TemplateRevisionHashKey]); err != nil {
		// BUG FIX: the original returned nil here, silently swallowing the
		// revision-creation error; propagate it so the reconcile is retried.
		return 0, 0, err
	}
	return 0, lwsReplicas, nil
}

stsReplicas := *sts.Spec.Replicas
maxSurge, err := intstr.GetValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, int(lwsReplicas), true)
Expand All @@ -242,7 +244,11 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,

// Case 2:
// Indicates a new rolling update here.
if templateUpdated(sts, lws) {
hasTemplateUdated, err := templateUpdated(ctx, r.Client, sts, lws)
if err != nil {
return 0, 0, err
}
if hasTemplateUdated {
// Processing scaling up/down first prior to rolling update.
return min(lwsReplicas, stsReplicas), wantReplicas(lwsReplicas), nil
}
Expand Down Expand Up @@ -290,8 +296,32 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,
func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32) error {
log := ctrl.LoggerFrom(ctx)

// templateHash is not a reliable way to determine whether or not an lws object has been updated as seen in
// https://github.com/kubernetes-sigs/lws/issues/281
// If a leader sts already exists, but the template has not been updated, the templateHash of the leader is
// used to keep consistency in cases where two different templateHashes are calculated from the same LWS object
stsExists, sts, err := stsCreated(ctx, r.Client, lws)
if err != nil {
return err
}
templateHash := utils.LeaderWorkerTemplateHash(lws)
if stsExists {
	// Renamed from `templateUpdated` to avoid shadowing the templateUpdated
	// function within this scope.
	updated, err := templateUpdated(ctx, r.Client, sts, lws)
	if err != nil {
		return err
	}
	if !updated {
		// Template unchanged: reuse the hash recorded on the existing leader
		// sts so two equivalent hashes computed from the same LWS stay consistent.
		templateHash = sts.Labels[leaderworkerset.TemplateRevisionHashKey]
	}
}

if err = controllerutils.CreateLeaderWorkerSetRevision(ctx, r.Client, lws, templateHash); err != nil {
log.Error(err, "Creating LWS Revision")
return err
}

// construct the statefulset apply configuration
leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas)
leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas, templateHash)
if err != nil {
log.Error(err, "Constructing StatefulSet apply configuration.")
return err
Expand Down Expand Up @@ -400,7 +430,6 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpgradeInProgress))
} else if updatedAndReadyCount == int(*lws.Spec.Replicas) {
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable))
lws.Status.CurrentRevision = lws.Status.UpdateRevision
} else {
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
}
Expand Down Expand Up @@ -538,7 +567,7 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le
}

// constructLeaderStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, templateHash string) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
var podTemplateSpec corev1.PodTemplateSpec
if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil {
podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.LeaderTemplate.DeepCopy()
Expand All @@ -556,7 +585,6 @@ func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWor
return nil, err
}

templateHash := utils.LeaderWorkerTemplateHash(lws)
podTemplateApplyConfiguration.WithLabels(map[string]string{
leaderworkerset.WorkerIndexLabelKey: "0",
leaderworkerset.SetNameLabelKey: lws.Name,
Expand Down Expand Up @@ -689,6 +717,35 @@ func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Cond
return false
}

func templateUpdated(sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet) bool {
return sts.Labels[leaderworkerset.TemplateRevisionHashKey] != utils.LeaderWorkerTemplateHash(lws)
// templateUpdated reports whether the lws template differs from the revision
// recorded on the leader StatefulSet. It fetches the ControllerRevision that
// matches the sts's template-hash label, applies it to obtain the baseline lws,
// and compares the two templates.
func templateUpdated(ctx context.Context, k8sClient client.Client, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet) (bool, error) {
	log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
	ctx = ctrl.LoggerInto(ctx, log)
	stsTemplateHash := sts.Labels[leaderworkerset.TemplateRevisionHashKey]
	// Log before the fetch (the original logged "Fetching" after it had
	// already fetched and compared).
	log.V(2).Info(fmt.Sprintf("Fetching controller revision with hash %s", stsTemplateHash))
	controllerRevision, err := controllerutils.GetLeaderWorkerSetRevisionFromTemplateHash(ctx, k8sClient, lws, stsTemplateHash)
	if err != nil {
		return false, err
	}

	baselineLws, err := controllerutils.ApplyRevision(lws, controllerRevision)
	if err != nil {
		return false, err
	}
	// BUG FIX: guard the debug log — NetworkConfig and SubdomainPolicy are
	// pointers; dereferencing them unconditionally panicked when unset.
	if lws.Spec.NetworkConfig != nil && lws.Spec.NetworkConfig.SubdomainPolicy != nil &&
		baselineLws.Spec.NetworkConfig != nil && baselineLws.Spec.NetworkConfig.SubdomainPolicy != nil {
		log.V(2).Info(fmt.Sprintf("comparing networkConfig %s, with %s", string(*lws.Spec.NetworkConfig.SubdomainPolicy), string(*baselineLws.Spec.NetworkConfig.SubdomainPolicy)))
	}
	return !utils.EqualLeaderWorkerTemplates(baselineLws, lws), nil
}

// stsCreated reports whether the leader StatefulSet for lws already exists,
// returning the StatefulSet itself when it does. A NotFound result is not an
// error: it simply means the sts has not been created yet (Case 1 — all
// partitions should be updated and replicas should not change).
func stsCreated(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet) (bool, *appsv1.StatefulSet, error) {
	var sts appsv1.StatefulSet
	key := types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}
	if err := k8sClient.Get(ctx, key, &sts); err != nil {
		// Case 1:
		// If sts not created yet, all partitions should be updated,
		// replicas should not change.
		if apierrors.IsNotFound(err) {
			return false, nil, nil
		}
		return false, nil, err
	}
	return true, &sts, nil
}
2 changes: 1 addition & 1 deletion pkg/controllers/leaderworkerset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
stsApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(tc.lws, 0, *tc.lws.Spec.Replicas)
stsApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(tc.lws, 0, *tc.lws.Spec.Replicas, "")
if err != nil {
t.Errorf("failed with error: %s", err.Error())
}
Expand Down
15 changes: 5 additions & 10 deletions pkg/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
"sigs.k8s.io/lws/pkg/utils"
acceleratorutils "sigs.k8s.io/lws/pkg/utils/accelerators"
controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
podutils "sigs.k8s.io/lws/pkg/utils/pod"
Expand Down Expand Up @@ -119,7 +118,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
log.V(2).Info("defer the creation of the worker statefulset because leader pod is not ready.")
return ctrl.Result{}, nil
}
currentRevision, _, _, err := controllerutils.GetLeaderWorkerSetRevisions(ctx, r.Client, &leaderWorkerSet)
currentRevision, err := controllerutils.GetLeaderWorkerSetRevisionFromTemplateHash(ctx, r.Client, &leaderWorkerSet, pod.Labels[leaderworkerset.TemplateRevisionHashKey])
if err != nil {
log.Error(err, "Getting lws revisions")
return ctrl.Result{}, err
Expand Down Expand Up @@ -265,15 +264,11 @@ func setControllerReferenceWithStatefulSet(owner metav1.Object, sts *appsapplyv1

// constructWorkerStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws leaderworkerset.LeaderWorkerSet, currentRevision *appsv1.ControllerRevision) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
updatedTemplateHash := utils.LeaderWorkerTemplateHash(&lws)
podTemplateSpec := *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
if updatedTemplateHash != leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] {
originalLws, err := controllerutils.ApplyRevision(&lws, currentRevision)
if err != nil {
return nil, err
}
podTemplateSpec = *originalLws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
currentLws, err := controllerutils.ApplyRevision(&lws, currentRevision)
if err != nil {
return nil, err
}
podTemplateSpec := *currentLws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
// construct pod template spec configuration
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/history/controller_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/davecgh/go-spew/spew"
appsv1 "k8s.io/api/apps/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"

apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -110,6 +111,11 @@ func SortControllerRevisions(revisions []*appsv1.ControllerRevision) {
// contain semantically equivalent data. Otherwise this method returns false.
func EqualRevision(lhs *appsv1.ControllerRevision, rhs *appsv1.ControllerRevision) bool {
// BUG FIX: the nil guard must run before touching Labels — the original
// accessed lhs.Labels/rhs.Labels first, panicking on a nil revision.
if lhs == nil || rhs == nil {
	return lhs == rhs
}

// Fast path: matching template-hash labels mean the revisions encode the
// same template. NOTE(review): two revisions both missing the label compare
// as ""=="" and are treated as equal here — confirm that is intended.
if lhs.Labels[leaderworkerset.TemplateRevisionHashKey] == rhs.Labels[leaderworkerset.TemplateRevisionHashKey] {
	return true
}

var lhsHash, rhsHash *uint32
Expand Down Expand Up @@ -202,7 +208,7 @@ type Interface interface {

// NewHistory returns an instance of Interface that uses client to communicate with the API Server and lister to list
// ControllerRevisions. This method should be used to create an Interface for all scenarios other than testing.
func NewHistory(k8sclient client.Client, context context.Context) Interface {
// Parameter renamed ctx: naming it `context` shadowed the context package.
// NOTE(review): storing a Context inside realHistory is discouraged by Go
// convention (contexts should be passed per call) — confirm intent.
func NewHistory(ctx context.Context, k8sclient client.Client) Interface {
	return &realHistory{k8sclient, ctx}
}

Expand Down
Loading

0 comments on commit 1b978cf

Please sign in to comment.