diff --git a/.mapping.json b/.mapping.json
index 2a702bb1..bf126575 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -2605,6 +2605,8 @@
   "tests/e2e/pg2pg/bytea_key/check_db_test.go":"transfer_manager/go/tests/e2e/pg2pg/bytea_key/check_db_test.go",
   "tests/e2e/pg2pg/bytea_key/init_source/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/bytea_key/init_source/dump.sql",
   "tests/e2e/pg2pg/bytea_key/init_target/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/bytea_key/init_target/dump.sql",
+  "tests/e2e/pg2pg/dblog/dblog_test.go":"transfer_manager/go/tests/e2e/pg2pg/dblog/dblog_test.go",
+  "tests/e2e/pg2pg/dblog/dump/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/dblog/dump/dump.sql",
   "tests/e2e/pg2pg/debezium/all_datatypes/check_db_test.go":"transfer_manager/go/tests/e2e/pg2pg/debezium/all_datatypes/check_db_test.go",
   "tests/e2e/pg2pg/debezium/all_datatypes/init_source/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/debezium/all_datatypes/init_source/dump.sql",
   "tests/e2e/pg2pg/debezium/all_datatypes/init_target/init.sql":"transfer_manager/go/tests/e2e/pg2pg/debezium/all_datatypes/init_target/init.sql",
diff --git a/pkg/dblog/incremental_async_sink.go b/pkg/dblog/incremental_async_sink.go
index 326146ef..460e5769 100644
--- a/pkg/dblog/incremental_async_sink.go
+++ b/pkg/dblog/incremental_async_sink.go
@@ -3,16 +3,17 @@ package dblog
 import (
     "context"
 
-    "github.com/doublecloud/transfer/internal/logger"
     "github.com/doublecloud/transfer/library/go/core/xerrors"
     "github.com/doublecloud/transfer/pkg/abstract"
     "github.com/doublecloud/transfer/pkg/util"
     "github.com/google/uuid"
+    "go.ytsaurus.tech/library/go/core/log"
     "golang.org/x/exp/maps"
 )
 
 type IncrementalAsyncSink struct {
-    ctx context.Context
+    ctx    context.Context
+    logger log.Logger
 
     signalTable SignalTable
@@ -32,6 +33,7 @@ type IncrementalAsyncSink struct {
 
 func NewIncrementalAsyncSink(
     ctx context.Context,
+    logger log.Logger,
     signalTable SignalTable,
     table abstract.TableID,
     tableIterator *IncrementalIterator,
@@ -43,6 +45,7 @@ func NewIncrementalAsyncSink(
 ) *IncrementalAsyncSink {
     asyncSink := &IncrementalAsyncSink{
         ctx:           ctx,
+        logger:        logger,
         signalTable:   signalTable,
         tableID:       table,
         tableIterator: tableIterator,
@@ -87,10 +90,10 @@ func (s *IncrementalAsyncSink) AsyncPush(items []abstract.ChangeItem) chan error
         }
 
         if ok, watermarkType := s.signalTable.IsWatermark(&item, s.tableID, s.expectedUUID()); ok {
-            logger.Log.Info("watermark found")
+            s.logger.Info("watermark found")
 
             if !s.isExpectedWatermarkOfType(watermarkType) {
-                logger.Log.Info("wrong watermark found")
+                s.logger.Info("wrong watermark found")
                 continue
             }
@@ -143,7 +146,7 @@ func (s *IncrementalAsyncSink) AsyncPush(items []abstract.ChangeItem) chan error
             encodedKey := stringArrToString(keyValue, defaultSeparator)
 
             if _, ok = s.chunk[encodedKey]; ok {
-                logger.Log.Infof("found primary key from chunk: %s", keyValue)
+                s.logger.Infof("found primary key from chunk: %s", keyValue)
                 delete(s.chunk, encodedKey)
             }
         }
diff --git a/pkg/dblog/incremental_iterator.go b/pkg/dblog/incremental_iterator.go
index d6a94e03..2082c106 100644
--- a/pkg/dblog/incremental_iterator.go
+++ b/pkg/dblog/incremental_iterator.go
@@ -7,9 +7,12 @@ import (
     "github.com/doublecloud/transfer/pkg/abstract"
     "github.com/doublecloud/transfer/pkg/dblog/tablequery"
     "github.com/google/uuid"
+    "go.ytsaurus.tech/library/go/core/log"
 )
 
 type IncrementalIterator struct {
+    logger log.Logger
+
     storage     tablequery.StorageTableQueryable
     tableQuery  *tablequery.TableQuery
     signalTable SignalTable
@@ -26,6 +29,7 @@ type IncrementalIterator struct {
 }
 
 func NewIncrementalIterator(
+    logger log.Logger,
     storage tablequery.StorageTableQueryable,
     tableQuery *tablequery.TableQuery,
     signalTable SignalTable,
@@ -36,6 +40,7 @@ func NewIncrementalIterator(
     betweenMarksOpts ...func(),
 ) (*IncrementalIterator, error) {
     iter := &IncrementalIterator{
+        logger:      logger,
         storage:     storage,
         tableQuery:  tableQuery,
         signalTable: signalTable,
@@ -53,7 +58,7 @@ func NewIncrementalIterator(
 
 func (i *IncrementalIterator) Next(ctx context.Context) ([]abstract.ChangeItem, error) {
     i.tableQuery.Filter = MakeNextWhereStatement(i.pkColNames, i.lowBound)
-
+    i.logger.Infof("IncrementalIterator::Next - i.tableQuery.Filter: %s", i.tableQuery.Filter)
     return i.loadTablePart(ctx)
 }
 
@@ -63,6 +68,8 @@ func (i *IncrementalIterator) loadTablePart(ctx context.Context) ([]abstract.Cha
         return nil, xerrors.Errorf("Failed to create watermark when selecting chunk: %w", err)
     }
 
+    i.logger.Infof("created low watermark, uuid: %s", lowWatermarkUUID.String())
+
     i.LowWatermarkUUID = lowWatermarkUUID
     chunk := make([]abstract.ChangeItem, 0, i.limit)
 
@@ -98,6 +105,8 @@ func (i *IncrementalIterator) loadTablePart(ctx context.Context) ([]abstract.Cha
         return nil, xerrors.Errorf("Failed to create watermark when selecting chunk")
     }
 
+    i.logger.Infof("created high watermark, uuid: %s", highWatermarkUUID.String())
+
     i.HighWatermarkUUID = highWatermarkUUID
 
     return chunk, nil
diff --git a/pkg/providers/postgres/dblog/signal_table.go b/pkg/providers/postgres/dblog/signal_table.go
index 5829348d..23dd05f6 100644
--- a/pkg/providers/postgres/dblog/signal_table.go
+++ b/pkg/providers/postgres/dblog/signal_table.go
@@ -3,6 +3,7 @@ package dblog
 import (
     "context"
     "fmt"
+    "strings"
 
     "github.com/doublecloud/transfer/library/go/core/xerrors"
     "github.com/doublecloud/transfer/pkg/abstract"
@@ -19,7 +20,7 @@ type (
 )
 
 const (
-    signalTableName        = "__data_transfer_signal_table"
+    SignalTableName        = "__data_transfer_signal_table"
     tableSchemaColumnIndex = 0
     tableNameColumnIndex   = 1
     tableTransferIDIndex   = 2
@@ -27,14 +28,19 @@ const (
     markTypeColumnIndex    = 4
 )
 
+func SignalTableTableID(schemaName string) *abstract.TableID {
+    return abstract.NewTableID(schemaName, SignalTableName)
+}
+
 type signalTable struct {
     conn       *pgxpool.Pool
     logger     log.Logger
     transferID string
+    schemaName string
 }
 
-func buildSignalTableDDL() string {
-    query := `CREATE TABLE IF NOT EXISTS %s
+func buildSignalTableDDL(schemaName string) string {
+    query := `CREATE TABLE IF NOT EXISTS "%s"."%s"
     (
         table_schema TEXT,
         table_name   TEXT,
@@ -45,20 +51,21 @@ func buildSignalTableDDL() string {
         PRIMARY KEY (table_schema, table_name, transfer_id, mark_type)
     );`
 
-    return fmt.Sprintf(query, signalTableName)
+    return fmt.Sprintf(query, schemaName, SignalTableName)
 }
 
-func newPgSignalTable(
+func NewPgSignalTable(
     ctx context.Context,
     conn *pgxpool.Pool,
     logger log.Logger,
     transferID string,
+    schemaName string,
 ) (*signalTable, error) {
-
     pgSignalTable := &signalTable{
         conn:       conn,
         logger:     logger,
         transferID: transferID,
+        schemaName: schemaName,
     }
 
     if err := pgSignalTable.init(ctx); err != nil {
@@ -70,7 +77,7 @@ func NewPgSignalTable(
 
 func (s *signalTable) init(ctx context.Context) error {
     return s.tx(ctx, func(tx pgx.Tx) error {
-        if _, err := tx.Exec(ctx, buildSignalTableDDL()); err != nil {
+        if _, err := tx.Exec(ctx, buildSignalTableDDL(s.schemaName)); err != nil {
             return xerrors.Errorf("failed to ensure existence of the signal table service table: %w", err)
         }
         return nil
     })
 }
@@ -125,9 +132,20 @@ func (s *signalTable) CreateWatermark(
             return xerrors.Errorf("unable to convert low bound array to string")
         }
 
+        query := s.makeWatermarkQuery()
+        s.logger.Info(
+            fmt.Sprintf("CreateWatermark - query: %s", strings.ReplaceAll(query, "\n", "")),
+            log.String("tableID.Namespace", tableID.Namespace),
+            log.String("tableID.Name", tableID.Name),
+            log.String("s.transferID", s.transferID),
+            log.String("newUUID", newUUID.String()),
+            log.String("watermarkType", string(watermarkType)),
+            log.String("lowBoundStr", lowBoundStr),
+        )
+
         _, err = tx.Exec(
             ctx,
-            s.makeWatermarkQuery(),
+            query,
             tableID.Namespace,
             tableID.Name,
             s.transferID,
@@ -151,7 +169,7 @@ func (s *signalTable) CreateWatermark(
 }
 
 func (s *signalTable) IsWatermark(item *abstract.ChangeItem, tableID abstract.TableID, markUUID uuid.UUID) (bool, dblog.WatermarkType) {
-    isWatermark := item.Table == signalTableName
+    isWatermark := item.Table == SignalTableName
     if !isWatermark {
         return false, dblog.BadWatermarkType
     }
@@ -183,14 +201,14 @@ func (s *signalTable) IsWatermark(item *abstract.ChangeItem, tableID abstract.Ta
 }
 
 func (s *signalTable) makeWatermarkQuery() string {
-    query := `INSERT INTO %s (table_schema, table_name, transfer_id, mark, mark_type, low_bound)
+    query := `INSERT INTO "%s"."%s" (table_schema, table_name, transfer_id, mark, mark_type, low_bound)
               VALUES (($1), ($2), ($3), ($4), ($5), ($6))
               ON CONFLICT (table_schema, table_name, transfer_id, mark_type)
               DO UPDATE
               SET mark = EXCLUDED.mark,
                   low_bound = EXCLUDED.low_bound;`
 
-    return fmt.Sprintf(query, signalTableName)
+    return fmt.Sprintf(query, s.schemaName, SignalTableName)
 }
 
 func (s *signalTable) resolveLowBound(ctx context.Context, tableID abstract.TableID) []string {
@@ -220,11 +238,11 @@ func (s *signalTable) resolveLowBound(ctx context.Context, tableID abstract.Tabl
 }
 
 func (s *signalTable) resolveLowBoundQuery() string {
-    query := `SELECT low_bound FROM %s
+    query := `SELECT low_bound FROM "%s"."%s"
               WHERE table_schema = ($1)
                 AND table_name = ($2)
                 AND transfer_id = ($3)
                 AND mark_type = ($4);`
 
-    return fmt.Sprintf(query, signalTableName)
+    return fmt.Sprintf(query, s.schemaName, SignalTableName)
 }
diff --git a/pkg/providers/postgres/dblog/storage.go b/pkg/providers/postgres/dblog/storage.go
index 42af0ce2..d036f402 100644
--- a/pkg/providers/postgres/dblog/storage.go
+++ b/pkg/providers/postgres/dblog/storage.go
@@ -3,15 +3,17 @@ package dblog
 import (
     "context"
 
-    "github.com/doublecloud/transfer/internal/logger"
     "github.com/doublecloud/transfer/library/go/core/xerrors"
     "github.com/doublecloud/transfer/pkg/abstract"
     "github.com/doublecloud/transfer/pkg/dblog"
     "github.com/doublecloud/transfer/pkg/dblog/tablequery"
     "github.com/jackc/pgx/v4/pgxpool"
+    "go.ytsaurus.tech/library/go/core/log"
 )
 
 type Storage struct {
+    logger log.Logger
+
     src       abstract.Source
     pgStorage tablequery.StorageTableQueryable
     conn      *pgxpool.Pool
@@ -20,24 +22,29 @@ type Storage struct {
     transferID string
     represent  dblog.ChangeItemConverter
 
+    keeperSchema string
+
     betweenMarksOpts []func()
 }
 
 func NewStorage(
+    logger log.Logger,
     src abstract.Source,
     pgStorage tablequery.StorageTableQueryable,
     conn *pgxpool.Pool,
     chunkSize uint64,
     transferID string,
+    keeperSchema string,
     represent dblog.ChangeItemConverter,
     betweenMarksOpts ...func(),
 ) (abstract.Storage, error) {
     return &Storage{
+        logger:           log.With(logger, log.Any("component", "dblog")),
         src:              src,
         pgStorage:        pgStorage,
        conn:             conn,
         chunkSize:        chunkSize,
         transferID:       transferID,
+        keeperSchema:     keeperSchema,
         represent:        represent,
         betweenMarksOpts: betweenMarksOpts,
     }, nil
@@ -64,16 +71,23 @@ func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescri
         if err != nil {
             return xerrors.Errorf("unable to generate chunk size: %w", err)
         }
+        s.logger.Infof("Storage::LoadTable - inferred chunkSize: %d", chunkSize)
+    } else {
+        s.logger.Infof("Storage::LoadTable - from config chunkSize: %d", chunkSize)
     }
 
-    pgSignalTable, err := newPgSignalTable(ctx, s.conn, logger.Log, s.transferID)
+    pgSignalTable, err := NewPgSignalTable(ctx, s.conn, s.logger, s.transferID, s.keeperSchema)
     if err != nil {
         return xerrors.Errorf("unable to create signal table: %w", err)
     }
 
     tableQuery := tablequery.NewTableQuery(tableDescr.ID(), true, "", 0, chunkSize)
+    s.logger.Infof("Storage::LoadTable - tableQuery: %v", tableQuery)
 
     lowBound := pgSignalTable.resolveLowBound(ctx, tableDescr.ID())
+    s.logger.Infof("Storage::LoadTable - lowBound: %v", lowBound)
+
     iterator, err := dblog.NewIncrementalIterator(
+        s.logger,
         s.pgStorage,
         tableQuery,
         pgSignalTable,
@@ -92,6 +106,8 @@ func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescri
         return xerrors.Errorf("failed to do initial iteration: %w", err)
     }
 
+    s.logger.Infof("Storage::LoadTable - first iteration done, extracted items: %d", len(items))
+
     chunk, err := dblog.ResolveChunkMapFromArr(items, pkColNames, s.represent)
     if err != nil {
         return xerrors.Errorf("failed to resolve chunk: %w", err)
@@ -99,6 +115,7 @@ func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescri
 
     asyncSink := dblog.NewIncrementalAsyncSink(
         ctx,
+        s.logger,
         pgSignalTable,
         tableDescr.ID(),
         iterator,
diff --git a/pkg/providers/postgres/dblog/tests/alltypes/check_all_types_test.go b/pkg/providers/postgres/dblog/tests/alltypes/check_all_types_test.go
index b31d27c5..baecbbeb 100644
--- a/pkg/providers/postgres/dblog/tests/alltypes/check_all_types_test.go
+++ b/pkg/providers/postgres/dblog/tests/alltypes/check_all_types_test.go
@@ -6,6 +6,7 @@ import (
     "os"
     "testing"
 
+    "github.com/doublecloud/transfer/internal/logger"
     "github.com/doublecloud/transfer/pkg/abstract"
     "github.com/doublecloud/transfer/pkg/dblog"
     "github.com/doublecloud/transfer/pkg/dblog/tablequery"
@@ -199,13 +200,15 @@ func TestIncrementalSnapshot(t *testing.T) {
     tableQuery := tablequery.NewTableQuery(tableDescription.ID(), true, "", 0, defaultLimit)
 
     iterator, err := dblog.NewIncrementalIterator(
+        logger.Log,
         storage,
         tableQuery,
         signalTable,
         postgres.Represent,
         primaryKey,
         nil,
-        defaultLimit)
+        defaultLimit,
+    )
 
     require.NoError(t, err)
diff --git a/pkg/providers/postgres/dblog/tests/changing_chunk/changing_chunk_test.go b/pkg/providers/postgres/dblog/tests/changing_chunk/changing_chunk_test.go
index bc396884..fffe59ca 100644
--- a/pkg/providers/postgres/dblog/tests/changing_chunk/changing_chunk_test.go
+++ b/pkg/providers/postgres/dblog/tests/changing_chunk/changing_chunk_test.go
@@ -71,7 +71,7 @@ func TestIncrementalSnapshot(t *testing.T) {
         coordinator.NewFakeClient())
     require.NoError(t, err)
 
-    storage, err := dblog.NewStorage(src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, pgsink.Represent, opt)
+    storage, err := dblog.NewStorage(logger.Log, src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, "public", pgsink.Represent, opt)
     require.NoError(t, err)
 
     sourceTables, err := storage.TableList(nil)
diff --git a/pkg/providers/postgres/dblog/tests/changing_chunk/update_pk_test.go b/pkg/providers/postgres/dblog/tests/changing_chunk/update_pk_test.go
index 0a242cbe..79c3c5f2 100644
--- a/pkg/providers/postgres/dblog/tests/changing_chunk/update_pk_test.go
+++ b/pkg/providers/postgres/dblog/tests/changing_chunk/update_pk_test.go
@@ -70,7 +70,7 @@ func TestUpdateKey(t *testing.T) {
         coordinator.NewFakeClient())
     require.NoError(t, err)
 
-    storage, err := dblog.NewStorage(src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, pgsink.Represent, opt)
+    storage, err := dblog.NewStorage(logger.Log, src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, "public", pgsink.Represent, opt)
     require.NoError(t, err)
 
     sourceTables, err := storage.TableList(nil)
diff --git a/pkg/providers/postgres/dblog/tests/composite_key/check_composite_key_test.go b/pkg/providers/postgres/dblog/tests/composite_key/check_composite_key_test.go
index a1e79034..9c0cccb5 100644
--- a/pkg/providers/postgres/dblog/tests/composite_key/check_composite_key_test.go
+++ b/pkg/providers/postgres/dblog/tests/composite_key/check_composite_key_test.go
@@ -75,7 +75,7 @@ func TestIncrementalSnapshot(t *testing.T) {
         coordinator.NewFakeClient())
     require.NoError(t, err)
 
-    storage, err := dblog.NewStorage(src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, pgsink.Represent)
+    storage, err := dblog.NewStorage(logger.Log, src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, "public", pgsink.Represent)
     require.NoError(t, err)
 
     sourceTables, err := storage.TableList(nil)
diff --git a/pkg/providers/postgres/dblog/tests/fault_tolerance/check_fault_tolerance_test.go b/pkg/providers/postgres/dblog/tests/fault_tolerance/check_fault_tolerance_test.go
index 59d0d485..b02452a7 100644
--- a/pkg/providers/postgres/dblog/tests/fault_tolerance/check_fault_tolerance_test.go
+++ b/pkg/providers/postgres/dblog/tests/fault_tolerance/check_fault_tolerance_test.go
@@ -66,7 +66,7 @@ func TestIncrementalSnapshotFaultTolerance(t *testing.T) {
         coordinator.NewFakeClient())
     require.NoError(t, err)
 
-    storage, err := dblog.NewStorage(src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, pgsink.Represent)
+    storage, err := dblog.NewStorage(logger.Log, src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, "public", pgsink.Represent)
     require.NoError(t, err)
 
     sourceTables, err := storage.TableList(nil)
@@ -108,7 +108,7 @@ func TestIncrementalSnapshotFaultTolerance(t *testing.T) {
         coordinator.NewFakeClient())
     require.NoError(t, err)
 
-    storage, err = dblog.NewStorage(src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, pgsink.Represent)
+    storage, err = dblog.NewStorage(logger.Log, src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, "public", pgsink.Represent)
     require.NoError(t, err)
 
     err = storage.LoadTable(context.Background(), *numTable, pusher)
diff --git a/pkg/providers/postgres/dblog/tests/mvp/check_mvp_test.go b/pkg/providers/postgres/dblog/tests/mvp/check_mvp_test.go
index 701dc8b8..5632db7d 100644
--- a/pkg/providers/postgres/dblog/tests/mvp/check_mvp_test.go
+++ b/pkg/providers/postgres/dblog/tests/mvp/check_mvp_test.go
@@ -74,7 +74,7 @@ func TestIncrementalSnapshot(t *testing.T) {
         coordinator.NewFakeClient())
     require.NoError(t, err)
 
-    storage, err := dblog.NewStorage(src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, pgsink.Represent)
+    storage, err := dblog.NewStorage(logger.Log, src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, "public", pgsink.Represent)
     require.NoError(t, err)
 
     sourceTables, err := storage.TableList(nil)
diff --git a/pkg/providers/postgres/model_pg_source.go b/pkg/providers/postgres/model_pg_source.go
index 8a860011..cb4b2162 100644
--- a/pkg/providers/postgres/model_pg_source.go
+++ b/pkg/providers/postgres/model_pg_source.go
@@ -14,6 +14,7 @@ import (
     "github.com/doublecloud/transfer/pkg/abstract/model"
     "github.com/doublecloud/transfer/pkg/errors"
     "github.com/doublecloud/transfer/pkg/errors/categories"
+    "github.com/doublecloud/transfer/pkg/providers/postgres/dblog"
     "github.com/doublecloud/transfer/pkg/providers/postgres/utils"
     "github.com/doublecloud/transfer/pkg/storage"
     "github.com/doublecloud/transfer/pkg/transformer/registry/rename"
@@ -229,7 +230,7 @@ func (s *PgSource) fulfilledIncludesImpl(tID abstract.TableID, firstIncludeOnly
     }
     if tID.Namespace == s.KeeperSchema {
         switch tID.Name {
-        case TableConsumerKeeper, TableLSN:
+        case TableConsumerKeeper, TableLSN, dblog.SignalTableName:
             result = append(result, abstract.PgName(s.KeeperSchema, tID.Name))
         }
     }
@@ -252,6 +253,7 @@ func (s *PgSource) AuxTables() []string {
     return []string{
         abstract.PgName(s.KeeperSchema, TableConsumerKeeper),
         abstract.PgName(s.KeeperSchema, TableLSN),
+        abstract.PgName(s.KeeperSchema, dblog.SignalTableName),
     }
 }
diff --git a/pkg/providers/postgres/pg_dump_test.go b/pkg/providers/postgres/pg_dump_test.go
index df4a2683..e075a83f 100644
--- a/pkg/providers/postgres/pg_dump_test.go
+++ b/pkg/providers/postgres/pg_dump_test.go
@@ -329,6 +329,8 @@ func TestBuildArgs(t *testing.T) {
         `"cms"."__consumer_keeper"`,
         `-t`,
         `"cms"."__data_transfer_lsn"`,
+        `-t`,
+        `"cms"."__data_transfer_signal_table"`,
     }, args)
 
     // without DBTables
diff --git a/pkg/providers/postgres/provider.go b/pkg/providers/postgres/provider.go
index d391e928..5cf7ebbd 100644
--- a/pkg/providers/postgres/provider.go
+++ b/pkg/providers/postgres/provider.go
@@ -47,7 +47,7 @@ func init() {
        transfer_id TEXT, schema_name TEXT, table_name TEXT, lsn BIGINT
     Table (in target) needed for resolving data overlapping during SNAPSHOT_AND_INCREMENT transfers.
 */
-    abstract.RegisterSystemTables(TableConsumerKeeper, TableLSN)
+    abstract.RegisterSystemTables(TableConsumerKeeper, TableLSN, dblog.SignalTableName)
 }
 
 const (
@@ -168,7 +168,8 @@ func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation,
     }
 
     if src.DBLogEnabled {
-        if err := p.DBLogUpload(src, tables); err != nil {
+        logger.Log.Info("DBLog enabled")
+        if err := p.DBLogUpload(ctx, tables); err != nil {
             return xerrors.Errorf("DBLog snapshot loading failed: %w", err)
         }
     } else {
@@ -334,7 +335,7 @@ func (p *Provider) SourceSampleableStorage() (abstract.SampleableStorage, []abst
     }
     var tables []abstract.TableDescription
     for tID, tInfo := range all {
-        if tID.Name == TableConsumerKeeper {
+        if tID.Name == TableConsumerKeeper || tID.Name == dblog.SignalTableName {
             continue
         }
         if src.Include(tID) {
@@ -362,26 +363,34 @@ func (p *Provider) Type() abstract.ProviderType {
     return ProviderType
 }
 
-func (p *Provider) DBLogUpload(src *PgSource, tables abstract.TableMap) error {
-
+func (p *Provider) DBLogUpload(ctx context.Context, tables abstract.TableMap) error {
+    src, ok := p.transfer.Src.(*PgSource)
+    if !ok {
+        return xerrors.Errorf("unexpected type: %T", p.transfer.Src)
+    }
     pgStorage, err := NewStorage(src.ToStorageParams(p.transfer))
     if err != nil {
         return xerrors.Errorf("failed to create postgres storage: %w", err)
     }
 
-    sourceWrapper, err := NewSourceWrapper(src, src.SlotID, nil, p.logger, stats.NewSourceStats(p.registry), p.cp)
+    // ensure SignalTable exists
+    _, err = dblog.NewPgSignalTable(ctx, pgStorage.Conn, logger.Log, p.transfer.ID, src.KeeperSchema)
+    if err != nil {
+        return xerrors.Errorf("unable to create signal table: %w", err)
+    }
+
+    sourceWrapper, err := NewSourceWrapper(src, src.SlotID, p.transfer.DataObjects, p.logger, stats.NewSourceStats(p.registry), p.cp)
     if err != nil {
         return xerrors.Errorf("failed to create source wrapper: %w", err)
     }
 
-    dblogStorage, err := dblog.NewStorage(sourceWrapper, pgStorage, pgStorage.Conn, src.ChunkSize, src.SlotID, Represent)
+    dblogStorage, err := dblog.NewStorage(p.logger, sourceWrapper, pgStorage, pgStorage.Conn, src.ChunkSize, src.SlotID, src.KeeperSchema, Represent)
     if err != nil {
         return xerrors.Errorf("failed to create DBLog storage: %w", err)
     }
 
     tableDescs := tables.ConvertToTableDescriptions()
     for _, table := range tableDescs {
-
         asyncSink, err := abstract_sink.MakeAsyncSink(
             p.transfer,
             logger.Log,
@@ -397,7 +406,13 @@ func (p *Provider) DBLogUpload(src *PgSource, tables abstract.TableMap) error {
         }
 
         if err = backoff.Retry(func() error {
-            return dblogStorage.LoadTable(context.Background(), table, pusher)
+            logger.Log.Infof("starting upload of table %s", table.String())
+
+            err := dblogStorage.LoadTable(ctx, table, pusher)
+            if err == nil {
+                logger.Log.Infof("table %s uploaded successfully", table.String())
+            }
+            return err
         }, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 10)); err != nil {
             return xerrors.Errorf("failed to load table: %w", err)
         }
diff --git a/pkg/providers/postgres/publisher.go b/pkg/providers/postgres/publisher.go
index c6585680..302371f8 100644
--- a/pkg/providers/postgres/publisher.go
+++ b/pkg/providers/postgres/publisher.go
@@ -11,6 +11,7 @@ import (
     "github.com/doublecloud/transfer/pkg/abstract"
     "github.com/doublecloud/transfer/pkg/abstract/coordinator"
     "github.com/doublecloud/transfer/pkg/abstract/model"
+    "github.com/doublecloud/transfer/pkg/providers/postgres/dblog"
     "github.com/doublecloud/transfer/pkg/stats"
     "github.com/doublecloud/transfer/pkg/util"
     "github.com/jackc/pgconn"
@@ -104,14 +105,23 @@ func addTablesList(config *PgSource, trackLSN bool, objects *model.DataObjects)
     consumerKeeperID := *abstract.NewTableID(config.KeeperSchema, TableConsumerKeeper)
     mustAddConsumerKeeper := true
 
+    signalTableID := *dblog.SignalTableTableID(config.KeeperSchema)
+    mustAddSignalTable := config.DBLogEnabled
+
     for _, t := range result {
         if mustAddConsumerKeeper && t.Equals(consumerKeeperID) {
             mustAddConsumerKeeper = false
         }
+        if mustAddSignalTable && t.Equals(signalTableID) {
+            mustAddSignalTable = false
+        }
     }
 
     if mustAddConsumerKeeper {
         result = append(result, consumerKeeperID)
     }
 
+    if mustAddSignalTable {
+        result = append(result, signalTableID)
+    }
+
     // since inherit table appear dynamically we need to filter tables on our side instead of push-list to postgres
     if config.CollapseInheritTables && objects != nil && len(objects.IncludeObjects) > 0 {
diff --git a/pkg/providers/postgres/storage.go b/pkg/providers/postgres/storage.go
index d53b25e6..e1350515 100644
--- a/pkg/providers/postgres/storage.go
+++ b/pkg/providers/postgres/storage.go
@@ -1165,6 +1165,8 @@ func (s *Storage) LoadQueryTable(ctx context.Context, tableQuery tablequery.Tabl
     query := s.queryFromQueryTable(&tableQuery, schema, SortAsc, abstract.NoFilter, All, excludeDescendants)
 
+    logger.Log.Infof("Storage::LoadQueryTable - built query: %s", query)
+
     if err = s.loadTable(ctx, tableDescription, pusher, query, conn.Conn(), loadMode, schema, time.Now()); err != nil {
         return xerrors.Errorf("unable to load table : %w", err)
     }
diff --git a/tests/e2e/pg2pg/dblog/dblog_test.go b/tests/e2e/pg2pg/dblog/dblog_test.go
new file mode 100644
index 00000000..5ddc138e
--- /dev/null
+++ b/tests/e2e/pg2pg/dblog/dblog_test.go
@@ -0,0 +1,78 @@
+package dblog
+
+import (
+    "context"
+    "fmt"
+    "os"
+    "testing"
+    "time"
+
+    "github.com/doublecloud/transfer/internal/logger"
+    "github.com/doublecloud/transfer/pkg/abstract"
+    dblogcommon "github.com/doublecloud/transfer/pkg/dblog"
+    pgcommon "github.com/doublecloud/transfer/pkg/providers/postgres"
+    "github.com/doublecloud/transfer/pkg/providers/postgres/dblog"
+    "github.com/doublecloud/transfer/pkg/providers/postgres/pgrecipe"
+    "github.com/doublecloud/transfer/tests/helpers"
+    "github.com/jackc/pgx/v4/pgxpool"
+    "github.com/stretchr/testify/require"
+)
+
+var (
+    TransferType = abstract.TransferTypeSnapshotAndIncrement
+    Source       = *pgrecipe.RecipeSource(pgrecipe.WithInitDir("init_source"), pgrecipe.WithDBTables("public.__test"))
+    Target       = *pgrecipe.RecipeTarget(pgrecipe.WithInitDir("init_target"))
+    ctx          = context.Background()
+)
+
+func init() {
+    _ = os.Setenv("YC", "1")                                             // to not go to vanga
+    helpers.InitSrcDst(helpers.TransferID, &Source, &Target, TransferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable
+    Source.DBLogEnabled = true
+    Source.ChunkSize = 2
+}
+
+func TestDBLog(t *testing.T) {
+    defer func() {
+        require.NoError(t, helpers.CheckConnections(
+            helpers.LabeledPort{Label: "PG source", Port: Source.Port},
+            helpers.LabeledPort{Label: "PG target", Port: Target.Port},
+        ))
+    }()
+
+    transfer := helpers.MakeTransfer(helpers.TransferID, &Source, &Target, TransferType)
+
+    worker := helpers.Activate(t, transfer)
+    defer worker.Close(t)
+
+    require.NoError(t, helpers.WaitEqualRowsCount(t, "public", "__test", helpers.GetSampleableStorageByModel(t, Source), helpers.GetSampleableStorageByModel(t, Target), 240*time.Second))
+    require.NoError(t, helpers.CompareStorages(t, Source, Target, helpers.NewCompareStorageParams()))
+
+    srcConn, err := pgcommon.MakeConnPoolFromSrc(&Source, logger.Log)
+    require.NoError(t, err)
+    defer srcConn.Close()
+
+    // after all the data has been copied from the source, watermarks of all kinds are expected to be present
+    checkWatermarkExist(t, dblogcommon.LowWatermarkType, srcConn)
+    checkWatermarkExist(t, dblogcommon.HighWatermarkType, srcConn)
+    checkWatermarkExist(t, dblogcommon.SuccessWatermarkType, srcConn)
+
+    dstConn, err := pgcommon.MakeConnPoolFromDst(&Target, logger.Log)
+    require.NoError(t, err)
+    defer dstConn.Close()
+
+    // check replication
+    _, err = srcConn.Exec(ctx, "INSERT INTO __test VALUES('11', '11');")
+    require.NoError(t, err)
+    _, err = srcConn.Exec(ctx, "INSERT INTO __test VALUES('12', '12');")
+    require.NoError(t, err)
+    require.NoError(t, helpers.WaitEqualRowsCount(t, "public", "__test", helpers.GetSampleableStorageByModel(t, Source), helpers.GetSampleableStorageByModel(t, Target), 240*time.Second))
+    require.NoError(t, helpers.CompareStorages(t, Source, Target, helpers.NewCompareStorageParams()))
+}
+
+func checkWatermarkExist(t *testing.T, mark dblogcommon.WatermarkType, srcConn *pgxpool.Pool) {
+    var hasWatermark bool
+    err := srcConn.QueryRow(ctx, fmt.Sprintf("SELECT true FROM %s WHERE mark_type = $1;", dblog.SignalTableName), mark).Scan(&hasWatermark)
+    require.NoError(t, err)
+    require.True(t, hasWatermark)
+}
diff --git a/tests/e2e/pg2pg/dblog/dump/dump.sql b/tests/e2e/pg2pg/dblog/dump/dump.sql
new file mode 100644
index 00000000..f980b081
--- /dev/null
+++ b/tests/e2e/pg2pg/dblog/dump/dump.sql
@@ -0,0 +1,16 @@
+CREATE TABLE __test (
+    id  INT PRIMARY KEY,
+    txt TEXT
+);
+
+INSERT INTO __test VALUES
+    (1, '1'),
+    (2, '2'),
+    (3, '3'),
+    (4, '4'),
+    (5, '5'),
+    (6, '6'),
+    (7, '7'),
+    (8, '8'),
+    (9, '9'),
+    (10, '10');
diff --git a/tests/helpers/compare_storages.go b/tests/helpers/compare_storages.go
index 27edf98e..d9342f03 100644
--- a/tests/helpers/compare_storages.go
+++ b/tests/helpers/compare_storages.go
@@ -22,11 +22,12 @@ import (
 )
 
 var technicalTables = map[string]bool{
-    "__consumer_keeper":         true, // pg
-    "__dt_cluster_time":         true, // mongodb
-    "__table_transfer_progress": true, // mysql
-    "__tm_gtid_keeper":          true, // mysql
-    "__tm_keeper":               true, // mysql
+    "__data_transfer_signal_table": true, // dblog signal table
+    "__consumer_keeper":            true, // pg
+    "__dt_cluster_time":            true, // mongodb
+    "__table_transfer_progress":    true, // mysql
+    "__tm_gtid_keeper":             true, // mysql
+    "__tm_keeper":                  true, // mysql
 }
 
 func withTextSerialization(storageParams *pgStorage.PgStorageParams) *pgStorage.PgStorageParams {
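Background on the mechanism this patch implements: `IncrementalIterator` writes a low watermark row into `__data_transfer_signal_table`, selects the next primary-key-ordered chunk, then writes a high watermark; `IncrementalAsyncSink` watches the replication stream and, between the two watermarks, drops from the pending chunk every row whose primary key also arrived via WAL, so the fresher replication event wins over the stale snapshot read. The sketch below is a minimal standalone model of just that dedup step — `dedupeChunk` and its parameters are invented for illustration and are not code from this patch.

```go
package main

import "fmt"

// dedupeChunk mirrors the idea behind IncrementalAsyncSink.AsyncPush above:
// any row read by the chunk query is discarded if a WAL change for the same
// primary key was observed between the low and high watermarks, because the
// replication event carries the fresher version of that row.
func dedupeChunk(chunk map[string]string, walPKs []string) map[string]string {
	for _, pk := range walPKs {
		delete(chunk, pk) // no-op when the pk is not part of the chunk
	}
	return chunk
}

func main() {
	// Chunk selected right after the low watermark was written.
	chunk := map[string]string{"1": "a", "2": "b", "3": "c"}
	// Primary keys seen on the WAL before the high watermark arrived.
	walPKs := []string{"2"}

	for pk, val := range dedupeChunk(chunk, walPKs) {
		fmt.Printf("emit snapshot row pk=%s val=%s\n", pk, val)
	}
	// Row "2" is withheld here; its up-to-date version flows in via replication.
}
```

In the real sink the chunk map is keyed by the encoded primary key (`stringArrToString(keyValue, defaultSeparator)` in incremental_async_sink.go), and the surviving chunk rows are emitted only once the high watermark carrying the expected UUID is observed on the stream.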