diff --git a/controllers/workloads/instanceset_controller_test.go b/controllers/workloads/instanceset_controller_test.go
index 70c8195324c..757c2bbd260 100644
--- a/controllers/workloads/instanceset_controller_test.go
+++ b/controllers/workloads/instanceset_controller_test.go
@@ -23,15 +23,18 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 
 	"github.com/golang/mock/gomock"
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -103,6 +106,35 @@ var _ = Describe("InstanceSet Controller", func() {
 		).Should(Succeed())
 	}
 
+	mockPodReady := func(podNames ...string) {
+		By("mock pods ready")
+		for _, podName := range podNames {
+			podKey := types.NamespacedName{
+				Namespace: itsObj.Namespace,
+				Name:      podName,
+			}
+			Eventually(testapps.GetAndChangeObjStatus(&testCtx, podKey, func(pod *corev1.Pod) {
+				pod.Status.Phase = corev1.PodRunning
+				pod.Status.Conditions = []corev1.PodCondition{
+					{
+						Type:               corev1.PodReady,
+						Status:             corev1.ConditionTrue,
+						LastTransitionTime: metav1.Now(),
+					},
+				}
+				pod.Status.ContainerStatuses = []corev1.ContainerStatus{
+					{
+						Name: pod.Spec.Containers[0].Name,
+						State: corev1.ContainerState{
+							Running: &corev1.ContainerStateRunning{},
+						},
+						Image: pod.Spec.Containers[0].Image,
+					},
+				}
+			})()).Should(Succeed())
+		}
+	}
+
 	Context("reconciliation", func() {
 		It("should reconcile well", func() {
 			name := "test-instance-set"
@@ -147,6 +179,71 @@ var _ = Describe("InstanceSet Controller", func() {
 			Eventually(testapps.CheckObjExists(&testCtx, client.ObjectKeyFromObject(its), &workloads.InstanceSet{}, false)).
 				Should(Succeed())
 		})
+
+		It("rolling", func() {
+			replicas := int32(3)
+			createITSObj(itsName, func(f *testapps.MockInstanceSetFactory) {
+				f.SetReplicas(replicas).
+					SetInstanceUpdateStrategy(&workloads.InstanceUpdateStrategy{
+						Type: kbappsv1.RollingUpdateStrategyType,
+						RollingUpdate: &workloads.RollingUpdate{
+							Replicas: &intstr.IntOrString{
+								Type:   intstr.Int,
+								IntVal: 1, // one instance at a time
+							},
+						},
+					}).SetPodManagementPolicy(appsv1.ParallelPodManagement)
+			})
+
+			podsKey := []types.NamespacedName{
+				{
+					Namespace: itsObj.Namespace,
+					Name:      fmt.Sprintf("%s-0", itsObj.Name),
+				},
+				{
+					Namespace: itsObj.Namespace,
+					Name:      fmt.Sprintf("%s-1", itsObj.Name),
+				},
+				{
+					Namespace: itsObj.Namespace,
+					Name:      fmt.Sprintf("%s-2", itsObj.Name),
+				},
+			}
+			mockPodReady(podsKey[0].Name, podsKey[1].Name, podsKey[2].Name)
+
+			By("check its ready")
+			Eventually(testapps.CheckObj(&testCtx, itsKey, func(g Gomega, its *workloads.InstanceSet) {
+				g.Expect(its.IsInstanceSetReady()).Should(BeTrue())
+			})).Should(Succeed())
+
+			By("update its spec")
+			beforeUpdate := time.Now()
+			time.Sleep(1 * time.Second)
+			Expect(testapps.GetAndChangeObj(&testCtx, itsKey, func(its *workloads.InstanceSet) {
+				its.Spec.Template.Spec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
+			})()).ShouldNot(HaveOccurred())
+
+			for i := replicas; i > 0; i-- {
+				By("wait new pod created")
+				podKey := podsKey[i-1]
+				Eventually(testapps.CheckObj(&testCtx, podKey, func(g Gomega, pod *corev1.Pod) {
+					g.Expect(pod.CreationTimestamp.After(beforeUpdate)).Should(BeTrue())
+				})).Should(Succeed())
+
+				// mock new pod ready
+				mockPodReady(podKey.Name)
+
+				By(fmt.Sprintf("check its status updated: %s", podKey.Name))
+				Eventually(testapps.CheckObj(&testCtx, itsKey, func(g Gomega, its *workloads.InstanceSet) {
+					g.Expect(its.Status.UpdatedReplicas).Should(Equal(replicas - i + 1))
+				})).Should(Succeed())
+			}
+
+			By("check its ready")
+			Eventually(testapps.CheckObj(&testCtx, itsKey, func(g Gomega, its *workloads.InstanceSet) {
+				g.Expect(its.IsInstanceSetReady()).Should(BeTrue())
+			})).Should(Succeed())
+		})
 	})
 
 	Context("PVC retention policy", func() {
diff --git a/pkg/controller/instanceset/reconciler_update.go b/pkg/controller/instanceset/reconciler_update.go
index 77252b5ae6c..d64ce5978a1 100644
--- a/pkg/controller/instanceset/reconciler_update.go
+++ b/pkg/controller/instanceset/reconciler_update.go
@@ -104,34 +104,18 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
 	}
 
 	// handle 'RollingUpdate'
-	replicas, maxUnavailable, err := parseReplicasNMaxUnavailable(its.Spec.InstanceUpdateStrategy, len(oldPodList))
+	rollingUpdateQuota, unavailableQuota, err := r.rollingUpdateQuota(its, oldPodList)
 	if err != nil {
 		return kubebuilderx.Continue, err
 	}
-	currentUnavailable := 0
-	for _, pod := range oldPodList {
-		if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
-			currentUnavailable++
-		}
-	}
-	unavailable := maxUnavailable - currentUnavailable
-
-	// if it's a roleful InstanceSet, we use updateCount to represent Pods can be updated according to the spec.memberUpdateStrategy.
-	updateCount := len(oldPodList)
-	if len(its.Spec.Roles) > 0 {
-		plan := NewUpdatePlan(*its, oldPodList, r.isPodOrConfigUpdated)
-		podsToBeUpdated, err := plan.Execute()
-		if err != nil {
-			return kubebuilderx.Continue, err
-		}
-		updateCount = len(podsToBeUpdated)
+
+	// handle 'MemberUpdate'
+	memberUpdateQuota, err := r.memberUpdateQuota(its, oldPodList)
+	if err != nil {
+		return kubebuilderx.Continue, err
 	}
 
-	updatingPods := 0
-	updatedPods := 0
 	priorities := ComposeRolePriorityMap(its.Spec.Roles)
-	isBlocked := false
-	needRetry := false
 	sortObjects(oldPodList, priorities, false)
 
 	// treat old and Pending pod as a special case, as they can be updated without a consequence
@@ -148,38 +132,18 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
 		}
 	}
 
-	canBeUpdated := func(pod *corev1.Pod) bool {
-		if !isImageMatched(pod) {
-			tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s does not have the same image(s) in the status and in the spec", its.Namespace, its.Name, pod.Name))
-			return false
-		}
-		if !intctrlutil.IsPodReady(pod) {
-			tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not ready", its.Namespace, its.Name, pod.Name))
-			return false
-		}
-		if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
-			tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not available", its.Namespace, its.Name, pod.Name))
-			// no pod event will trigger the next reconciliation, so retry it
-			needRetry = true
-			return false
-		}
-		if !isRoleReady(pod, its.Spec.Roles) {
-			tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the role of pod %s is not ready", its.Namespace, its.Name, pod.Name))
-			return false
-		}
-
-		return true
-	}
-
+	updatingPods := 0
+	isBlocked := false
+	needRetry := false
 	for _, pod := range oldPodList {
-		if updatingPods >= updateCount || updatingPods >= unavailable {
+		if updatingPods >= rollingUpdateQuota || updatingPods >= unavailableQuota {
 			break
 		}
-		if updatedPods >= replicas {
+		if updatingPods >= memberUpdateQuota {
 			break
 		}
-
-		if !canBeUpdated(pod) {
+		if canBeUpdated, retry := r.isPodCanBeUpdated(tree, its, pod); !canBeUpdated {
+			needRetry = retry
 			break
 		}
 
@@ -248,9 +212,8 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
 				updatingPods++
 			}
 		}
-
-		updatedPods++
 	}
+
 	if !isBlocked {
 		meta.RemoveStatusCondition(&its.Status.Conditions, string(workloads.InstanceUpdateRestricted))
 	}
@@ -260,6 +223,57 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
 	return kubebuilderx.Continue, nil
 }
 
