From 6919a5de051685e71b5462e30cd00db7882ce18e Mon Sep 17 00:00:00 2001 From: tserakhau Date: Fri, 29 Nov 2024 11:47:26 +0300 Subject: [PATCH] Check state before activation Activation must happen only if state `status`-key is empty. Closes #121 --- Pull Request resolved: https://github.com/doublecloud/transfer/pull/123 Co-authored-by: tserakhau commit_hash:4f0a1f99628b9249fa47512506abbbd59256e20e --- cmd/trcli/replicate/replicate.go | 30 ++++++++++++++++++++----- cmd/trcli/replicate/tests/pg2ch_test.go | 3 --- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/cmd/trcli/replicate/replicate.go b/cmd/trcli/replicate/replicate.go index a907431d..1509fd81 100644 --- a/cmd/trcli/replicate/replicate.go +++ b/cmd/trcli/replicate/replicate.go @@ -34,11 +34,6 @@ func replicate(cp *coordinator.Coordinator, rt abstract.Runtime, transferYaml *s return xerrors.Errorf("unable to load transfer: %w", err) } transfer.Runtime = rt - if !transfer.IncrementOnly() { - if err := activate.RunActivate(*cp, transfer, registry, 0); err != nil { - return xerrors.Errorf("unable to activate transfer: %w", err) - } - } return RunReplication(*cp, transfer, registry) } @@ -48,6 +43,28 @@ func RunReplication(cp coordinator.Coordinator, transfer *model.Transfer, regist if err := provideradapter.ApplyForTransfer(transfer); err != nil { return xerrors.Errorf("unable to adapt transfer: %w", err) } + st, err := cp.GetTransferState(transfer.ID) + if err != nil { + return xerrors.Errorf("unable to get transfer state: %w", err) + } + if stt, ok := st["status"]; !ok || stt.Generic == nil { + if err := activate.RunActivate(cp, transfer, registry, 0); err != nil { + return xerrors.Errorf("unable to activate transfer: %w", err) + } + if err := cp.SetTransferState(transfer.ID, map[string]*coordinator.TransferStateData{ + "status": { + Generic: "activated", + IncrementalTables: nil, + OraclePosition: nil, + MysqlGtid: nil, + MysqlBinlogPosition: nil, + YtStaticPart: nil, + }, + }); err != nil { + return xerrors.Errorf("unable to set transfer state: %w", err) + } + } + for { worker := local.NewLocalWorker( cp, @@ -60,6 +77,9 @@ func RunReplication(cp coordinator.Coordinator, transfer *model.Transfer, regist ) err := worker.Run() if abstract.IsFatal(err) { + if err := (cp).RemoveTransferState(transfer.ID, []string{"status"}); err != nil { + return xerrors.Errorf("unable to cleanup status state: %w", err) + } return err } if err := worker.Stop(); err != nil { diff --git a/cmd/trcli/replicate/tests/pg2ch_test.go b/cmd/trcli/replicate/tests/pg2ch_test.go index 69318ef9..4f8ed544 100644 --- a/cmd/trcli/replicate/tests/pg2ch_test.go +++ b/cmd/trcli/replicate/tests/pg2ch_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/doublecloud/transfer/cmd/trcli/activate" "github.com/doublecloud/transfer/cmd/trcli/config" "github.com/doublecloud/transfer/cmd/trcli/replicate" "github.com/doublecloud/transfer/internal/logger" @@ -40,8 +39,6 @@ func TestReplicate(t *testing.T) { transfer.Src = src transfer.Dst = dst - require.NoError(t, activate.RunActivate(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts()), 0)) // so that a replication slot is created for source - go func() { require.NoError(t, replicate.RunReplication(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts()))) }()