Skip to content

Commit f897e51

Browse files
authored
chore: fix to update one pod at a time in a rolling update (#9810)
(cherry picked from commit c620878)
1 parent 497af27 commit f897e51

File tree

4 files changed

+168
-51
lines changed

4 files changed

+168
-51
lines changed

controllers/workloads/instanceset_controller_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@ import (
2323
"context"
2424
"fmt"
2525
"strings"
26+
"time"
2627

2728
. "github.com/onsi/ginkgo/v2"
2829
. "github.com/onsi/gomega"
2930

3031
"github.com/golang/mock/gomock"
32+
appsv1 "k8s.io/api/apps/v1"
3133
corev1 "k8s.io/api/core/v1"
3234
"k8s.io/apimachinery/pkg/api/resource"
3335
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3436
"k8s.io/apimachinery/pkg/types"
37+
"k8s.io/apimachinery/pkg/util/intstr"
3538
"k8s.io/utils/ptr"
3639
"sigs.k8s.io/controller-runtime/pkg/client"
3740

@@ -103,6 +106,35 @@ var _ = Describe("InstanceSet Controller", func() {
103106
).Should(Succeed())
104107
}
105108

109+
mockPodReady := func(podNames ...string) {
110+
By("mock pods ready")
111+
for _, podName := range podNames {
112+
podKey := types.NamespacedName{
113+
Namespace: itsObj.Namespace,
114+
Name: podName,
115+
}
116+
Eventually(testapps.GetAndChangeObjStatus(&testCtx, podKey, func(pod *corev1.Pod) {
117+
pod.Status.Phase = corev1.PodRunning
118+
pod.Status.Conditions = []corev1.PodCondition{
119+
{
120+
Type: corev1.PodReady,
121+
Status: corev1.ConditionTrue,
122+
LastTransitionTime: metav1.Now(),
123+
},
124+
}
125+
pod.Status.ContainerStatuses = []corev1.ContainerStatus{
126+
{
127+
Name: pod.Spec.Containers[0].Name,
128+
State: corev1.ContainerState{
129+
Running: &corev1.ContainerStateRunning{},
130+
},
131+
Image: pod.Spec.Containers[0].Image,
132+
},
133+
}
134+
})()).Should(Succeed())
135+
}
136+
}
137+
106138
Context("reconciliation", func() {
107139
It("should reconcile well", func() {
108140
name := "test-instance-set"
@@ -147,6 +179,71 @@ var _ = Describe("InstanceSet Controller", func() {
147179
Eventually(testapps.CheckObjExists(&testCtx, client.ObjectKeyFromObject(its), &workloads.InstanceSet{}, false)).
148180
Should(Succeed())
149181
})
182+
183+
It("rolling", func() {
184+
replicas := int32(3)
185+
createITSObj(itsName, func(f *testapps.MockInstanceSetFactory) {
186+
f.SetReplicas(replicas).
187+
SetInstanceUpdateStrategy(&workloads.InstanceUpdateStrategy{
188+
Type: kbappsv1.RollingUpdateStrategyType,
189+
RollingUpdate: &workloads.RollingUpdate{
190+
Replicas: &intstr.IntOrString{
191+
Type: intstr.Int,
192+
IntVal: 1, // one instance at a time
193+
},
194+
},
195+
}).SetPodManagementPolicy(appsv1.ParallelPodManagement)
196+
})
197+
198+
podsKey := []types.NamespacedName{
199+
{
200+
Namespace: itsObj.Namespace,
201+
Name: fmt.Sprintf("%s-0", itsObj.Name),
202+
},
203+
{
204+
Namespace: itsObj.Namespace,
205+
Name: fmt.Sprintf("%s-1", itsObj.Name),
206+
},
207+
{
208+
Namespace: itsObj.Namespace,
209+
Name: fmt.Sprintf("%s-2", itsObj.Name),
210+
},
211+
}
212+
mockPodReady(podsKey[0].Name, podsKey[1].Name, podsKey[2].Name)
213+
214+
By("check its ready")
215+
Eventually(testapps.CheckObj(&testCtx, itsKey, func(g Gomega, its *workloads.InstanceSet) {
216+
g.Expect(its.IsInstanceSetReady()).Should(BeTrue())
217+
})).Should(Succeed())
218+
219+
By("update its spec")
220+
beforeUpdate := time.Now()
221+
time.Sleep(1 * time.Second)
222+
Expect(testapps.GetAndChangeObj(&testCtx, itsKey, func(its *workloads.InstanceSet) {
223+
its.Spec.Template.Spec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
224+
})()).ShouldNot(HaveOccurred())
225+
226+
for i := replicas; i > 0; i-- {
227+
By("wait new pod created")
228+
podKey := podsKey[i-1]
229+
Eventually(testapps.CheckObj(&testCtx, podKey, func(g Gomega, pod *corev1.Pod) {
230+
g.Expect(pod.CreationTimestamp.After(beforeUpdate)).Should(BeTrue())
231+
})).Should(Succeed())
232+
233+
// mock new pod ready
234+
mockPodReady(podKey.Name)
235+
236+
By(fmt.Sprintf("check its status updated: %s", podKey.Name))
237+
Eventually(testapps.CheckObj(&testCtx, itsKey, func(g Gomega, its *workloads.InstanceSet) {
238+
g.Expect(its.Status.UpdatedReplicas).Should(Equal(replicas - i + 1))
239+
})).Should(Succeed())
240+
}
241+
242+
By("check its ready")
243+
Eventually(testapps.CheckObj(&testCtx, itsKey, func(g Gomega, its *workloads.InstanceSet) {
244+
g.Expect(its.IsInstanceSetReady()).Should(BeTrue())
245+
})).Should(Succeed())
246+
})
150247
})
151248

152249
Context("PVC retention policy", func() {

pkg/controller/instanceset/reconciler_update.go

Lines changed: 64 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -104,34 +104,18 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
104104
}
105105

106106
// handle 'RollingUpdate'
107-
replicas, maxUnavailable, err := parseReplicasNMaxUnavailable(its.Spec.InstanceUpdateStrategy, len(oldPodList))
107+
rollingUpdateQuota, unavailableQuota, err := r.rollingUpdateQuota(its, oldPodList)
108108
if err != nil {
109109
return kubebuilderx.Continue, err
110110
}
111-
currentUnavailable := 0
112-
for _, pod := range oldPodList {
113-
if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
114-
currentUnavailable++
115-
}
116-
}
117-
unavailable := maxUnavailable - currentUnavailable
118111

119-
// if it's a roleful InstanceSet, we use updateCount to represent Pods can be updated according to the spec.memberUpdateStrategy.
120-
updateCount := len(oldPodList)
121-
if len(its.Spec.Roles) > 0 {
122-
plan := NewUpdatePlan(*its, oldPodList, r.isPodOrConfigUpdated)
123-
podsToBeUpdated, err := plan.Execute()
124-
if err != nil {
125-
return kubebuilderx.Continue, err
126-
}
127-
updateCount = len(podsToBeUpdated)
112+
// handle 'MemberUpdate'
113+
memberUpdateQuota, err := r.memberUpdateQuota(its, oldPodList)
114+
if err != nil {
115+
return kubebuilderx.Continue, err
128116
}
129117

130-
updatingPods := 0
131-
updatedPods := 0
132118
priorities := ComposeRolePriorityMap(its.Spec.Roles)
133-
isBlocked := false
134-
needRetry := false
135119
sortObjects(oldPodList, priorities, false)
136120

137121
// treat old and Pending pod as a special case, as they can be updated without a consequence
@@ -148,38 +132,18 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
148132
}
149133
}
150134

151-
canBeUpdated := func(pod *corev1.Pod) bool {
152-
if !isImageMatched(pod) {
153-
tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s does not have the same image(s) in the status and in the spec", its.Namespace, its.Name, pod.Name))
154-
return false
155-
}
156-
if !intctrlutil.IsPodReady(pod) {
157-
tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not ready", its.Namespace, its.Name, pod.Name))
158-
return false
159-
}
160-
if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
161-
tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not available", its.Namespace, its.Name, pod.Name))
162-
// no pod event will trigger the next reconciliation, so retry it
163-
needRetry = true
164-
return false
165-
}
166-
if !isRoleReady(pod, its.Spec.Roles) {
167-
tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the role of pod %s is not ready", its.Namespace, its.Name, pod.Name))
168-
return false
169-
}
170-
171-
return true
172-
}
173-
135+
updatingPods := 0
136+
isBlocked := false
137+
needRetry := false
174138
for _, pod := range oldPodList {
175-
if updatingPods >= updateCount || updatingPods >= unavailable {
139+
if updatingPods >= rollingUpdateQuota || updatingPods >= unavailableQuota {
176140
break
177141
}
178-
if updatedPods >= replicas {
142+
if updatingPods >= memberUpdateQuota {
179143
break
180144
}
181-
182-
if !canBeUpdated(pod) {
145+
if canBeUpdated, retry := r.isPodCanBeUpdated(tree, its, pod); !canBeUpdated {
146+
needRetry = retry
183147
break
184148
}
185149

@@ -248,9 +212,8 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
248212
updatingPods++
249213
}
250214
}
251-
252-
updatedPods++
253215
}
216+
254217
if !isBlocked {
255218
meta.RemoveStatusCondition(&its.Status.Conditions, string(workloads.InstanceUpdateRestricted))
256219
}
@@ -260,6 +223,57 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
260223
return kubebuilderx.Continue, nil
261224
}
262225

226+
func (r *updateReconciler) rollingUpdateQuota(its *workloads.InstanceSet, podList []*corev1.Pod) (int, int, error) {
227+
// handle 'RollingUpdate'
228+
replicas, maxUnavailable, err := parseReplicasNMaxUnavailable(its.Spec.InstanceUpdateStrategy, len(podList))
229+
if err != nil {
230+
return -1, -1, err
231+
}
232+
currentUnavailable := 0
233+
for _, pod := range podList {
234+
if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
235+
currentUnavailable++
236+
}
237+
}
238+
unavailable := maxUnavailable - currentUnavailable
239+
return replicas, unavailable, nil
240+
}
241+
242+
func (r *updateReconciler) memberUpdateQuota(its *workloads.InstanceSet, podList []*corev1.Pod) (int, error) {
243+
// if it's a roleful InstanceSet, we use updateCount to represent Pods can be updated according to the spec.memberUpdateStrategy.
244+
updateCount := len(podList)
245+
if len(its.Spec.Roles) > 0 {
246+
plan := NewUpdatePlan(*its, podList, r.isPodOrConfigUpdated)
247+
podsToBeUpdated, err := plan.Execute()
248+
if err != nil {
249+
return -1, err
250+
}
251+
updateCount = len(podsToBeUpdated)
252+
}
253+
return updateCount, nil
254+
}
255+
256+
func (r *updateReconciler) isPodCanBeUpdated(tree *kubebuilderx.ObjectTree, its *workloads.InstanceSet, pod *corev1.Pod) (bool, bool) {
257+
if !isImageMatched(pod) {
258+
tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s does not have the same image(s) in the status and in the spec", its.Namespace, its.Name, pod.Name))
259+
return false, false
260+
}
261+
if !intctrlutil.IsPodReady(pod) {
262+
tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not ready", its.Namespace, its.Name, pod.Name))
263+
return false, false
264+
}
265+
if !intctrlutil.IsPodAvailable(pod, its.Spec.MinReadySeconds) {
266+
tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the pod %s is not available", its.Namespace, its.Name, pod.Name))
267+
// no pod event will trigger the next reconciliation, so retry it
268+
return false, true
269+
}
270+
if !isRoleReady(pod, its.Spec.Roles) {
271+
tree.Logger.Info(fmt.Sprintf("InstanceSet %s/%s blocks on update as the role of pod %s is not ready", its.Namespace, its.Name, pod.Name))
272+
return false, false
273+
}
274+
return true, false
275+
}
276+
263277
func (r *updateReconciler) switchover(tree *kubebuilderx.ObjectTree, its *workloads.InstanceSet, pod *corev1.Pod) error {
264278
if its.Spec.MembershipReconfiguration == nil || its.Spec.MembershipReconfiguration.Switchover == nil {
265279
return nil

pkg/controller/instanceset/reconciler_update_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ var _ = Describe("update reconciler test", func() {
213213
res, err = reconciler.Reconcile(partitionTree)
214214
Expect(err).Should(BeNil())
215215
Expect(res).Should(Equal(kubebuilderx.Continue))
216-
expectUpdatedPods(partitionTree, []string{"bar-foo-0"})
216+
expectUpdatedPods(partitionTree, []string{"bar-foo-0", "bar-3"})
217217

218218
By("reconcile with UpdateStrategy='OnDelete'")
219219
onDeleteTree, err := tree.DeepCopy()

pkg/testutil/apps/instance_set_factoy.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
2020
package apps
2121

2222
import (
23+
appsv1 "k8s.io/api/apps/v1"
2324
corev1 "k8s.io/api/core/v1"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2526

@@ -124,3 +125,8 @@ func (factory *MockInstanceSetFactory) SetInstanceUpdateStrategy(instanceUpdateS
124125
factory.Get().Spec.InstanceUpdateStrategy = instanceUpdateStrategy
125126
return factory
126127
}
128+
129+
func (factory *MockInstanceSetFactory) SetPodManagementPolicy(podManagementPolicy appsv1.PodManagementPolicyType) *MockInstanceSetFactory {
130+
factory.Get().Spec.PodManagementPolicy = podManagementPolicy
131+
return factory
132+
}

0 commit comments

Comments
 (0)