Skip to content

Commit

Permalink
fix: fix bouncy scaling bug
Browse files (browse the repository at this point in the history)
  • Loading branch information
alexec committed May 21, 2021
1 parent 2d661ab commit 22b0b8b
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 123 deletions.
3 changes: 1 addition & 2 deletions api/v1alpha1/step_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ type Step struct {
Status StepStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

func (in *Step) GetTargetReplicas(scalingDelay, peekDelay time.Duration) int {
func (in *Step) GetTargetReplicas(currentReplicas int, scalingDelay, peekDelay time.Duration) int {
lastScaledAt := in.Status.LastScaledAt.Time
currentReplicas := in.Status.GetReplicas() // can be -1

if time.Since(lastScaledAt) < scalingDelay {
return currentReplicas
Expand Down
52 changes: 26 additions & 26 deletions api/v1alpha1/step_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,67 +17,67 @@ func TestStep_GetTargetReplicas(t *testing.T) {
t.Run("Init", func(t *testing.T) {
t.Run("Min=0", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 0}}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
assert.Equal(t, 1, s.GetTargetReplicas(0, scalingDelay, peekDelay))
})
t.Run("Min=1", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
assert.Equal(t, 1, s.GetTargetReplicas(0, scalingDelay, peekDelay))
})
})
t.Run("ScalingUp", func(t *testing.T) {
t.Run("Min=2,Replicas=1,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: StepStatus{Replicas: 1, LastScaledAt: old}}
assert.Equal(t, 2, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: StepStatus{LastScaledAt: old}}
assert.Equal(t, 2, s.GetTargetReplicas(1, scalingDelay, peekDelay))
})
t.Run("Min=2,Replicas=1,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: StepStatus{Replicas: 1, LastScaledAt: recent}}
assert.Equal(t, 2, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: StepStatus{LastScaledAt: recent}}
assert.Equal(t, 2, s.GetTargetReplicas(1, scalingDelay, peekDelay))
})
t.Run("Min=2,Replicas=1,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: StepStatus{Replicas: 1, LastScaledAt: now}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 2}}, Status: StepStatus{LastScaledAt: now}}
assert.Equal(t, 1, s.GetTargetReplicas(1, scalingDelay, peekDelay))
})
})
t.Run("ScalingDown", func(t *testing.T) {
t.Run("Min=1,Replicas=2,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: StepStatus{Replicas: 2, LastScaledAt: old}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: StepStatus{LastScaledAt: old}}
assert.Equal(t, 1, s.GetTargetReplicas(2, scalingDelay, peekDelay))
})
t.Run("Min=1,Replicas=2,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: StepStatus{Replicas: 2, LastScaledAt: recent}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: StepStatus{LastScaledAt: recent}}
assert.Equal(t, 1, s.GetTargetReplicas(2, scalingDelay, peekDelay))
})
t.Run("Min=1,Replicas=2,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: StepStatus{Replicas: 2, LastScaledAt: now}}
assert.Equal(t, 2, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{MinReplicas: 1}}, Status: StepStatus{LastScaledAt: now}}
assert.Equal(t, 2, s.GetTargetReplicas(2, scalingDelay, peekDelay))
})
})
t.Run("ScaleToZero", func(t *testing.T) {
t.Run("Min=0,Replicas=1,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 1, LastScaledAt: old}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{LastScaledAt: old}}
assert.Equal(t, 0, s.GetTargetReplicas(1, scalingDelay, peekDelay))
})
t.Run("Min=0,Replicas=1,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 1, LastScaledAt: recent}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{LastScaledAt: recent}}
assert.Equal(t, 0, s.GetTargetReplicas(1, scalingDelay, peekDelay))
})
t.Run("Min=0,Replicas=1,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 1, LastScaledAt: now}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{LastScaledAt: now}}
assert.Equal(t, 1, s.GetTargetReplicas(1, scalingDelay, peekDelay))
})
})
t.Run("Peek", func(t *testing.T) {
t.Run("Min=0,Replicas=0,LastScaledAt=old", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 0, LastScaledAt: old}}
assert.Equal(t, 1, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{LastScaledAt: old}}
assert.Equal(t, 1, s.GetTargetReplicas(0, scalingDelay, peekDelay))
})
t.Run("Min=0,Replicas=0,LastScaledAt=recent", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 0, LastScaledAt: now}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{LastScaledAt: now}}
assert.Equal(t, 0, s.GetTargetReplicas(0, scalingDelay, peekDelay))
})
t.Run("Min=0,Replicas=0,LastScaledAt=now", func(t *testing.T) {
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{Replicas: 0, LastScaledAt: now}}
assert.Equal(t, 0, s.GetTargetReplicas(scalingDelay, peekDelay))
s := &Step{Spec: StepSpec{Scale: &Scale{}}, Status: StepStatus{LastScaledAt: now}}
assert.Equal(t, 0, s.GetTargetReplicas(0, scalingDelay, peekDelay))
})
})
}
189 changes: 95 additions & 94 deletions manager/controllers/step_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,18 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

