From df69c14b011764c4716e6aa3d67c9e1d7d828392 Mon Sep 17 00:00:00 2001 From: Andrii Dema Date: Tue, 30 Jan 2024 14:41:24 +0200 Subject: [PATCH] K8SPXC-1152: restore stucks on operator restart https://perconadev.atlassian.net/browse/K8SPXC-1152 fix unit test fix for pvc restores refactor fix tests fix `security-context` test add unit-test improvements add TODO comment --- ...cona.com_perconaxtradbclusterrestores.yaml | 20 + deploy/bundle.yaml | 20 + deploy/crd.yaml | 20 + deploy/cw-bundle.yaml | 20 + ...restore-src-restore-pvc-sec-context-oc.yml | 3 +- ...od_restore-src-restore-pvc-sec-context.yml | 3 +- pkg/apis/pxc/v1/pxc_prestore_types.go | 5 +- pkg/apis/pxc/v1/zz_generated.deepcopy.go | 1 + pkg/controller/pxc/upgrade.go | 2 +- pkg/controller/pxcrestore/controller.go | 471 +++++++++--------- .../{restore_test.go => controller_test.go} | 225 ++++++++- pkg/controller/pxcrestore/helpers_test.go | 28 +- pkg/controller/pxcrestore/restore.go | 132 ----- pkg/controller/pxcrestore/restorer.go | 111 +++-- pkg/controller/pxcrestore/util.go | 151 ++++++ pkg/k8s/setowner.go | 10 - pkg/pxc/backup/restore.go | 11 +- 17 files changed, 795 insertions(+), 438 deletions(-) rename pkg/controller/pxcrestore/{restore_test.go => controller_test.go} (56%) delete mode 100644 pkg/controller/pxcrestore/restore.go create mode 100644 pkg/controller/pxcrestore/util.go diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml index 50e42bb776..74198d9728 100644 --- a/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml +++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml @@ -354,16 +354,36 @@ spec: type: object status: properties: + clusterSize: + format: int32 + type: integer comments: type: string completed: format: date-time type: string + haproxySize: + format: int32 + type: integer lastscheduled: format: date-time type: string + proxysqlSize: + format: int32 + type: integer state: type: string + unsafeFlags: + properties: + backupIfUnhealthy: + type: boolean + proxySize: + type: boolean + pxcSize: + type: boolean + tls: + type: boolean + type: object type: object type: object served: true diff --git a/deploy/bundle.yaml b/deploy/bundle.yaml index 433a815c78..4ec4c0684b 100644 --- a/deploy/bundle.yaml +++ b/deploy/bundle.yaml @@ -597,16 +597,36 @@ spec: type: object status: properties: + clusterSize: + format: int32 + type: integer comments: type: string completed: format: date-time type: string + haproxySize: + format: int32 + type: integer lastscheduled: format: date-time type: string + proxysqlSize: + format: int32 + type: integer state: type: string + unsafeFlags: + properties: + backupIfUnhealthy: + type: boolean + proxySize: + type: boolean + pxcSize: + type: boolean + tls: + type: boolean + type: object type: object type: object served: true diff --git a/deploy/crd.yaml b/deploy/crd.yaml index 98e6861e79..1ce3914f40 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -597,16 +597,36 @@ spec: type: object status: properties: + clusterSize: + format: int32 + type: integer comments: type: string completed: format: date-time type: string + haproxySize: + format: int32 + type: integer lastscheduled: format: date-time type: string + proxysqlSize: + format: int32 + type: integer state: type: string + unsafeFlags: + properties: + backupIfUnhealthy: + type: boolean + proxySize: + type: boolean + pxcSize: + type: boolean + tls: + type: boolean + type: object type: object type: object served: true diff --git a/deploy/cw-bundle.yaml b/deploy/cw-bundle.yaml index 588c3c63c2..7a0fc97c59 100644 --- a/deploy/cw-bundle.yaml +++ b/deploy/cw-bundle.yaml @@ -597,16 +597,36 @@ spec: type: object status: properties: + clusterSize: + format: int32 + type: integer comments: type: string completed: format: date-time type: string + haproxySize: + format: int32 + type: integer lastscheduled: format: date-time type: string + proxysqlSize: + format: int32 + type: integer state: type: string + unsafeFlags: + properties: + backupIfUnhealthy: + type: boolean + proxySize: + type: boolean + pxcSize: + type: boolean + tls: + type: boolean + type: object type: object type: object served: true diff --git a/e2e-tests/security-context/compare/pod_restore-src-restore-pvc-sec-context-oc.yml b/e2e-tests/security-context/compare/pod_restore-src-restore-pvc-sec-context-oc.yml index 589ac59252..bedd27b02c 100644 --- a/e2e-tests/security-context/compare/pod_restore-src-restore-pvc-sec-context-oc.yml +++ b/e2e-tests/security-context/compare/pod_restore-src-restore-pvc-sec-context-oc.yml @@ -11,7 +11,8 @@ metadata: percona.com/restore-svc-name: restore-src-restore-pvc-sec-context name: restore-src-restore-pvc-sec-context ownerReferences: - - controller: true + - blockOwnerDeletion: true + controller: true kind: PerconaXtraDBClusterRestore name: restore-pvc spec: diff --git a/e2e-tests/security-context/compare/pod_restore-src-restore-pvc-sec-context.yml b/e2e-tests/security-context/compare/pod_restore-src-restore-pvc-sec-context.yml index c0c46d3328..4484d00e92 100644 --- a/e2e-tests/security-context/compare/pod_restore-src-restore-pvc-sec-context.yml +++ b/e2e-tests/security-context/compare/pod_restore-src-restore-pvc-sec-context.yml @@ -11,7 +11,8 @@ metadata: percona.com/restore-svc-name: restore-src-restore-pvc-sec-context name: restore-src-restore-pvc-sec-context ownerReferences: - - controller: true + - blockOwnerDeletion: true + controller: true kind: PerconaXtraDBClusterRestore name: restore-pvc spec: diff --git a/pkg/apis/pxc/v1/pxc_prestore_types.go b/pkg/apis/pxc/v1/pxc_prestore_types.go index accd547816..a14711a549 100644 --- a/pkg/apis/pxc/v1/pxc_prestore_types.go +++ b/pkg/apis/pxc/v1/pxc_prestore_types.go @@ -23,6 +23,10 @@ type PerconaXtraDBClusterRestoreStatus struct { Comments string `json:"comments,omitempty"` CompletedAt *metav1.Time `json:"completed,omitempty"` LastScheduled *metav1.Time `json:"lastscheduled,omitempty"` + PXCSize int32 `json:"clusterSize,omitempty"` + HAProxySize int32 `json:"haproxySize,omitempty"` + ProxySQLSize int32 `json:"proxysqlSize,omitempty"` + Unsafe UnsafeFlags `json:"unsafeFlags,omitempty"` } type PITR struct { @@ -63,7 +67,6 @@ type BcpRestoreStates string const ( RestoreNew BcpRestoreStates = "" - RestoreStarting BcpRestoreStates = "Starting" RestoreStopCluster BcpRestoreStates = "Stopping Cluster" RestoreRestore BcpRestoreStates = "Restoring" RestoreStartCluster BcpRestoreStates = "Starting Cluster" diff --git a/pkg/apis/pxc/v1/zz_generated.deepcopy.go b/pkg/apis/pxc/v1/zz_generated.deepcopy.go index d5bac9b675..d6f44851aa 100644 --- a/pkg/apis/pxc/v1/zz_generated.deepcopy.go +++ b/pkg/apis/pxc/v1/zz_generated.deepcopy.go @@ -788,6 +788,7 @@ func (in *PerconaXtraDBClusterRestoreStatus) DeepCopyInto(out *PerconaXtraDBClus in, out := &in.LastScheduled, &out.LastScheduled *out = (*in).DeepCopy() } + out.Unsafe = in.Unsafe } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PerconaXtraDBClusterRestoreStatus. diff --git a/pkg/controller/pxc/upgrade.go b/pkg/controller/pxc/upgrade.go index 0c232e2a42..e610392c81 100644 --- a/pkg/controller/pxc/upgrade.go +++ b/pkg/controller/pxc/upgrade.go @@ -625,7 +625,7 @@ func (r *ReconcilePerconaXtraDBCluster) isRestoreRunning(clusterName, namespace } switch v.Status.State { - case api.RestoreStarting, api.RestoreStopCluster, api.RestoreRestore, + case api.RestoreStopCluster, api.RestoreRestore, api.RestoreStartCluster, api.RestorePITR: return true, nil } diff --git a/pkg/controller/pxcrestore/controller.go b/pkg/controller/pxcrestore/controller.go index fc92370598..133788b73c 100644 --- a/pkg/controller/pxcrestore/controller.go +++ b/pkg/controller/pxcrestore/controller.go @@ -6,13 +6,13 @@ import ( "time" "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" + batchv1 "k8s.io/api/batch/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + k8sretry "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -90,10 +90,13 @@ type ReconcilePerconaXtraDBClusterRestore struct { func (r *ReconcilePerconaXtraDBClusterRestore) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { log := logf.FromContext(ctx) - rr := reconcile.Result{} + rr := reconcile.Result{ + // TODO: do not depend on the RequeueAfter + RequeueAfter: time.Second * 5, + } cr := &api.PerconaXtraDBClusterRestore{} - err := r.client.Get(context.TODO(), request.NamespacedName, cr) + err := r.client.Get(ctx, request.NamespacedName, cr) if err != nil { if k8serrors.IsNotFound(err) { // Request object not found, could have been deleted after reconcile request. @@ -102,305 +105,305 @@ func (r *ReconcilePerconaXtraDBClusterRestore) Reconcile(ctx context.Context, re // Error reading the object - requeue the request. return rr, err } - if cr.Status.State != api.RestoreNew { - return rr, nil - } - log.Info("backup restore request") - - err = r.setStatus(cr, api.RestoreStarting, "") - if err != nil { - return rr, errors.Wrap(err, "set status") - } - rJobsList := &api.PerconaXtraDBClusterRestoreList{} - err = r.client.List( - context.TODO(), - rJobsList, - &client.ListOptions{ - Namespace: cr.Namespace, - }, - ) - if err != nil { - return rr, errors.Wrap(err, "get restore jobs list") + switch cr.Status.State { + case api.RestoreSucceeded, api.RestoreFailed: + return reconcile.Result{}, nil } - returnMsg := fmt.Sprintf(backupRestoredMsg, cr.Name, cr.Spec.PXCCluster, cr.Name) + statusState := cr.Status.State + statusMsg := "" defer func() { - status := api.BcpRestoreStates(api.RestoreSucceeded) - if err != nil { - status = api.RestoreFailed - returnMsg = err.Error() - } - err := r.setStatus(cr, status, returnMsg) - if err != nil { - return + if err := setStatus(ctx, r.client, cr, statusState, statusMsg); err != nil { + log.Error(err, "failed to set status") } }() - for _, j := range rJobsList.Items { - if j.Spec.PXCCluster == cr.Spec.PXCCluster && - j.Name != cr.Name && j.Status.State != api.RestoreFailed && - j.Status.State != api.RestoreSucceeded { - err = errors.Errorf("unable to continue, concurent restore job %s running now.", j.Name) - return rr, err - } - } - - err = cr.CheckNsetDefaults() + otherRestore, err := isOtherRestoreInProgress(ctx, r.client, cr) if err != nil { - return rr, err + return rr, errors.Wrap(err, "failed to check if other restore is in progress") } - - cluster := new(api.PerconaXtraDBCluster) - err = r.client.Get(context.TODO(), types.NamespacedName{Name: cr.Spec.PXCCluster, Namespace: cr.Namespace}, cluster) - if err != nil { - err = errors.Wrapf(err, "get cluster %s", cr.Spec.PXCCluster) + if otherRestore != nil { + err = errors.Errorf("unable to continue, concurent restore job %s running now", otherRestore.Name) + statusState = api.RestoreFailed + statusMsg = err.Error() return rr, err } - clusterOrig := cluster.DeepCopy() - - err = cluster.CheckNSetDefaults(r.serverVersion, log) - if err != nil { - return reconcile.Result{}, fmt.Errorf("wrong PXC options: %v", err) - } - - bcp, err := r.getBackup(ctx, cr) - if err != nil { - return rr, errors.Wrap(err, "get backup") - } - - if cr.Spec.PITR != nil { - err = backup.CheckPITRErrors(ctx, r.client, r.clientcmd, cluster) - if err != nil { - return reconcile.Result{}, err - } - - annotations := cr.GetAnnotations() - _, unsafePITR := annotations[api.AnnotationUnsafePITR] - cond := meta.FindStatusCondition(bcp.Status.Conditions, api.BackupConditionPITRReady) - if cond != nil && cond.Status == metav1.ConditionFalse && !unsafePITR { - msg := fmt.Sprintf("Backup doesn't guarantee consistent recovery with PITR. Annotate PerconaXtraDBClusterRestore with %s to force it.", api.AnnotationUnsafePITR) - err = errors.New(msg) - return reconcile.Result{}, nil - } - } - err = r.validate(ctx, cr, bcp, cluster) - if err != nil { - err = errors.Wrap(err, "failed to validate restore job") + if err := cr.CheckNsetDefaults(); err != nil { + statusState = api.RestoreFailed + statusMsg = err.Error() return rr, err } - log.Info("stopping cluster", "cluster", cr.Spec.PXCCluster) - err = r.setStatus(cr, api.RestoreStopCluster, "") - if err != nil { - err = errors.Wrap(err, "set status") - return rr, err - } - err = k8s.PauseClusterWithWait(ctx, r.client, cluster, true) - if err != nil { - err = errors.Wrapf(err, "stop cluster %s", cluster.Name) - return rr, err + cluster := new(api.PerconaXtraDBCluster) + if err := r.client.Get(ctx, types.NamespacedName{Name: cr.Spec.PXCCluster, Namespace: cr.Namespace}, cluster); err != nil { + if k8serrors.IsNotFound(err) { + statusState = api.RestoreFailed + statusMsg = err.Error() + } + return rr, errors.Wrapf(err, "get cluster %s", cr.Spec.PXCCluster) } - log.Info("starting restore", "cluster", cr.Spec.PXCCluster, "backup", cr.Spec.BackupName) - err = r.setStatus(cr, api.RestoreRestore, "") - if err != nil { - err = errors.Wrap(err, "set status") - return rr, err + if err := cluster.CheckNSetDefaults(r.serverVersion, log); err != nil { + statusState = api.RestoreFailed + statusMsg = err.Error() + return rr, errors.Wrap(err, "wrong PXC options") } - err = r.restore(ctx, cr, bcp, cluster) + bcp, err := getBackup(ctx, r.client, cr) if err != nil { - err = errors.Wrap(err, "run restore") - return rr, err + statusState = api.RestoreFailed + statusMsg = err.Error() + return rr, errors.Wrap(err, "get backup") } - log.Info("starting cluster", "cluster", cr.Spec.PXCCluster) - err = r.setStatus(cr, api.RestoreStartCluster, "") + restorer, err := r.getRestorer(ctx, cr, bcp, cluster) if err != nil { - err = errors.Wrap(err, "set status") - return rr, err + return rr, errors.Wrap(err, "failed to get restorer") } - if cr.Spec.PITR != nil { - oldSize := cluster.Spec.PXC.Size - oldUnsafePXCSize := cluster.Spec.Unsafe.PXCSize - oldUnsafeProxySize := cluster.Spec.Unsafe.ProxySize + switch statusState { + case api.RestoreNew: + if cr.Spec.PITR != nil { + err = backup.CheckPITRErrors(ctx, r.client, r.clientcmd, cluster) + if err != nil { + return reconcile.Result{}, err + } + + annotations := cr.GetAnnotations() + _, unsafePITR := annotations[api.AnnotationUnsafePITR] + cond := meta.FindStatusCondition(bcp.Status.Conditions, api.BackupConditionPITRReady) + if cond != nil && cond.Status == metav1.ConditionFalse && !unsafePITR { + statusMsg = fmt.Sprintf("Backup doesn't guarantee consistent recovery with PITR. Annotate PerconaXtraDBClusterRestore with %s to force it.", api.AnnotationUnsafePITR) + return reconcile.Result{}, nil + } + } - var oldProxySQLSize int32 + err = validate(ctx, restorer, cr) + if err != nil { + if errors.Is(err, errWaitValidate) { + return rr, nil + } + err = errors.Wrap(err, "failed to validate restore job") + return rr, err + } + cr.Status.PXCSize = cluster.Spec.PXC.Size if cluster.Spec.ProxySQL != nil { - oldProxySQLSize = cluster.Spec.ProxySQL.Size + cr.Status.ProxySQLSize = cluster.Spec.ProxySQL.Size } - var oldHAProxySize int32 if cluster.Spec.HAProxy != nil { - oldHAProxySize = cluster.Spec.HAProxy.Size + cr.Status.HAProxySize = cluster.Spec.HAProxy.Size } + cr.Status.Unsafe = cluster.Spec.Unsafe - cluster.Spec.Unsafe.PXCSize = true - cluster.Spec.Unsafe.ProxySize = true - cluster.Spec.PXC.Size = 1 - - if cluster.Spec.ProxySQL != nil { - cluster.Spec.ProxySQL.Size = 0 + log.Info("stopping cluster", "cluster", cr.Spec.PXCCluster) + statusState = api.RestoreStopCluster + case api.RestoreStopCluster: + paused, err := k8s.PauseCluster(ctx, r.client, cluster) + if err != nil { + return rr, errors.Wrapf(err, "stop cluster %s", cluster.Name) } - if cluster.Spec.HAProxy != nil { - cluster.Spec.HAProxy.Size = 0 + if !paused { + log.Info("waiting for cluster to stop", "cluster", cr.Spec.PXCCluster, "msg", err.Error()) + return rr, nil } - if err := k8s.UnpauseClusterWithWait(ctx, r.client, cluster); err != nil { - return rr, errors.Wrap(err, "restart cluster for pitr") + log.Info("starting restore", "cluster", cr.Spec.PXCCluster, "backup", cr.Spec.BackupName) + if err := createRestoreJob(ctx, r.client, restorer, false); err != nil { + if errors.Is(err, errWaitInit) { + return rr, nil + } + err = errors.Wrap(err, "run restore") + return rr, err } - - log.Info("point-in-time recovering", "cluster", cr.Spec.PXCCluster) - err = r.setStatus(cr, api.RestorePITR, "") + statusState = api.RestoreRestore + case api.RestoreRestore: + restorerJob, err := restorer.Job() if err != nil { - return rr, errors.Wrap(err, "set status") + return rr, errors.Wrap(err, "failed to create restore job") + } + job := new(batchv1.Job) + if err := r.client.Get(ctx, types.NamespacedName{ + Name: restorerJob.Name, + Namespace: restorerJob.Namespace, + }, job); err != nil { + return rr, errors.Wrap(err, "failed to get restore job") } - err = r.pitr(ctx, cr, bcp, cluster) + finished, err := isJobFinished(job) if err != nil { - return rr, errors.Wrap(err, "run pitr") + statusState = api.RestoreFailed + statusMsg = err.Error() + return rr, err + } + if !finished { + log.Info("Waiting for restore job to finish", "job", job.Name) + return rr, nil } - cluster.Spec.PXC.Size = oldSize - cluster.Spec.Unsafe.PXCSize = oldUnsafePXCSize - cluster.Spec.Unsafe.ProxySize = oldUnsafeProxySize + if cr.Spec.PITR != nil { + if cluster.Spec.Pause { + err = k8sretry.RetryOnConflict(k8sretry.DefaultRetry, func() error { + current := new(api.PerconaXtraDBCluster) + err := r.client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, current) + if err != nil { + return errors.Wrap(err, "get cluster") + } + current.Spec.Pause = false + current.Spec.PXC.Size = 1 + current.Spec.Unsafe.PXCSize = true + current.Spec.Unsafe.ProxySize = true + + if cluster.Spec.ProxySQL != nil { + cluster.Spec.ProxySQL.Size = 0 + } + + if cluster.Spec.HAProxy != nil { + cluster.Spec.HAProxy.Size = 0 + } + + return r.client.Update(ctx, current) + }) + if err != nil { + return rr, errors.Wrap(err, "update cluster") + } + return rr, nil + } else { + if cluster.Status.ObservedGeneration != cluster.Generation || cluster.Status.PXC.Status != api.AppStateReady { + log.Info("Waiting for cluster to start", "cluster", cluster.Name) + return rr, nil + } + } + + log.Info("point-in-time recovering", "cluster", cr.Spec.PXCCluster) + if err := createRestoreJob(ctx, r.client, restorer, true); err != nil { + if errors.Is(err, errWaitInit) { + return rr, nil + } + return rr, errors.Wrap(err, "run pitr") + } + statusState = api.RestorePITR + return rr, nil + } - if cluster.Spec.ProxySQL != nil { - cluster.Spec.ProxySQL.Size = oldProxySQLSize + log.Info("starting cluster", "cluster", cr.Spec.PXCCluster) + statusState = api.RestoreStartCluster + case api.RestorePITR: + restorerJob, err := restorer.PITRJob() + if err != nil { + return rr, errors.Wrap(err, "failed to create restore job") } - if cluster.Spec.HAProxy != nil { - cluster.Spec.HAProxy.Size = oldHAProxySize + job := new(batchv1.Job) + if err := r.client.Get(ctx, types.NamespacedName{ + Name: restorerJob.Name, + Namespace: restorerJob.Namespace, + }, job); err != nil { + return rr, errors.Wrap(err, "failed to get pitr job") } - log.Info("starting cluster", "cluster", cr.Spec.PXCCluster) - err = r.setStatus(cr, api.RestoreStartCluster, "") + finished, err := isJobFinished(job) if err != nil { - err = errors.Wrap(err, "set status") + statusState = api.RestoreFailed + statusMsg = err.Error() return rr, err } - } + if !finished { + log.Info("Waiting for restore job to finish", "job", job.Name) + return rr, nil + } - err = k8s.UnpauseClusterWithWait(ctx, r.client, clusterOrig) - if err != nil { - err = errors.Wrap(err, "restart cluster") - return rr, err - } + log.Info("starting cluster", "cluster", cr.Spec.PXCCluster) + statusState = api.RestoreStartCluster + case api.RestoreStartCluster: + if cluster.Spec.Pause || + (cr.Status.PXCSize != 0 && cluster.Spec.PXC.Size != cr.Status.PXCSize) || + (cluster.Spec.HAProxy != nil && cr.Status.HAProxySize != 0 && cr.Status.HAProxySize != cluster.Spec.HAProxy.Size) || + (cluster.Spec.ProxySQL != nil && cr.Status.ProxySQLSize != 0 && cr.Status.ProxySQLSize != cluster.Spec.ProxySQL.Size) { + err = k8sretry.RetryOnConflict(k8sretry.DefaultRetry, func() error { + current := new(api.PerconaXtraDBCluster) + err := r.client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, current) + if err != nil { + return errors.Wrap(err, "get cluster") + } + current.Spec.Pause = false + current.Spec.PXC.Size = cr.Status.PXCSize + current.Spec.Unsafe = cr.Status.Unsafe + + if cluster.Spec.ProxySQL != nil { + cluster.Spec.ProxySQL.Size = cr.Status.ProxySQLSize + } + + if cluster.Spec.HAProxy != nil { + cluster.Spec.HAProxy.Size = cr.Status.HAProxySize + } + + return r.client.Update(ctx, current) + }) + if err != nil { + return rr, errors.Wrap(err, "update cluster") + } + } else { + if cluster.Status.ObservedGeneration == cluster.Generation && cluster.Status.PXC.Status == api.AppStateReady { + if err := restorer.Finalize(ctx); err != nil { + return rr, errors.Wrap(err, "failed to finalize restore") + } + + statusState = api.RestoreSucceeded + return rr, nil + } + } - log.Info(returnMsg) + log.Info("Waiting for cluster to start", "cluster", cluster.Name) + } - return rr, err + return rr, nil } -func (r *ReconcilePerconaXtraDBClusterRestore) getBackup(ctx context.Context, cr *api.PerconaXtraDBClusterRestore) (*api.PerconaXtraDBClusterBackup, error) { - if cr.Spec.BackupSource != nil { - status := cr.Spec.BackupSource.DeepCopy() - status.State = api.BackupSucceeded - status.CompletedAt = nil - status.LastScheduled = nil - return &api.PerconaXtraDBClusterBackup{ - ObjectMeta: metav1.ObjectMeta{ - Name: cr.Name, - Namespace: cr.Namespace, - }, - Spec: api.PXCBackupSpec{ - PXCCluster: cr.Spec.PXCCluster, - StorageName: cr.Spec.BackupSource.StorageName, - }, - Status: *status, - }, nil +func createRestoreJob(ctx context.Context, cl client.Client, restorer Restorer, pitr bool) error { + if err := restorer.Init(ctx); err != nil { + return errors.Wrap(err, "failed to init restore") } - bcp := &api.PerconaXtraDBClusterBackup{} - err := r.client.Get(ctx, types.NamespacedName{Name: cr.Spec.BackupName, Namespace: cr.Namespace}, bcp) + job, err := restorer.Job() if err != nil { - err = errors.Wrapf(err, "get backup %s", cr.Spec.BackupName) - return bcp, err + return errors.Wrap(err, "failed to get restore job") } - if bcp.Status.State != api.BackupSucceeded { - err = errors.Errorf("backup %s didn't finished yet, current state: %s", bcp.Name, bcp.Status.State) - return bcp, err - } - - return bcp, nil -} - -const backupRestoredMsg = `You can view xtrabackup log: -$ kubectl logs job/restore-job-%s-%s -If everything is fine, you can cleanup the job: -$ kubectl delete pxc-restore/%s -` - -const waitLimitSec int64 = 300 - -func (r *ReconcilePerconaXtraDBClusterRestore) waitForPodsShutdown(ls map[string]string, namespace string, gracePeriodSec int64) error { - for i := int64(0); i < waitLimitSec+gracePeriodSec; i++ { - pods := corev1.PodList{} - - err := r.client.List( - context.TODO(), - &pods, - &client.ListOptions{ - Namespace: namespace, - LabelSelector: labels.SelectorFromSet(ls), - }, - ) + if pitr { + job, err = restorer.PITRJob() if err != nil { - return errors.Wrap(err, "get pods list") - } - - if len(pods.Items) == 0 { - return nil + return errors.Wrap(err, "failed to create pitr restore job") } + } - time.Sleep(time.Second * 1) + if err := cl.Create(ctx, job); err != nil && !k8serrors.IsAlreadyExists(err) { + return errors.Wrap(err, "create job") } - return errors.Errorf("exceeded wait limit") + return nil } -func (r *ReconcilePerconaXtraDBClusterRestore) waitForPVCShutdown(ls map[string]string, namespace string) error { - for i := int64(0); i < waitLimitSec; i++ { - pvcs := corev1.PersistentVolumeClaimList{} - - err := r.client.List( - context.TODO(), - &pvcs, - &client.ListOptions{ - Namespace: namespace, - LabelSelector: labels.SelectorFromSet(ls), - }, - ) +func validate(ctx context.Context, restorer Restorer, cr *api.PerconaXtraDBClusterRestore) error { + job, err := restorer.Job() + if err != nil { + return errors.Wrap(err, "failed to create restore job") + } + if err := restorer.ValidateJob(ctx, job); err != nil { + return errors.Wrap(err, "failed to validate job") + } + + if cr.Spec.PITR != nil { + job, err := restorer.PITRJob() if err != nil { - return errors.Wrap(err, "get pvc list") + return errors.Wrap(err, "failed to create pitr restore job") } - - if len(pvcs.Items) == 1 { - return nil + if err := restorer.ValidateJob(ctx, job); err != nil { + return errors.Wrap(err, "failed to validate job") } - - time.Sleep(time.Second * 1) - } - - return errors.Errorf("exceeded wait limit") -} - -func (r *ReconcilePerconaXtraDBClusterRestore) setStatus(cr *api.PerconaXtraDBClusterRestore, state api.BcpRestoreStates, comments string) error { - cr.Status.State = state - switch state { - case api.RestoreSucceeded: - tm := metav1.NewTime(time.Now()) - cr.Status.CompletedAt = &tm } - - cr.Status.Comments = comments - - err := r.client.Status().Update(context.TODO(), cr) - if err != nil { - return errors.Wrap(err, "send update") + if err := restorer.Validate(ctx); err != nil { + return errors.Wrap(err, "failed to validate backup existence") } - return nil } diff --git a/pkg/controller/pxcrestore/restore_test.go b/pkg/controller/pxcrestore/controller_test.go similarity index 56% rename from pkg/controller/pxcrestore/restore_test.go rename to pkg/controller/pxcrestore/controller_test.go index 44e810edda..7442722b29 100644 --- a/pkg/controller/pxcrestore/restore_test.go +++ b/pkg/controller/pxcrestore/controller_test.go @@ -4,14 +4,21 @@ import ( "context" "testing" + "github.com/pkg/errors" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" + "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/app/statefulset" + "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup" "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage" fakestorage "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage/fake" "github.com/percona/percona-xtradb-cluster-operator/version" - "github.com/pkg/errors" ) func TestValidate(t *testing.T) { @@ -266,11 +273,15 @@ func TestValidate(t *testing.T) { r := reconciler(cl) r.newStorageClientFunc = tt.fakeStorageClientFunc - bcp, err := r.getBackup(ctx, tt.cr) + bcp, err := getBackup(ctx, cl, tt.cr) + if err != nil { + t.Fatal(err) + } + restorer, err := r.getRestorer(ctx, tt.cr, bcp, tt.cluster) if err != nil { t.Fatal(err) } - err = r.validate(ctx, tt.cr, bcp, tt.cluster) + err = validate(ctx, restorer, tt.cr) errStr := "" if err != nil { errStr = err.Error() @@ -297,3 +308,211 @@ func (c *fakeStorageClient) ListObjects(_ context.Context, _ string) ([]string, } return []string{"some-dest/backup1", "some-dest/backup2"}, nil } + +// TestOperatorRestart checks that the operator can catch up with the restore process after a restart. +// This test is run for each restore state. It runs reconcile twice for each state. Each reconcile operator should change the restore state. +// This test helps to eliminate errors such as creating an existing Pod without handling the AlreadyExists error. +func TestOperatorRestart(t *testing.T) { + ctx := context.Background() + + const clusterName = "test-cluster" + const namespace = "namespace" + const backupName = clusterName + "-backup" + const restoreName = clusterName + "-restore" + const s3SecretName = "my-cluster-name-backup-s3" + const azureSecretName = "my-cluster-name-backup-azure" + + states := []api.BcpRestoreStates{ + api.RestoreNew, + api.RestoreStopCluster, + api.RestoreRestore, + api.RestoreStartCluster, + api.RestorePITR, + } + + bcp := readDefaultBackup(t, backupName, namespace) + crSecret := readDefaultCRSecret(t, clusterName+"-secrets", namespace) + cluster := readDefaultCR(t, clusterName, namespace) + cluster.Status.PXC.Status = api.AppStateReady + cr := readDefaultRestore(t, restoreName, namespace) + cr.Spec.BackupName = backupName + cr.Spec.PXCCluster = clusterName + + tests := []struct { + name string + bcp *api.PerconaXtraDBClusterBackup + objects []runtime.Object + }{ + { + name: "s3", + bcp: updateResource(bcp, func(bcp *api.PerconaXtraDBClusterBackup) { + bcp.Status.State = api.BackupSucceeded + bcp.Status.Destination.SetS3Destination("some-dest", "dest") + bcp.Spec.StorageName = "s3-us-west" + bcp.Status.S3 = &api.BackupStorageS3Spec{ + Bucket: "some-bucket", + CredentialsSecret: s3SecretName, + } + }), + objects: []runtime.Object{readDefaultS3Secret(t, s3SecretName, namespace)}, + }, + { + name: "azure", + bcp: updateResource(bcp, func(bcp *api.PerconaXtraDBClusterBackup) { + bcp.Status.State = api.BackupSucceeded + bcp.Status.Destination.SetAzureDestination("some-dest", "dest") + bcp.Spec.StorageName = "azure-blob" + bcp.Status.Azure = &api.BackupStorageAzureSpec{ + ContainerPath: "some-bucket", + CredentialsSecret: azureSecretName, + } + }), + objects: []runtime.Object{ + updateResource(readDefaultS3Secret(t, azureSecretName, namespace), func(secret *corev1.Secret) { + secret.Data = map[string][]byte{ + "AZURE_STORAGE_ACCOUNT_NAME": []byte("some-account"), + "AZURE_STORAGE_ACCOUNT_KEY": []byte("some-key"), + } + }), + }, + }, + { + name: "pvc", + bcp: updateResource(bcp, func(bcp *api.PerconaXtraDBClusterBackup) { + bcp.Status.State = api.BackupSucceeded + bcp.Status.Destination.SetPVCDestination("some-dest") + bcp.Status.StorageType = api.BackupStorageFilesystem + }), + objects: []runtime.Object{ + &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "restore-src-" + cr.Name + "-" + cr.Spec.PXCCluster, + Namespace: cr.Namespace, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "restore-src-" + cr.Name + "-" + cr.Spec.PXCCluster + "-verify", + Namespace: cr.Namespace, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodSucceeded, + }, + }, + backup.PVCRestoreService(cr, cluster), + }, + }, + } + + for _, tt := range tests { + for _, state := range states { + if tt.bcp.Status.StorageType == api.BackupStorageFilesystem && state == api.RestorePITR { + continue + } + t.Run(tt.name+" state "+string(state), func(t *testing.T) { + cr := cr.DeepCopy() + cluster := cluster.DeepCopy() + if state == api.RestorePITR { + cr.Spec.PITR = &api.PITR{ + BackupSource: &api.PXCBackupStatus{ + StorageName: tt.bcp.Spec.StorageName, + }, + } + } + cr.Status.State = state + objects := append(tt.objects, tt.bcp, cr, cluster, crSecret, &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc1", + Namespace: namespace, + Labels: statefulset.NewNode(cluster).Labels(), + }, + }) + cl := buildFakeClient(objects...) + + r := reconciler(cl) + r.newStorageClientFunc = func(ctx context.Context, opts storage.Options) (storage.Storage, error) { + defaultFakeClient, err := fakestorage.NewFakeClient(ctx, opts) + if err != nil { + return nil, err + } + return &fakeStorageClient{defaultFakeClient, false, false}, nil + } + + if state == api.RestoreRestore || state == api.RestorePITR { + restorer, err := r.getRestorer(ctx, cr, tt.bcp, cluster) + if err != nil { + t.Fatal(err) + } + job, err := restorer.Job() + if err != nil { + t.Fatal(err) + } + if state == api.RestorePITR { + job, err = restorer.PITRJob() + if err != nil { + t.Fatal(err) + } + } + job.Status.Conditions = []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }, + } + if err := r.client.Create(ctx, job); err != nil { + t.Fatal(err) + } + } + + nn := types.NamespacedName{ + Name: cr.Name, + Namespace: cr.Namespace, + } + req := reconcile.Request{ + NamespacedName: nn, + } + + _, err := r.Reconcile(ctx, req) + if err != nil { + t.Fatal(err) + } + + restore := new(api.PerconaXtraDBClusterRestore) + if err := r.client.Get(ctx, nn, restore); err != nil { + t.Fatal(err) + } + if restore.Status.State == state { + t.Fatal("state not changed") + } + + // Assuming that the operator restarted just before the status update + restore.Status.State = state + if err := r.client.Status().Update(ctx, restore); err != nil { + t.Fatal(err) + } + _, err = r.Reconcile(ctx, req) + if err != nil { + t.Fatal(err) + } + + if err := r.client.Get(ctx, nn, restore); err != nil { + t.Fatal(err) + } + if restore.Status.State == state { + t.Fatal("state not changed") + } + }) + } + } +} diff --git a/pkg/controller/pxcrestore/helpers_test.go b/pkg/controller/pxcrestore/helpers_test.go index 3976c3d61f..8783f69750 100644 --- a/pkg/controller/pxcrestore/helpers_test.go +++ b/pkg/controller/pxcrestore/helpers_test.go @@ -14,6 +14,7 @@ import ( api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" fakestorage "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage/fake" + "github.com/percona/percona-xtradb-cluster-operator/version" ) func readDefaultCR(t *testing.T, name, namespace string) *api.PerconaXtraDBCluster { @@ -138,6 +139,9 @@ func reconciler(cl client.Client) *ReconcilePerconaXtraDBClusterRestore { client: cl, scheme: cl.Scheme(), newStorageClientFunc: fakestorage.NewFakeClient, + serverVersion: &version.ServerVersion{ + Platform: version.PlatformKubernetes, + }, } } @@ -145,14 +149,30 @@ func reconciler(cl client.Client) *ReconcilePerconaXtraDBClusterRestore { func buildFakeClient(objs ...runtime.Object) client.Client { s := scheme.Scheme - s.AddKnownTypes(api.SchemeGroupVersion, new(api.PerconaXtraDBClusterRestore)) - s.AddKnownTypes(api.SchemeGroupVersion, new(api.PerconaXtraDBClusterBackup)) - s.AddKnownTypes(api.SchemeGroupVersion, new(api.PerconaXtraDBCluster)) + types := []runtime.Object{ + new(api.PerconaXtraDBClusterRestore), + new(api.PerconaXtraDBClusterRestoreList), + new(api.PerconaXtraDBClusterBackup), + new(api.PerconaXtraDBCluster), + } + + s.AddKnownTypes(api.SchemeGroupVersion, types...) + + toClientObj := func(objs []runtime.Object) []client.Object { + cliObjs := make([]client.Object, 0, len(objs)) + for _, obj := range objs { + cliObj, ok := obj.(client.Object) + if ok { + cliObjs = append(cliObjs, cliObj) + } + } + return cliObjs + } cl := fake.NewClientBuilder(). WithScheme(s). WithRuntimeObjects(objs...). - WithStatusSubresource(&api.PerconaXtraDBClusterRestore{}). + WithStatusSubresource(toClientObj(types)...). Build() return cl diff --git a/pkg/controller/pxcrestore/restore.go b/pkg/controller/pxcrestore/restore.go deleted file mode 100644 index 9722a03f01..0000000000 --- a/pkg/controller/pxcrestore/restore.go +++ /dev/null @@ -1,132 +0,0 @@ -package pxcrestore - -import ( - "context" - "time" - - "github.com/pkg/errors" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - logf "sigs.k8s.io/controller-runtime/pkg/log" - - api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" - "github.com/percona/percona-xtradb-cluster-operator/pkg/k8s" -) - -func (r *ReconcilePerconaXtraDBClusterRestore) restore(ctx context.Context, cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) error { - log := logf.FromContext(ctx) - - if cluster.Spec.Backup == nil { - return errors.New("undefined backup section in a cluster spec") - } - - restorer, err := r.getRestorer(ctx, cr, bcp, cluster) - if err != nil { - return errors.Wrap(err, "failed to get restorer") - } - job, err := restorer.Job() - if err != nil { - return errors.Wrap(err, "failed to get restore job") - } - if err = k8s.SetControllerReference(cr, job, r.scheme); err != nil { - return err - } - - if err = restorer.Init(ctx); err != nil { - return errors.Wrap(err, "failed to init restore") - } - defer func() { - if derr := restorer.Finalize(ctx); derr != nil { - log.Error(derr, "failed to finalize restore") - } - }() - - return r.createJob(ctx, job) -} - -func (r *ReconcilePerconaXtraDBClusterRestore) pitr(ctx context.Context, cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) error { - log := logf.FromContext(ctx) - - restorer, err := r.getRestorer(ctx, cr, bcp, cluster) - if err != nil { - return errors.Wrap(err, "failed to get restorer") - } - job, err := restorer.PITRJob() - if err != nil { - return errors.Wrap(err, "failed to create pitr restore job") - } - if err := k8s.SetControllerReference(cr, job, r.scheme); err != nil { - return err - } - if err = restorer.Init(ctx); err != nil { - return errors.Wrap(err, "failed to init restore") - } - defer func() { - if derr := restorer.Finalize(ctx); derr != nil { - log.Error(derr, "failed to finalize restore") - } - }() - - return r.createJob(ctx, job) -} - -func (r *ReconcilePerconaXtraDBClusterRestore) validate(ctx context.Context, cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) error { - restorer, err := r.getRestorer(ctx, cr, bcp, cluster) - if err != nil { - return errors.Wrap(err, "failed to get restorer") - } - job, err := restorer.Job() - if err != nil { - return errors.Wrap(err, "failed to create restore job") - } - if err := restorer.ValidateJob(ctx, job); err != nil { - return errors.Wrap(err, "failed to validate job") - } - - if cr.Spec.PITR != nil { - job, err := restorer.PITRJob() - if err != nil { - return errors.Wrap(err, "failed to create pitr restore job") - } - if err := restorer.ValidateJob(ctx, job); err != nil { - return errors.Wrap(err, "failed to validate job") - } - } - if err := restorer.Validate(ctx); err != nil { - return errors.Wrap(err, "failed to validate backup existence") - } - return nil -} - -func (r *ReconcilePerconaXtraDBClusterRestore) createJob(ctx context.Context, job *batchv1.Job) error { - err := r.client.Create(ctx, job) - if err != nil { - return errors.Wrap(err, "create job") - } - - for { - time.Sleep(time.Second * 1) - - checkJob := batchv1.Job{} - err := r.client.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, &checkJob) - if err != nil { - if k8serrors.IsNotFound(err) { - return nil - } - return errors.Wrap(err, "get job status") - } - for _, cond := range checkJob.Status.Conditions { - if cond.Status != corev1.ConditionTrue { - continue - } - switch cond.Type { - case batchv1.JobComplete: - return nil - case batchv1.JobFailed: - return errors.New(cond.Message) - } - } - } -} diff --git a/pkg/controller/pxcrestore/restorer.go b/pkg/controller/pxcrestore/restorer.go index e726d3b9d5..00132d5e71 100644 --- a/pkg/controller/pxcrestore/restorer.go +++ b/pkg/controller/pxcrestore/restorer.go @@ -4,7 +4,6 @@ import ( "context" "sort" "strings" - "time" "github.com/pkg/errors" batchv1 "k8s.io/api/batch/v1" @@ -13,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" "github.com/percona/percona-xtradb-cluster-operator/pkg/k8s" @@ -20,6 +20,11 @@ import ( "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage" ) +var ( + errWaitValidate = errors.New("wait for validation") + errWaitInit = errors.New("wait for init") +) + type Restorer interface { Init(ctx context.Context) error Job() (*batchv1.Job, error) @@ -36,11 +41,11 @@ func (s *s3) Init(context.Context) error { return nil } func (s *s3) Finalize(context.Context) error { return nil } func (s *s3) Job() (*batchv1.Job, error) { - return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, s.bcp.Status.Destination, false) + return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, s.scheme, s.bcp.Status.Destination, false) } func (s *s3) PITRJob() (*batchv1.Job, error) { - return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, s.bcp.Status.Destination, true) + return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, s.scheme, s.bcp.Status.Destination, true) } func (s *s3) ValidateJob(ctx context.Context, job *batchv1.Job) error { @@ -85,40 +90,36 @@ func (s *pvc) Validate(ctx context.Context) error { if err != nil { return errors.Wrap(err, "restore pod") } - if err := k8s.SetControllerReference(s.cr, pod, s.scheme); err != nil { + if err := controllerutil.SetControllerReference(s.cr, pod, s.scheme); err != nil { return err } pod.Name += "-verify" pod.Spec.Containers[0].Command = []string{"bash", "-c", `[[ $(stat -c%s /backup/xtrabackup.stream) -gt 5000000 ]]`} pod.Spec.RestartPolicy = corev1.RestartPolicyNever - if err := s.k8sClient.Delete(ctx, pod); client.IgnoreNotFound(err) != nil { - return errors.Wrap(err, "failed to delete") - } - - if err := s.k8sClient.Create(ctx, pod); err != nil { - return errors.Wrap(err, "failed to create pod") - } - for { - time.Sleep(time.Second * 1) - - err := s.k8sClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod) - if err != nil { - return errors.Wrap(err, "get pod status") - } - if pod.Status.Phase == corev1.PodFailed { - return errors.Errorf("backup files not found on %s", destination) - } - if pod.Status.Phase == corev1.PodSucceeded { - break + err = s.k8sClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod) + if err != nil { + if k8serrors.IsNotFound(err) { + if err := s.k8sClient.Create(ctx, pod); err != nil { + return errors.Wrap(err, "failed to create pod") + } + return errWaitValidate } + return errors.Wrap(err, "get pod status") } - return nil + switch pod.Status.Phase { + case corev1.PodFailed: + return errors.Errorf("backup files not found on %s", destination) + case corev1.PodSucceeded: + return nil + default: + return errWaitValidate + } } func (s *pvc) Job() (*batchv1.Job, error) { - return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, "", false) + return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, s.scheme, "", false) } func (s *pvc) PITRJob() (*batchv1.Job, error) { @@ -129,55 +130,65 @@ func (s *pvc) Init(ctx context.Context) error { destination := s.bcp.Status.Destination svc := backup.PVCRestoreService(s.cr, s.cluster) - if err := k8s.SetControllerReference(s.cr, svc, s.scheme); err != nil { + if err := controllerutil.SetControllerReference(s.cr, svc, s.scheme); err != nil { return err } pod, err := backup.PVCRestorePod(s.cr, s.bcp.Status.StorageName, destination.BackupName(), s.cluster) if err != nil { return errors.Wrap(err, "restore pod") } - if err := k8s.SetControllerReference(s.cr, pod, s.scheme); err != nil { - return err - } - if err := s.k8sClient.Delete(ctx, svc); client.IgnoreNotFound(err) != nil { - return err - } - if err := s.k8sClient.Delete(ctx, pod); client.IgnoreNotFound(err) != nil { + if err := controllerutil.SetControllerReference(s.cr, pod, s.scheme); err != nil { return err } - err = s.k8sClient.Create(ctx, svc) - if err != nil { - return errors.Wrap(err, "create service") + initInProcess := true + + if err := s.k8sClient.Get(ctx, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}, svc); k8serrors.IsNotFound(err) { + initInProcess = false } - err = s.k8sClient.Create(ctx, pod) - if err != nil { - return errors.Wrap(err, "create pod") + + if err := s.k8sClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: svc.Namespace}, pod); k8serrors.IsNotFound(err) { + initInProcess = false } - for { - time.Sleep(time.Second * 1) - err := s.k8sClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod) - if err != nil { - return errors.Wrap(err, "get pod status") + if !initInProcess { + if err := s.k8sClient.Delete(ctx, svc); client.IgnoreNotFound(err) != nil { + return err } - if pod.Status.Phase == corev1.PodRunning { - break + if err := s.k8sClient.Delete(ctx, pod); client.IgnoreNotFound(err) != nil { + return err + } + + err = s.k8sClient.Create(ctx, svc) + if client.IgnoreAlreadyExists(err) != nil { + return errors.Wrap(err, "create service") } + err = s.k8sClient.Create(ctx, pod) + if client.IgnoreAlreadyExists(err) != nil { + return errors.Wrap(err, "create pod") + } + } + + if pod.Status.Phase != corev1.PodRunning { + return errWaitInit } return nil } func (s *pvc) Finalize(ctx context.Context) error { svc := backup.PVCRestoreService(s.cr, s.cluster) - if err := s.k8sClient.Delete(ctx, svc); err != nil { + if err := s.k8sClient.Delete(ctx, svc); client.IgnoreNotFound(err) != nil { return errors.Wrap(err, "failed to delete pvc service") } pod, err := backup.PVCRestorePod(s.cr, s.bcp.Status.StorageName, s.bcp.Status.Destination.BackupName(), s.cluster) if err != nil { return err } - if err := s.k8sClient.Delete(ctx, pod); err != nil { + if err := s.k8sClient.Delete(ctx, pod); client.IgnoreNotFound(err) != nil { + return errors.Wrap(err, "failed to delete pvc pod") + } + pod.Name += "-verify" + if err := s.k8sClient.Delete(ctx, pod); client.IgnoreNotFound(err) != nil { return errors.Wrap(err, "failed to delete pvc pod") } return nil @@ -190,11 +201,11 @@ func (s *azure) Init(context.Context) error { return nil } func (s *azure) Finalize(context.Context) error { return nil } func (s *azure) Job() (*batchv1.Job, error) { - return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, s.bcp.Status.Destination, false) + return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, s.scheme, s.bcp.Status.Destination, false) } func (s *azure) PITRJob() (*batchv1.Job, error) { - return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, s.bcp.Status.Destination, true) + return backup.RestoreJob(s.cr, s.bcp, s.cluster, s.initImage, s.scheme, s.bcp.Status.Destination, true) } func (s *azure) Validate(ctx context.Context) error { diff --git a/pkg/controller/pxcrestore/util.go b/pkg/controller/pxcrestore/util.go new file mode 100644 index 0000000000..9903f7d0b6 --- /dev/null +++ b/pkg/controller/pxcrestore/util.go @@ -0,0 +1,151 @@ +package pxcrestore + +import ( + "context" + "time" + + "github.com/pkg/errors" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" +) + +var ( + errWaitingPods = errors.New("waiting for pods to be deleted") + errWaitingPVC = errors.New("waiting for pvc to be deleted") +) + +func getBackup(ctx context.Context, cl client.Client, cr *api.PerconaXtraDBClusterRestore) (*api.PerconaXtraDBClusterBackup, error) { + if cr.Spec.BackupSource != nil { + status := cr.Spec.BackupSource.DeepCopy() + status.State = api.BackupSucceeded + status.CompletedAt = nil + status.LastScheduled = nil + return &api.PerconaXtraDBClusterBackup{ + ObjectMeta: metav1.ObjectMeta{ + Name: cr.Name, + Namespace: cr.Namespace, + }, + Spec: api.PXCBackupSpec{ + PXCCluster: cr.Spec.PXCCluster, + StorageName: cr.Spec.BackupSource.StorageName, + }, + Status: *status, + }, nil + } + + bcp := &api.PerconaXtraDBClusterBackup{} + err := cl.Get(ctx, types.NamespacedName{Name: cr.Spec.BackupName, Namespace: cr.Namespace}, bcp) + if err != nil { + err = errors.Wrapf(err, "get backup %s", cr.Spec.BackupName) + return bcp, err + } + if bcp.Status.State != api.BackupSucceeded { + err = errors.Errorf("backup %s didn't finished yet, current state: %s", bcp.Name, bcp.Status.State) + return bcp, err + } + + return bcp, nil +} + +func setStatus(ctx context.Context, cl client.Client, cr *api.PerconaXtraDBClusterRestore, state api.BcpRestoreStates, comments string) error { + cr.Status.State = state + switch state { + case api.RestoreSucceeded: + tm := metav1.NewTime(time.Now()) + cr.Status.CompletedAt = &tm + } + + cr.Status.Comments = comments + + err := cl.Status().Update(ctx, cr) + if err != nil { + return errors.Wrap(err, "send update") + } + + return nil +} + +func isOtherRestoreInProgress(ctx context.Context, cl client.Client, cr *api.PerconaXtraDBClusterRestore) (*api.PerconaXtraDBClusterRestore, error) { + rJobsList := &api.PerconaXtraDBClusterRestoreList{} + err := cl.List( + ctx, + rJobsList, + &client.ListOptions{ + Namespace: cr.Namespace, + }, + ) + if err != nil { + return nil, errors.Wrap(err, "get restore jobs list") + } + + for _, j := range rJobsList.Items { + if j.Spec.PXCCluster == cr.Spec.PXCCluster && + j.Name != cr.Name && j.Status.State != api.RestoreFailed && + j.Status.State != api.RestoreSucceeded { + return &j, nil + } + } + return nil, nil +} + +func isClusterStopped(ctx context.Context, cl client.Client, ls map[string]string, namespace string) (bool, error) { + pods := corev1.PodList{} + + err := cl.List( + ctx, + &pods, + &client.ListOptions{ + Namespace: namespace, + LabelSelector: labels.SelectorFromSet(ls), + }, + ) + if err != nil { + return false, errors.Wrap(err, "get pods list") + } + + return len(pods.Items) == 0, nil +} + +func isPVCDeleted(ctx context.Context, cl client.Client, ls map[string]string, namespace string) (bool, error) { + pvcs := corev1.PersistentVolumeClaimList{} + + err := cl.List( + ctx, + &pvcs, + &client.ListOptions{ + Namespace: namespace, + LabelSelector: labels.SelectorFromSet(ls), + }, + ) + if err != nil { + return false, errors.Wrap(err, "get pvc list") + } + + if len(pvcs.Items) == 1 { + return true, nil + } + + return false, nil +} + +func isJobFinished(checkJob *batchv1.Job) (bool, error) { + for _, c := range checkJob.Status.Conditions { + if c.Status != corev1.ConditionTrue { + continue + } + + switch c.Type { + case batchv1.JobComplete: + return true, nil + case batchv1.JobFailed: + return false, errors.Errorf("job %s failed: %s", checkJob.Name, c.Message) + } + } + return false, nil +} diff --git a/pkg/k8s/setowner.go b/pkg/k8s/setowner.go index afa285abfe..16baff4d4b 100644 --- a/pkg/k8s/setowner.go +++ b/pkg/k8s/setowner.go @@ -7,16 +7,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) -// SetControllerReference sets owner as a owner for the object obj -func SetControllerReference(owner runtime.Object, obj metav1.Object, scheme *runtime.Scheme) error { - ownerRef, err := OwnerRef(owner, scheme) - if err != nil { - return err - } - obj.SetOwnerReferences(append(obj.GetOwnerReferences(), ownerRef)) - return nil -} - // OwnerRef returns OwnerReference to object func OwnerRef(ro runtime.Object, scheme *runtime.Scheme) (metav1.OwnerReference, error) { gvk, err := apiutil.GVKForObject(ro, scheme) diff --git a/pkg/pxc/backup/restore.go b/pkg/pxc/backup/restore.go index b4e45118ba..5450ca62b3 100644 --- a/pkg/pxc/backup/restore.go +++ b/pkg/pxc/backup/restore.go @@ -10,6 +10,8 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" logf "sigs.k8s.io/controller-runtime/pkg/log" api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" @@ -151,7 +153,7 @@ func PVCRestorePod(cr *api.PerconaXtraDBClusterRestore, bcpStorageName, pvcName }, nil } -func RestoreJob(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster, initImage string, destination api.PXCBackupDestination, pitr bool) (*batchv1.Job, error) { +func RestoreJob(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster, initImage string, scheme *runtime.Scheme, destination api.PXCBackupDestination, pitr bool) (*batchv1.Job, error) { switch bcp.Status.GetStorageType(cluster) { case api.BackupStorageAzure: if bcp.Status.Azure == nil { @@ -301,6 +303,13 @@ func RestoreJob(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClust if cluster.CompareVersionWith("1.16.0") < 0 { job.Labels = cluster.Spec.PXC.Labels } + + if err := controllerutil.SetControllerReference(cr, job, scheme); err != nil { + return nil, errors.Wrap(err, "set controller reference") + } + for i := range job.OwnerReferences { + job.OwnerReferences[i].BlockOwnerDeletion = nil + } return job, nil }