+func (r *updateReconciler) rollingUpdateQuota(its *workloads.InstanceSet, podList []*corev1.Pod) (int, int, error) {
+	// handle 'RollingUpdate'
+	replicas, maxUnavailable, err := parseReplicasNMaxUnavailable(its.Spec.InstanceUpdateStrategy, len(podList))
+	if err != nil {
+		return -1, -1, err
+	}
+	currentUnavailable := 0
+	for _, pod := range podList {
+		if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
+			currentUnavailable++
+		}
+	}
+	unavailable := maxUnavailable - currentUnavailable
+	return replicas, unavailable, nil
+}
+
+func (r *updateReconciler) memberUpdateQuota(its *workloads.InstanceSet, podList []*corev1.Pod) (int, error) {
+	// if it's a roleful InstanceSet, we use updateCount to represent the number of Pods that can be updated according to the spec.memberUpdateStrategy.
+	updateCount := len(podList)
+	if len(its.Spec.Roles) > 0 {
+		plan := NewUpdatePlan(*its, podList, r.isPodOrConfigUpdated)
+		podsToBeUpdated, err := plan.Execute()
+		if err != nil {
+			return -1, err
+		}
+		updateCount = len(podsToBeUpdated)
+	}
+	return updateCount, nil
+}
+
+func (r *updateReconciler) isPodCanBeUpdated(tree *kubebuilderx.ObjectTree, its *workloads.InstanceSet, pod *corev1.Pod) (bool, bool) {
+	if !isImageMatched(pod) {
+		tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s does not have the same image(s) in the status and in the spec", its.Namespace, its.Name, pod.Name))
+		return false, false
+	}
+	if !intctrlutil.IsPodReady(pod) {
+		tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not ready", its.Namespace, its.Name, pod.Name))
+		return false, false
+	}
+	if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
+		tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not available", its.Namespace, its.Name, pod.Name))
+		// no pod event will trigger the next reconciliation, so retry it
+		return false, true
+	}
+	if !isRoleReady(pod, its.Spec.Roles) {
+		tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the role of pod %s is not ready", its.Namespace, its.Name, pod.Name))
+		return false, false
+	}
+	return true, false
+}
+
 func (r *updateReconciler) switchover(tree *kubebuilderx.ObjectTree, its *workloads.InstanceSet, pod *corev1.Pod) error {
 	if its.Spec.MembershipReconfiguration == nil || its.Spec.MembershipReconfiguration.Switchover == nil {
 		return nil
diff --git a/pkg/controller/instanceset/reconciler_update_test.go b/pkg/controller/instanceset/reconciler_update_test.go
index 76ebe8ea7be..a720d8d48c4 100644
--- a/pkg/controller/instanceset/reconciler_update_test.go
+++ b/pkg/controller/instanceset/reconciler_update_test.go
@@ -213,7 +213,7 @@ var _ = Describe("update reconciler test", func() {
 			res, err = reconciler.Reconcile(partitionTree)
 			Expect(err).Should(BeNil())
 			Expect(res).Should(Equal(kubebuilderx.Continue))
-			expectUpdatedPods(partitionTree, []string{"bar-foo-0"})
+			expectUpdatedPods(partitionTree, []string{"bar-foo-0", "bar-3"})
 
 			By("reconcile with UpdateStrategy='OnDelete'")
 			onDeleteTree, err := tree.DeepCopy()
diff --git a/pkg/testutil/apps/instance_set_factoy.go b/pkg/testutil/apps/instance_set_factoy.go
index dcf1af81457..3ac8a09b6d9 100644
--- a/pkg/testutil/apps/instance_set_factoy.go
+++ b/pkg/testutil/apps/instance_set_factoy.go
@@ -20,6 +20,7 @@ along with this program. If not, see .
 package apps
 
 import (
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
@@ -124,3 +125,8 @@ func (factory *MockInstanceSetFactory) SetInstanceUpdateStrategy(instanceUpdateS
 	factory.Get().Spec.InstanceUpdateStrategy = instanceUpdateStrategy
 	return factory
 }
+
+func (factory *MockInstanceSetFactory) SetPodManagementPolicy(podManagementPolicy appsv1.PodManagementPolicyType) *MockInstanceSetFactory {
+	factory.Get().Spec.PodManagementPolicy = podManagementPolicy
+	return factory
+}