controllers/workloads/instanceset_controller_test.go (97 additions, 0 deletions)

@@ -23,15 +23,18 @@ import (
"context"
"fmt"
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/golang/mock/gomock"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -103,6 +106,35 @@ var _ = Describe("InstanceSet Controller", func() {
).Should(Succeed())
}

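	// mockPodReady patches each named pod's status to Running/Ready with a
	// running first container, so the InstanceSet can observe it as ready.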
mockPodReady := func(podNames ...string) {
By("mock pods ready")
for _, podName := range podNames {
podKey := types.NamespacedName{
Namespace: itsObj.Namespace,
Name: podName,
}
Eventually(testapps.GetAndChangeObjStatus(&testCtx, podKey, func(pod *corev1.Pod) {
pod.Status.Phase = corev1.PodRunning
pod.Status.Conditions = []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Now(),
},
}
pod.Status.ContainerStatuses = []corev1.ContainerStatus{
{
Name: pod.Spec.Containers[0].Name,
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{},
},
Image: pod.Spec.Containers[0].Image,
},
}
})()).Should(Succeed())
}
}

Context("reconciliation", func() {
It("should reconcile well", func() {
name := "test-instance-set"
@@ -147,6 +179,71 @@ var _ = Describe("InstanceSet Controller", func() {
Eventually(testapps.CheckObjExists(&testCtx, client.ObjectKeyFromObject(its), &workloads.InstanceSet{}, false)).
Should(Succeed())
})

It("rolling", func() {
replicas := int32(3)
createITSObj(itsName, func(f *testapps.MockInstanceSetFactory) {
f.SetReplicas(replicas).
SetInstanceUpdateStrategy(&workloads.InstanceUpdateStrategy{
Type: kbappsv1.RollingUpdateStrategyType,
RollingUpdate: &workloads.RollingUpdate{
Replicas: &intstr.IntOrString{
Type: intstr.Int,
IntVal: 1, // one instance at a time
},
},
}).SetPodManagementPolicy(appsv1.ParallelPodManagement)
})

podsKey := []types.NamespacedName{
{
Namespace: itsObj.Namespace,
Name: fmt.Sprintf("%s-0", itsObj.Name),
},
{
Namespace: itsObj.Namespace,
Name: fmt.Sprintf("%s-1", itsObj.Name),
},
{
Namespace: itsObj.Namespace,
Name: fmt.Sprintf("%s-2", itsObj.Name),
},
}
mockPodReady(podsKey[0].Name, podsKey[1].Name, podsKey[2].Name)

By("check its ready")
Eventually(testapps.CheckObj(&testCtx, itsKey, func(g Gomega, its *workloads.InstanceSet) {
g.Expect(its.IsInstanceSetReady()).Should(BeTrue())
})).Should(Succeed())

By("update its spec")
beforeUpdate := time.Now()
time.Sleep(1 * time.Second)
Expect(testapps.GetAndChangeObj(&testCtx, itsKey, func(its *workloads.InstanceSet) {
its.Spec.Template.Spec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
})()).ShouldNot(HaveOccurred())

for i := replicas; i > 0; i-- {
By("wait new pod created")
podKey := podsKey[i-1]
Eventually(testapps.CheckObj(&testCtx, podKey, func(g Gomega, pod *corev1.Pod) {
g.Expect(pod.CreationTimestamp.After(beforeUpdate)).Should(BeTrue())
})).Should(Succeed())

// mock new pod ready
mockPodReady(podKey.Name)

By(fmt.Sprintf("check its status updated: %s", podKey.Name))
Eventually(testapps.CheckObj(&testCtx, itsKey, func(g Gomega, its *workloads.InstanceSet) {
g.Expect(its.Status.UpdatedReplicas).Should(Equal(replicas - i + 1))
})).Should(Succeed())
}

By("check its ready")
Eventually(testapps.CheckObj(&testCtx, itsKey, func(g Gomega, its *workloads.InstanceSet) {
g.Expect(its.IsInstanceSetReady()).Should(BeTrue())
})).Should(Succeed())
})
})

Context("PVC retention policy", func() {
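A note on the quota shape used in the test above: `RollingUpdate.Replicas` is an `intstr.IntOrString`, so it can be a fixed count (as in this test) or a percentage of the replica count. Below is a minimal, self-contained sketch of how such a value resolves, using the upstream `intstr` helper; whether `parseReplicasNMaxUnavailable` uses this exact helper is not shown in this diff, so treat that as an assumption.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	total := 3 // replicas, as in the test above

	// A fixed count, as the test sets: at most one instance updates at a time.
	fixed := intstr.FromInt(1)
	n, _ := intstr.GetScaledValueFromIntOrPercent(&fixed, total, false)
	fmt.Println(n) // 1

	// A percentage scales with the replica count: 34% of 3, rounded up, is 2.
	pct := intstr.FromString("34%")
	m, _ := intstr.GetScaledValueFromIntOrPercent(&pct, total, true)
	fmt.Println(m) // 2
}
```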
pkg/controller/instanceset/reconciler_update.go (64 additions, 50 deletions)

@@ -104,34 +104,18 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
	}

	// handle 'RollingUpdate'
-	replicas, maxUnavailable, err := parseReplicasNMaxUnavailable(its.Spec.InstanceUpdateStrategy, len(oldPodList))
+	rollingUpdateQuota, unavailableQuota, err := r.rollingUpdateQuota(its, oldPodList)
	if err != nil {
		return kubebuilderx.Continue, err
	}
-	currentUnavailable := 0
-	for _, pod := range oldPodList {
-		if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
-			currentUnavailable++
-		}
-	}
-	unavailable := maxUnavailable - currentUnavailable
-
-	// if it's a roleful InstanceSet, we use updateCount to represent Pods can be updated according to the spec.memberUpdateStrategy.
-	updateCount := len(oldPodList)
-	if len(its.Spec.Roles) > 0 {
-		plan := NewUpdatePlan(*its, oldPodList, r.isPodOrConfigUpdated)
-		podsToBeUpdated, err := plan.Execute()
-		if err != nil {
-			return kubebuilderx.Continue, err
-		}
-		updateCount = len(podsToBeUpdated)
-	}
+
+	// handle 'MemberUpdate'
+	memberUpdateQuota, err := r.memberUpdateQuota(its, oldPodList)
+	if err != nil {
+		return kubebuilderx.Continue, err
+	}

-	updatingPods := 0
-	updatedPods := 0
	priorities := ComposeRolePriorityMap(its.Spec.Roles)
-	isBlocked := false
-	needRetry := false
	sortObjects(oldPodList, priorities, false)

	// treat old and Pending pod as a special case, as they can be updated without a consequence
@@ -148,38 +132,18 @@
		}
	}

-	canBeUpdated := func(pod *corev1.Pod) bool {
-		if !isImageMatched(pod) {
-			tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s does not have the same image(s) in the status and in the spec", its.Namespace, its.Name, pod.Name))
-			return false
-		}
-		if !intctrlutil.IsPodReady(pod) {
-			tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not ready", its.Namespace, its.Name, pod.Name))
-			return false
-		}
-		if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
-			tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not available", its.Namespace, its.Name, pod.Name))
-			// no pod event will trigger the next reconciliation, so retry it
-			needRetry = true
-			return false
-		}
-		if !isRoleReady(pod, its.Spec.Roles) {
-			tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the role of pod %s is not ready", its.Namespace, its.Name, pod.Name))
-			return false
-		}
-
-		return true
-	}

+	updatingPods := 0
+	isBlocked := false
+	needRetry := false
	for _, pod := range oldPodList {
-		if updatingPods >= updateCount || updatingPods >= unavailable {
+		if updatingPods >= rollingUpdateQuota || updatingPods >= unavailableQuota {
			break
		}
-		if updatedPods >= replicas {
+		if updatingPods >= memberUpdateQuota {
			break
		}

-		if !canBeUpdated(pod) {
+		if canBeUpdated, retry := r.isPodCanBeUpdated(tree, its, pod); !canBeUpdated {
+			needRetry = retry
			break
		}

@@ -248,9 +212,8 @@
			updatingPods++
		}
	}

-		updatedPods++
	}

	if !isBlocked {
		meta.RemoveStatusCondition(&its.Status.Conditions, string(workloads.InstanceUpdateRestricted))
	}
@@ -260,6 +223,57 @@
	return kubebuilderx.Continue, nil
}

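+// rollingUpdateQuota returns the number of instances allowed to update per
+// spec.instanceUpdateStrategy.rollingUpdate.replicas, together with the
+// remaining unavailability budget (maxUnavailable minus currently unavailable pods).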
+func (r *updateReconciler) rollingUpdateQuota(its *workloads.InstanceSet, podList []*corev1.Pod) (int, int, error) {
+	// handle 'RollingUpdate'
+	replicas, maxUnavailable, err := parseReplicasNMaxUnavailable(its.Spec.InstanceUpdateStrategy, len(podList))
+	if err != nil {
+		return -1, -1, err
+	}
+	currentUnavailable := 0
+	for _, pod := range podList {
+		if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
+			currentUnavailable++
+		}
+	}
+	unavailable := maxUnavailable - currentUnavailable
+	return replicas, unavailable, nil
+}

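+// memberUpdateQuota returns how many Pods may be updated in this round: for a
+// roleful InstanceSet the update plan built from spec.memberUpdateStrategy
+// decides; otherwise every Pod qualifies.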
+func (r *updateReconciler) memberUpdateQuota(its *workloads.InstanceSet, podList []*corev1.Pod) (int, error) {
+	// for a roleful InstanceSet, updateCount is the number of Pods that can be updated per spec.memberUpdateStrategy
+	updateCount := len(podList)
+	if len(its.Spec.Roles) > 0 {
+		plan := NewUpdatePlan(*its, podList, r.isPodOrConfigUpdated)
+		podsToBeUpdated, err := plan.Execute()
+		if err != nil {
+			return -1, err
+		}
+		updateCount = len(podsToBeUpdated)
+	}
+	return updateCount, nil
+}

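+// isPodCanBeUpdated reports whether the pod can be updated now; the second
+// result asks the caller to retry, for the not-yet-available case where no
+// pod event will trigger the next reconciliation.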
+func (r *updateReconciler) isPodCanBeUpdated(tree *kubebuilderx.ObjectTree, its *workloads.InstanceSet, pod *corev1.Pod) (bool, bool) {
+	if !isImageMatched(pod) {
+		tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s does not have the same image(s) in the status and in the spec", its.Namespace, its.Name, pod.Name))
+		return false, false
+	}
+	if !intctrlutil.IsPodReady(pod) {
+		tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not ready", its.Namespace, its.Name, pod.Name))
+		return false, false
+	}
+	if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
+		tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not available", its.Namespace, its.Name, pod.Name))
+		// no pod event will trigger the next reconciliation, so retry it
+		return false, true
+	}
+	if !isRoleReady(pod, its.Spec.Roles) {
+		tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the role of pod %s is not ready", its.Namespace, its.Name, pod.Name))
+		return false, false
+	}
+	return true, false
+}

func (r *updateReconciler) switchover(tree *kubebuilderx.ObjectTree, its *workloads.InstanceSet, pod *corev1.Pod) error {
	if its.Spec.MembershipReconfiguration == nil || its.Spec.MembershipReconfiguration.Switchover == nil {
		return nil
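The update loop above gates each candidate pod on three quotas. As a standalone sketch, the three break conditions condense into one number; the reconciler checks them per pod rather than precomputing a batch size, but the effect is the minimum of the three, floored at zero:

```go
// startableNow mirrors the gates in the update loop: the rolling-update
// quota, the remaining unavailability budget from rollingUpdateQuota
// (maxUnavailable minus currently unavailable pods, so it can reach zero
// and pause updates), and the member-update quota.
func startableNow(rollingUpdateQuota, unavailableQuota, memberUpdateQuota int) int {
	n := rollingUpdateQuota
	if unavailableQuota < n {
		n = unavailableQuota
	}
	if memberUpdateQuota < n {
		n = memberUpdateQuota
	}
	if n < 0 {
		n = 0 // budget already spent; wait for unavailable pods to recover
	}
	return n
}
```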
pkg/controller/instanceset/reconciler_update_test.go (1 addition, 1 deletion)

@@ -213,7 +213,7 @@ var _ = Describe("update reconciler test", func() {
		res, err = reconciler.Reconcile(partitionTree)
		Expect(err).Should(BeNil())
		Expect(res).Should(Equal(kubebuilderx.Continue))
-		expectUpdatedPods(partitionTree, []string{"bar-foo-0"})
+		expectUpdatedPods(partitionTree, []string{"bar-foo-0", "bar-3"})

By("reconcile with UpdateStrategy='OnDelete'")
onDeleteTree, err := tree.DeepCopy()
pkg/testutil/apps/instance_set_factoy.go (6 additions, 0 deletions)

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package apps

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -124,3 +125,8 @@ func (factory *MockInstanceSetFactory) SetInstanceUpdateStrategy(instanceUpdateS
factory.Get().Spec.InstanceUpdateStrategy = instanceUpdateStrategy
return factory
}

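// SetPodManagementPolicy sets spec.podManagementPolicy on the mocked InstanceSet.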
func (factory *MockInstanceSetFactory) SetPodManagementPolicy(podManagementPolicy appsv1.PodManagementPolicyType) *MockInstanceSetFactory {
factory.Get().Spec.PodManagementPolicy = podManagementPolicy
return factory
}