Skip to content

Commit

Permalink
Check state before activation
Browse files Browse the repository at this point in the history
Activation must happen only if state `status`-key is empty.

Closes #121

---

Pull Request resolved: #123

Co-authored-by: tserakhau <[email protected]>
commit_hash:4f0a1f99628b9249fa47512506abbbd59256e20e
  • Loading branch information
laskoviymishka authored and robot-piglet committed Nov 29, 2024
1 parent caa93eb commit 6919a5d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
30 changes: 25 additions & 5 deletions cmd/trcli/replicate/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ func replicate(cp *coordinator.Coordinator, rt abstract.Runtime, transferYaml *s
return xerrors.Errorf("unable to load transfer: %w", err)
}
transfer.Runtime = rt
if !transfer.IncrementOnly() {
if err := activate.RunActivate(*cp, transfer, registry, 0); err != nil {
return xerrors.Errorf("unable to activate transfer: %w", err)
}
}

return RunReplication(*cp, transfer, registry)
}
Expand All @@ -48,6 +43,28 @@ func RunReplication(cp coordinator.Coordinator, transfer *model.Transfer, regist
if err := provideradapter.ApplyForTransfer(transfer); err != nil {
return xerrors.Errorf("unable to adapt transfer: %w", err)
}
st, err := cp.GetTransferState(transfer.ID)
if err != nil {
return xerrors.Errorf("unable to get transfer state: %w", err)
}
if stt, ok := st["status"]; !ok || stt.Generic == nil {
if err := activate.RunActivate(cp, transfer, registry, 0); err != nil {
return xerrors.Errorf("unable to activate transfer: %w", err)
}
if err := cp.SetTransferState(transfer.ID, map[string]*coordinator.TransferStateData{
"status": {
Generic: "activated",
IncrementalTables: nil,
OraclePosition: nil,
MysqlGtid: nil,
MysqlBinlogPosition: nil,
YtStaticPart: nil,
},
}); err != nil {
return xerrors.Errorf("unable to set transfer state: %w", err)
}
}

for {
worker := local.NewLocalWorker(
cp,
Expand All @@ -60,6 +77,9 @@ func RunReplication(cp coordinator.Coordinator, transfer *model.Transfer, regist
)
err := worker.Run()
if abstract.IsFatal(err) {
if err := (cp).RemoveTransferState(transfer.ID, []string{"status"}); err != nil {
return xerrors.Errorf("unable to cleanup status state: %w", err)
}
return err
}
if err := worker.Stop(); err != nil {
Expand Down
3 changes: 0 additions & 3 deletions cmd/trcli/replicate/tests/pg2ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/doublecloud/transfer/cmd/trcli/activate"
"github.com/doublecloud/transfer/cmd/trcli/config"
"github.com/doublecloud/transfer/cmd/trcli/replicate"
"github.com/doublecloud/transfer/internal/logger"
Expand Down Expand Up @@ -40,8 +39,6 @@ func TestReplicate(t *testing.T) {
transfer.Src = src
transfer.Dst = dst

require.NoError(t, activate.RunActivate(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts()), 0)) // so that a replication slot is created for source

go func() {
require.NoError(t, replicate.RunReplication(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts())))
}()
Expand Down

0 comments on commit 6919a5d

Please sign in to comment.