Skip to content

Commit

Permalink
Merge pull request #277 from Edwinhr716/controllerRevision
Browse files Browse the repository at this point in the history
Add Controller Revision (Implementation of KEP #238)
  • Loading branch information
k8s-ci-robot authored Dec 28, 2024
2 parents c468ab7 + 1faace7 commit c03d4c9
Show file tree
Hide file tree
Showing 13 changed files with 732 additions and 163 deletions.
6 changes: 2 additions & 4 deletions api/leaderworkerset/v1/leaderworkerset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ const (
// Worker pods will have an annotation that is the leader pod's name.
LeaderPodNameAnnotationKey string = "leaderworkerset.sigs.k8s.io/leader-name"

// SHAed leaderWorkerTemplate value for version tracking.
// This will be applied to all API objects including:
// leaderStatefulset, leaderPods, workerStatefulsets, workerPods.
TemplateRevisionHashKey string = "leaderworkerset.sigs.k8s.io/template-revision-hash"
// Hash to track the controller revision that matches an LWS object
RevisionKey string = "leaderworkerset.sigs.k8s.io/template-revision-hash"

// Environment variable added to all containers in the LeaderWorkerSet to
// address the leader via the headless service.
Expand Down
3 changes: 3 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ rules:
- apiGroups:
- apps
resources:
- controllerrevisions
- statefulsets
verbs:
- create
Expand All @@ -78,12 +79,14 @@ rules:
- apiGroups:
- apps
resources:
- controllerrevisions/finalizers
- statefulsets/finalizers
verbs:
- update
- apiGroups:
- apps
resources:
- controllerrevisions/status
- statefulsets/status
verbs:
- get
Expand Down
174 changes: 125 additions & 49 deletions pkg/controllers/leaderworkerset_controller.go

Large diffs are not rendered by default.

63 changes: 42 additions & 21 deletions pkg/controllers/leaderworkerset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controllers

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
Expand All @@ -30,24 +31,40 @@ import (
"k8s.io/utils/ptr"

leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
"sigs.k8s.io/lws/pkg/utils"

"sigs.k8s.io/controller-runtime/pkg/client/fake"
revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
testutils "sigs.k8s.io/lws/test/testutils"
)

func TestLeaderStatefulSetApplyConfig(t *testing.T) {
hash1 := utils.LeaderWorkerTemplateHash(testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
client := fake.NewClientBuilder().Build()
lws1 := testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
LeaderTemplateSpec(testutils.MakeLeaderPodSpec()).
WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj())
hash2 := utils.LeaderWorkerTemplateHash(testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj())
WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj()
cr1, err := revisionutils.NewRevision(context.TODO(), client, lws1, "")
if err != nil {
t.Fatal(err)
}
revisionKey1 := revisionutils.GetRevisionKey(cr1)

lws2 := testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
WorkerTemplateSpec(testutils.MakeWorkerPodSpec()).Obj()
cr2, err := revisionutils.NewRevision(context.TODO(), client, lws2, "")
if err != nil {
t.Fatal(err)
}
revisionKey2 := revisionutils.GetRevisionKey(cr2)

tests := []struct {
name string
revisionKey string
lws *leaderworkerset.LeaderWorkerSet
wantApplyConfig *appsapplyv1.StatefulSetApplyConfiguration
}{
{
name: "1 replica, size 1, with empty leader template, exclusive placement disabled",
name: "1 replica, size 1, with empty leader template, exclusive placement disabled",
revisionKey: revisionKey2,
lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
Replica(1).
RolloutStrategy(leaderworkerset.RolloutStrategy{
Expand All @@ -69,7 +86,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Namespace: ptr.To[string]("default"),
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash2,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
},
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "1"},
},
Expand All @@ -86,7 +103,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/worker-index": "0",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash2,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
},
Annotations: map[string]string{
"leaderworkerset.sigs.k8s.io/size": "1",
Expand All @@ -112,7 +129,8 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
},
},
{
name: "1 replica, size 2 , with empty leader template, exclusive placement enabled",
name: "1 replica, size 2 , with empty leader template, exclusive placement enabled",
revisionKey: revisionKey2,
lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
Annotation(map[string]string{
"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey",
Expand All @@ -136,7 +154,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Namespace: ptr.To[string]("default"),
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash2,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
},
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "1"},
},
Expand All @@ -153,7 +171,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/worker-index": "0",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash2,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
},
Annotations: map[string]string{
"leaderworkerset.sigs.k8s.io/size": "2",
Expand All @@ -180,7 +198,8 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
},
},
{
name: "2 replica, size 2, with leader template, exclusive placement enabled",
name: "2 replica, size 2, with leader template, exclusive placement enabled",
revisionKey: revisionKey1,
lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default").Annotation(map[string]string{
"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey",
}).Replica(2).
Expand All @@ -204,7 +223,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Namespace: ptr.To[string]("default"),
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash1,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey1,
},
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "2"},
},
Expand All @@ -221,7 +240,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/worker-index": "0",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash1,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey1,
},
Annotations: map[string]string{
"leaderworkerset.sigs.k8s.io/size": "2",
Expand All @@ -247,7 +266,8 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
},
},
{
name: "2 maxUnavailable, 1 maxSurge, with empty leader template, exclusive placement disabled",
name: "2 maxUnavailable, 1 maxSurge, with empty leader template, exclusive placement disabled",
revisionKey: revisionKey2,
lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default").
Replica(1).
RolloutStrategy(leaderworkerset.RolloutStrategy{
Expand All @@ -270,7 +290,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Namespace: ptr.To[string]("default"),
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash2,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
},
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "1"},
},
Expand All @@ -287,7 +307,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/worker-index": "0",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash2,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
},
Annotations: map[string]string{
"leaderworkerset.sigs.k8s.io/size": "1",
Expand All @@ -313,7 +333,8 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
},
},
{
name: "1 replica, size 2, with leader template, exclusive placement enabled, subgroupsize enabled",
name: "1 replica, size 2, with leader template, exclusive placement enabled, subgroupsize enabled",
revisionKey: revisionKey1,
lws: testutils.BuildBasicLeaderWorkerSet("test-sample", "default").Annotation(map[string]string{
leaderworkerset.SubGroupExclusiveKeyAnnotationKey: "topologyKey",
}).SubGroupSize(2).Replica(1).
Expand All @@ -337,7 +358,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Namespace: ptr.To[string]("default"),
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash1,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey1,
},
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "1"},
},
Expand All @@ -354,7 +375,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
Labels: map[string]string{
"leaderworkerset.sigs.k8s.io/name": "test-sample",
"leaderworkerset.sigs.k8s.io/worker-index": "0",
"leaderworkerset.sigs.k8s.io/template-revision-hash": hash1,
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey1,
},
Annotations: map[string]string{
"leaderworkerset.sigs.k8s.io/size": "2",
Expand Down Expand Up @@ -383,7 +404,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
stsApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(tc.lws, 0, *tc.lws.Spec.Replicas)
stsApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(tc.lws, 0, *tc.lws.Spec.Replicas, tc.revisionKey)
if err != nil {
t.Errorf("failed with error: %s", err.Error())
}
Expand Down
23 changes: 18 additions & 5 deletions pkg/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
acceleratorutils "sigs.k8s.io/lws/pkg/utils/accelerators"
controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
podutils "sigs.k8s.io/lws/pkg/utils/pod"
revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
)