pipelineName := step.GetLabels()[dfv1.KeyPipelineName]
currentReplicas := step.Status.GetReplicas()
targetReplicas := step.GetTargetReplicas(scalingDelay, peekDelay)
stepName := step.Spec.Name

log.Info("reconciling", "pipelineName", pipelineName, "stepName", stepName)

log.Info("reconciling", "currentReplicas", currentReplicas, "targetReplicas", targetReplicas, "pipelineName", pipelineName)
pods := &corev1.PodList{}
selector, _ := labels.Parse(dfv1.KeyPipelineName + "=" + pipelineName + "," + dfv1.KeyStepName + "=" + stepName)
if err := r.Client.List(ctx, pods, &client.ListOptions{Namespace: step.Namespace, LabelSelector: selector}); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to list pods: %w", err)
}

hash := util.MustHash(step.Spec)
currentReplicas := len(pods.Items)

oldStatus := dfv1.StepStatus{
Phase: step.Status.Phase,
Expand All @@ -86,102 +92,23 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.

newStatus := dfv1.StepStatus{
Phase: dfv1.StepUnknown,
Replicas: uint32(currentReplicas),
Selector: selector.String(),
LastScaledAt: step.Status.LastScaledAt,
SinkStatues: dfv1.SinkStatuses{},
SourceStatuses: dfv1.SourceStatuses{},
}

stepName := step.Spec.Name
if !step.Status.Phase.Completed() { // once a step is completed, we do not schedule or create pods
for replica := 0; replica < targetReplicas; replica++ {
podName := fmt.Sprintf("%s-%d", step.Name, replica)
log.Info("applying pod", "podName", podName)
_labels := map[string]string{}
annotations := map[string]string{}
if x := step.Spec.Metadata; x != nil {
for k, v := range x.Annotations {
annotations[k] = v
}
for k, v := range x.Labels {
_labels[k] = v
}
}
_labels[dfv1.KeyStepName] = stepName
_labels[dfv1.KeyPipelineName] = pipelineName
annotations[dfv1.KeyReplica] = strconv.Itoa(replica)
annotations[dfv1.KeyHash] = hash
annotations[dfv1.KeyDefaultContainer] = dfv1.CtrMain
annotations[dfv1.KeyKillCmd(dfv1.CtrMain)] = util.MustJSON([]string{dfv1.PathKill, "1"})
annotations[dfv1.KeyKillCmd(dfv1.CtrSidecar)] = util.MustJSON([]string{dfv1.PathKill, "1"})
if err := r.Client.Create(
ctx,
&corev1.Pod{
ObjectMeta: (metav1.ObjectMeta{
Namespace: step.Namespace,
Name: podName,
Labels: _labels,
Annotations: annotations,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(step.GetObjectMeta(), dfv1.StepGroupVersionKind),
},
}),
Spec: step.Spec.GetPodSpec(
dfv1.GetPodSpecReq{
PipelineName: pipelineName,
Namespace: step.Namespace,
Replica: int32(replica),
ImageFormat: imageFormat,
RunnerImage: runnerImage,
PullPolicy: pullPolicy,
UpdateInterval: updateInterval,
StepStatus: step.Status,
},
),
},
); util.IgnoreAlreadyExists(err) != nil {
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create pod %s: %v", podName, err)))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}
}
}

if step.Spec.Sources.Any(func(s dfv1.Source) bool { return s.HTTP != nil }) {
if err := r.Client.Create(
ctx,
&corev1.Service{
ObjectMeta: (metav1.ObjectMeta{
Namespace: step.Namespace,
Name: step.Name,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(step.GetObjectMeta(), dfv1.StepGroupVersionKind),
},
}),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Port: 80, TargetPort: intstr.FromInt(3569)},
},
Selector: map[string]string{
dfv1.KeyPipelineName: pipelineName,
dfv1.KeyStepName: stepName,
},
},
},
); util.IgnoreAlreadyExists(err) != nil {
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create service %s: %v", step.Name, err)))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}
}

pods := &corev1.PodList{}
selector, _ := labels.Parse(dfv1.KeyPipelineName + "=" + pipelineName + "," + dfv1.KeyStepName + "=" + stepName)
if err := r.Client.List(ctx, pods, &client.ListOptions{Namespace: step.Namespace, LabelSelector: selector}); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to list pods: %w", err)
for _, pod := range pods.Items {
phase, reason, message := inferPhase(pod)
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(phase, reason, message))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}

