Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ func validateRequiredToParseConfigs() error {
return err
}
}

if jobTimeout := viper.GetString(constant.CfgAddonJobTimeout); jobTimeout != "" {
if _, err := time.ParseDuration(jobTimeout); err != nil {
return err
}
}

if err := validateTolerations(viper.GetString(constant.CfgKeyCtrlrMgrTolerations)); err != nil {
return err
}
Expand Down
61 changes: 61 additions & 0 deletions controllers/extensions/addon_controller_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"slices"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -543,6 +544,18 @@ func (r *helmTypeInstallStage) Handle(ctx context.Context) {
r.setRequeueWithErr(err, "")
return
} else if err == nil {
// Check if the job is outdated (belongs to an older generation)
if isJobOutdated(helmInstallJob, addon) {
r.reqCtx.Log.Info("Deleting outdated install job", "job", key.Name, "jobGeneration",
helmInstallJob.Annotations[AddonGeneration], "addonGeneration", addon.Generation)
if err := r.reconciler.Delete(ctx, helmInstallJob); client.IgnoreNotFound(err) != nil {
r.setRequeueWithErr(err, "")
return
}
r.setRequeueAfter(time.Second, "recreating install job for new generation")
return
}

if helmInstallJob.Status.Succeeded > 0 {
return
}
Expand Down Expand Up @@ -710,6 +723,18 @@ func (r *helmTypeUninstallStage) Handle(ctx context.Context) {
r.setRequeueWithErr(err, "")
return
} else if err == nil {
// Check if the job is outdated (belongs to an older generation)
if isJobOutdated(helmUninstallJob, addon) {
r.reqCtx.Log.Info("Deleting outdated uninstall job", "job", key.Name, "jobGeneration",
helmUninstallJob.Annotations[AddonGeneration], "addonGeneration", addon.Generation)
if err := r.reconciler.Delete(ctx, helmUninstallJob); client.IgnoreNotFound(err) != nil {
r.setRequeueWithErr(err, "")
return
}
r.setRequeueAfter(time.Second, "recreating uninstall job for new generation")
return
}

if helmUninstallJob.Status.Succeeded > 0 {
r.reqCtx.Log.V(1).Info("helm uninstall job succeed", "job", key)
// TODO:
Expand Down Expand Up @@ -911,6 +936,17 @@ func createHelmJobProto(addon *extensionsv1alpha1.Addon) (*batchv1.Job, error) {
}
ttlSec := int32(ttl.Seconds())
backoffLimit := int32(3)

// Set job timeout to prevent jobs from running indefinitely
jobTimeout := time.Minute * 5
if timeout := viper.GetString(constant.CfgAddonJobTimeout); timeout != "" {
var err error
if jobTimeout, err = time.ParseDuration(timeout); err != nil {
return nil, err
}
}
activeDeadlineSeconds := int64(jobTimeout.Seconds())

container := corev1.Container{
Name: getJobMainContainerName(addon),
Image: viper.GetString(constant.KBToolsImage),
Expand Down Expand Up @@ -940,10 +976,15 @@ func createHelmJobProto(addon *extensionsv1alpha1.Addon) (*batchv1.Job, error) {
constant.AddonNameLabelKey: addon.Name,
constant.AppManagedByLabelKey: constant.AppName,
},
Annotations: map[string]string{
// Add generation annotation to track which addon generation this job belongs to
AddonGeneration: fmt.Sprintf("%d", addon.Generation),
},
},
Spec: batchv1.JobSpec{
BackoffLimit: &backoffLimit,
TTLSecondsAfterFinished: &ttlSec,
ActiveDeadlineSeconds: &activeDeadlineSeconds,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down Expand Up @@ -1075,6 +1116,26 @@ func getJobMainContainerName(addon *extensionsv1alpha1.Addon) string {
return strings.ToLower(string(addon.Spec.Type))
}

// isJobOutdated reports whether job was created for an earlier generation of
// addon than the addon's current one.
//
// The job's generation is read from its AddonGeneration annotation. A job is
// also reported as outdated when it has no annotations, when the
// AddonGeneration annotation is absent, or when its value does not parse as a
// base-10 int64.
func isJobOutdated(job *batchv1.Job, addon *extensionsv1alpha1.Addon) bool {
	// Indexing a nil map yields ok == false, so a job without any annotations
	// takes the same path as a job that merely lacks this key.
	raw, ok := job.Annotations[AddonGeneration]
	if !ok {
		return true
	}

	if gen, err := strconv.ParseInt(raw, 10, 64); err == nil {
		// Only a strictly smaller generation counts as outdated.
		return gen < addon.Generation
	}

	// Unparseable generation value: treat the job as outdated.
	return true
}

func logFailedJobPodToCondError(ctx context.Context, stageCtx *stageCtx, addon *extensionsv1alpha1.Addon,
jobName, reason string) error {
podList := &corev1.PodList{}
Expand Down
44 changes: 44 additions & 0 deletions controllers/extensions/addon_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,50 @@ var _ = Describe("Addon controller", func() {
g.Expect(addon.Status.Phase).Should(Equal(extensionsv1alpha1.AddonEnabling))
}).Should(Succeed())
})

// Checks that the install job is created with an AddonGeneration annotation and
// an active deadline, and that after the addon is updated twice any install job
// found under the same key is annotated with the new generation.
It("should cleanup outdated jobs when addon is updated", func() {
By("By create an addon with auto-install")
createAddonSpecWithRequiredAttributes(func(newOjb *extensionsv1alpha1.Addon) {
newOjb.Spec.Installable.AutoInstall = true
})

By("By addon autoInstall auto added")
// NOTE(review): presumably waits for the addon to reach the enabling phase at
// generation 2 — confirm against the enablingPhaseCheck helper.
enablingPhaseCheck(2)

By("By checking install job is created with generation annotation")
jobKey := client.ObjectKey{
Namespace: viper.GetString(constant.CfgKeyCtrlrMgrNS),
Name: getInstallJobName(addon),
}
Eventually(func(g Gomega) {
job := &batchv1.Job{}
g.Expect(testCtx.Cli.Get(ctx, jobKey, job)).Should(Succeed())
// The job must carry the generation annotation with the value "2".
g.Expect(job.Annotations).Should(HaveKey(AddonGeneration))
g.Expect(job.Annotations[AddonGeneration]).Should(Equal("2"))
// The job must have a deadline of 300 seconds (5 minutes).
// NOTE(review): assumes the test environment does not override the job
// timeout setting — confirm.
g.Expect(job.Spec.ActiveDeadlineSeconds).ShouldNot(BeNil())
g.Expect(*job.Spec.ActiveDeadlineSeconds).Should(Equal(int64(300)))
}).Should(Succeed())

By("By updating addon to trigger new generation")
// Re-read the addon, then toggle InstallSpec.Enabled off and back on with two
// separate updates.
// NOTE(review): presumably each update bumps the generation by one (2 -> 4),
// which is what the "4" expectation below relies on — confirm.
addon = &extensionsv1alpha1.Addon{}
Expect(testCtx.Cli.Get(ctx, key, addon)).To(Not(HaveOccurred()))
addon.Spec.InstallSpec.Enabled = false
Expect(testCtx.Cli.Update(ctx, addon)).Should(Succeed())
addon.Spec.InstallSpec.Enabled = true
Expect(testCtx.Cli.Update(ctx, addon)).Should(Succeed())

By("By checking old job is deleted and new job is created")
Eventually(func(g Gomega) {
// Drive a reconcile on every poll.
_, err := doReconcile()
g.Expect(err).To(Not(HaveOccurred()))
job := &batchv1.Job{}
err = testCtx.Cli.Get(ctx, jobKey, job)
// NOTE(review): the generation is asserted only when Get succeeds; if the job
// is absent, or Get fails for any other reason, this poll passes without
// checking anything.
if err == nil {
// If the job exists, it should have the new generation
g.Expect(job.Annotations[AddonGeneration]).Should(Equal("4"))
}
}).Should(Succeed())
})
})

Context("Addon controller SetupWithManager", func() {
Expand Down
1 change: 1 addition & 0 deletions controllers/extensions/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
NoDeleteJobs = "extensions.kubeblocks.io/no-delete-jobs"
AddonDefaultIsEmpty = "addons.extensions.kubeblocks.io/default-is-empty"
KBVersionValidate = "addon.kubeblocks.io/kubeblocks-version"
AddonGeneration = "addon.kubeblocks.io/generation"

// label keys
AddonProvider = "addon.kubeblocks.io/provider"
Expand Down
2 changes: 2 additions & 0 deletions deploy/helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ spec:
value: {{ .jobTTL | quote }}
- name: ADDON_JOB_IMAGE_PULL_POLICY
value: {{ .jobImagePullPolicy | default "IfNotPresent" }}
- name: ADDON_JOB_TIMEOUT
value: {{ .jobTimeout | quote }}
{{- end }}
- name: KUBEBLOCKS_ADDON_HELM_INSTALL_OPTIONS
value: {{ join " " .Values.addonHelmInstallOptions }}
Expand Down
1 change: 1 addition & 0 deletions deploy/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ addonController:
enabled: true
jobTTL: "5m"
jobImagePullPolicy: IfNotPresent
jobTimeout: "5m"


## @param keepAddons - keep Addon CR objects when delete this chart.
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/viper_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
// addon config keys
CfgKeyAddonJobTTL = "ADDON_JOB_TTL"
CfgAddonJobImgPullPolicy = "ADDON_JOB_IMAGE_PULL_POLICY"
CfgAddonJobTimeout = "ADDON_JOB_TIMEOUT"

// addon charts config keys
CfgAddonChartsImgPullPolicy = "KUBEBLOCKS_ADDON_CHARTS_IMAGE_PULL_POLICY"
Expand Down
Loading