Expand Down Expand Up @@ -118,8 +119,12 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
log.V(2).Info("defer the creation of the worker statefulset because leader pod is not ready.")
return ctrl.Result{}, nil
}

statefulSet, err := constructWorkerStatefulSetApplyConfiguration(pod, leaderWorkerSet)
revision, err := revisionutils.GetRevision(ctx, r.Client, &leaderWorkerSet, revisionutils.GetRevisionKey(&pod))
if err != nil {
log.Error(err, "Getting lws revisions")
return ctrl.Result{}, err
}
statefulSet, err := constructWorkerStatefulSetApplyConfiguration(pod, leaderWorkerSet, revision)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -180,6 +185,10 @@ func (r *PodReconciler) handleRestartPolicy(ctx context.Context, pod corev1.Pod,
if err := r.Get(ctx, types.NamespacedName{Name: leaderPodName, Namespace: pod.Namespace}, &leader); err != nil {
return false, err
}
// Different revision key means that this pod will be deleted soon and alternative will be created with the matching key
if revisionutils.GetRevisionKey(&leader) != revisionutils.GetRevisionKey(&pod) {
return false, nil
}
} else {
leader = pod
}
Expand Down Expand Up @@ -259,8 +268,12 @@ func setControllerReferenceWithStatefulSet(owner metav1.Object, sts *appsapplyv1
}

// constructWorkerStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws leaderworkerset.LeaderWorkerSet) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
podTemplateSpec := *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws leaderworkerset.LeaderWorkerSet, currentRevision *appsv1.ControllerRevision) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
currentLws, err := revisionutils.ApplyRevision(&lws, currentRevision)
if err != nil {
return nil, err
}
podTemplateSpec := *currentLws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
// construct pod template spec configuration
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
if err != nil {
Expand All @@ -280,7 +293,7 @@ func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws lead
leaderworkerset.GroupIndexLabelKey: leaderPod.Labels[leaderworkerset.GroupIndexLabelKey],
leaderworkerset.SetNameLabelKey: lws.Name,
leaderworkerset.GroupUniqueHashLabelKey: leaderPod.Labels[leaderworkerset.GroupUniqueHashLabelKey],
leaderworkerset.TemplateRevisionHashKey: leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey],
leaderworkerset.RevisionKey: revisionutils.GetRevisionKey(&leaderPod),
}

podTemplateApplyConfiguration.WithLabels(labelMap)
Expand Down
Loading

0 comments on commit c03d4c9

Please sign in to comment.