newStatus.Replicas = uint32(len(pods.Items))
newStatus.Selector = selector.String()
targetReplicas := step.GetTargetReplicas(currentReplicas, scalingDelay, peekDelay)

if currentReplicas != targetReplicas {
log.Info("scaling", "currentReplicas", currentReplicas, "targetReplicas", targetReplicas)
newStatus.LastScaledAt = metav1.Time{Time: time.Now()}
r.Recorder.Eventf(step, "Normal", eventReason(currentReplicas, targetReplicas), "Scaling from %d to %d", currentReplicas, targetReplicas)
}
Expand All @@ -194,15 +121,12 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}
} else {
phase, reason, message := inferPhase(pod)
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(phase, reason, message))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
// if the main container has terminated, kill all sidecars
mainCtrTerminated := false
for _, s := range pod.Status.ContainerStatuses {
mainCtrTerminated = mainCtrTerminated || (s.Name == dfv1.CtrMain && s.State.Terminated != nil && s.State.Terminated.ExitCode == 0)
}
log.Info("pod", "name", pod.Name, "phase", phase, "reason", reason, "message", message, "mainCtrTerminated", mainCtrTerminated)
log.Info("pod", "name", pod.Name, "mainCtrTerminated", mainCtrTerminated)
if mainCtrTerminated {
for _, s := range pod.Status.ContainerStatuses {
if s.Name != dfv1.CtrMain {
Expand All @@ -215,6 +139,83 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}
}

for replica := 0; replica < targetReplicas; replica++ {
podName := fmt.Sprintf("%s-%d", step.Name, replica)
_labels := map[string]string{}
annotations := map[string]string{}
if x := step.Spec.Metadata; x != nil {
for k, v := range x.Annotations {
annotations[k] = v
}
for k, v := range x.Labels {
_labels[k] = v
}
}
_labels[dfv1.KeyStepName] = stepName
_labels[dfv1.KeyPipelineName] = pipelineName
annotations[dfv1.KeyReplica] = strconv.Itoa(replica)
annotations[dfv1.KeyHash] = hash
annotations[dfv1.KeyDefaultContainer] = dfv1.CtrMain
annotations[dfv1.KeyKillCmd(dfv1.CtrMain)] = util.MustJSON([]string{dfv1.PathKill, "1"})
annotations[dfv1.KeyKillCmd(dfv1.CtrSidecar)] = util.MustJSON([]string{dfv1.PathKill, "1"})
if err := r.Client.Create(
ctx,
&corev1.Pod{
ObjectMeta: (metav1.ObjectMeta{
Namespace: step.Namespace,
Name: podName,
Labels: _labels,
Annotations: annotations,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(step.GetObjectMeta(), dfv1.StepGroupVersionKind),
},
}),
Spec: step.Spec.GetPodSpec(
dfv1.GetPodSpecReq{
PipelineName: pipelineName,
Namespace: step.Namespace,
Replica: int32(replica),
ImageFormat: imageFormat,
RunnerImage: runnerImage,
PullPolicy: pullPolicy,
UpdateInterval: updateInterval,
StepStatus: step.Status,
},
),
},
); util.IgnoreAlreadyExists(err) != nil {
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create pod %s: %v", podName, err)))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}
}

if step.Spec.Sources.Any(func(s dfv1.Source) bool { return s.HTTP != nil }) {
if err := r.Client.Create(
ctx,
&corev1.Service{
ObjectMeta: (metav1.ObjectMeta{
Namespace: step.Namespace,
Name: step.Name,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(step.GetObjectMeta(), dfv1.StepGroupVersionKind),
},
}),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Port: 80, TargetPort: intstr.FromInt(3569)},
},
Selector: map[string]string{
dfv1.KeyPipelineName: pipelineName,
dfv1.KeyStepName: stepName,
},
},
},
); util.IgnoreAlreadyExists(err) != nil {
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create service %s: %v", step.Name, err)))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}
}

if notEqual, patch := util.NotEqual(oldStatus, newStatus); notEqual {
log.Info("patching step status", "patch", patch)
if err := r.Status().
Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
if err != nil {
logger.Error(err, "failed to get offset", "source", sourceName)
} else if pending := nextOffset - 1 - handler.offset; pending >= 0 {
logger.Info("setting pending", "source", sourceName, "pending", pending)
logger.Info("setting pending", "source", sourceName, "pending", pending, "nextOffset", nextOffset, "handlerOffset", handler.offset)
withLock(func() { status.SourceStatuses.SetPending(sourceName, uint64(pending)) })
}
}
Expand Down

0 comments on commit 22b0b8b

Please sign in to comment.