Skip to content

Commit

Permalink
fix dataObjects in pg dblog & add logging
Browse files Browse the repository at this point in the history
commit_hash:a8e7a97ce2e384b0b398e2dbfa284f3bd29ef61c
  • Loading branch information
timmyb32r committed Nov 29, 2024
1 parent af92f3c commit caa93eb
Show file tree
Hide file tree
Showing 19 changed files with 221 additions and 43 deletions.
2 changes: 2 additions & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -2605,6 +2605,8 @@
"tests/e2e/pg2pg/bytea_key/check_db_test.go":"transfer_manager/go/tests/e2e/pg2pg/bytea_key/check_db_test.go",
"tests/e2e/pg2pg/bytea_key/init_source/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/bytea_key/init_source/dump.sql",
"tests/e2e/pg2pg/bytea_key/init_target/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/bytea_key/init_target/dump.sql",
"tests/e2e/pg2pg/dblog/dblog_test.go":"transfer_manager/go/tests/e2e/pg2pg/dblog/dblog_test.go",
"tests/e2e/pg2pg/dblog/dump/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/dblog/dump/dump.sql",
"tests/e2e/pg2pg/debezium/all_datatypes/check_db_test.go":"transfer_manager/go/tests/e2e/pg2pg/debezium/all_datatypes/check_db_test.go",
"tests/e2e/pg2pg/debezium/all_datatypes/init_source/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/debezium/all_datatypes/init_source/dump.sql",
"tests/e2e/pg2pg/debezium/all_datatypes/init_target/init.sql":"transfer_manager/go/tests/e2e/pg2pg/debezium/all_datatypes/init_target/init.sql",
Expand Down
13 changes: 8 additions & 5 deletions pkg/dblog/incremental_async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package dblog
import (
"context"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/util"
"github.com/google/uuid"
"go.ytsaurus.tech/library/go/core/log"
"golang.org/x/exp/maps"
)

type IncrementalAsyncSink struct {
ctx context.Context
ctx context.Context
logger log.Logger

signalTable SignalTable

Expand All @@ -32,6 +33,7 @@ type IncrementalAsyncSink struct {

func NewIncrementalAsyncSink(
ctx context.Context,
logger log.Logger,
signalTable SignalTable,
table abstract.TableID,
tableIterator *IncrementalIterator,
Expand All @@ -43,6 +45,7 @@ func NewIncrementalAsyncSink(
) *IncrementalAsyncSink {
asyncSink := &IncrementalAsyncSink{
ctx: ctx,
logger: logger,
signalTable: signalTable,
tableID: table,
tableIterator: tableIterator,
Expand Down Expand Up @@ -87,10 +90,10 @@ func (s *IncrementalAsyncSink) AsyncPush(items []abstract.ChangeItem) chan error
}

if ok, watermarkType := s.signalTable.IsWatermark(&item, s.tableID, s.expectedUUID()); ok {
logger.Log.Info("watermark found")
s.logger.Info("watermark found")

if !s.isExpectedWatermarkOfType(watermarkType) {
logger.Log.Info("wrong watermark found")
s.logger.Info("wrong watermark found")
continue
}

Expand Down Expand Up @@ -143,7 +146,7 @@ func (s *IncrementalAsyncSink) AsyncPush(items []abstract.ChangeItem) chan error
encodedKey := stringArrToString(keyValue, defaultSeparator)

if _, ok = s.chunk[encodedKey]; ok {
logger.Log.Infof("found primary key from chunk: %s", keyValue)
s.logger.Infof("found primary key from chunk: %s", keyValue)
delete(s.chunk, encodedKey)
}
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/dblog/incremental_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/dblog/tablequery"
"github.com/google/uuid"
"go.ytsaurus.tech/library/go/core/log"
)

type IncrementalIterator struct {
logger log.Logger

storage tablequery.StorageTableQueryable
tableQuery *tablequery.TableQuery
signalTable SignalTable
Expand All @@ -26,6 +29,7 @@ type IncrementalIterator struct {
}

func NewIncrementalIterator(
logger log.Logger,
storage tablequery.StorageTableQueryable,
tableQuery *tablequery.TableQuery,
signalTable SignalTable,
Expand All @@ -36,6 +40,7 @@ func NewIncrementalIterator(
betweenMarksOpts ...func(),
) (*IncrementalIterator, error) {
iter := &IncrementalIterator{
logger: logger,
storage: storage,
tableQuery: tableQuery,
signalTable: signalTable,
Expand All @@ -53,7 +58,7 @@ func NewIncrementalIterator(

// Next advances the iterator: it rebuilds the table-query filter so it selects
// only rows strictly beyond the current low bound (by primary-key columns),
// then loads and returns the next chunk of change items.
func (i *IncrementalIterator) Next(ctx context.Context) ([]abstract.ChangeItem, error) {
	i.tableQuery.Filter = MakeNextWhereStatement(i.pkColNames, i.lowBound)

	// Log the generated filter so chunk boundaries are traceable in the transfer log.
	i.logger.Infof("IncrementalIterator::Next - i.tableQuery.Filter: %s", i.tableQuery.Filter)
	return i.loadTablePart(ctx)
}

Expand All @@ -63,6 +68,8 @@ func (i *IncrementalIterator) loadTablePart(ctx context.Context) ([]abstract.Cha
return nil, xerrors.Errorf("Failed to create watermark when selecting chunk: %w", err)
}

i.logger.Infof("created low watermark, uuid: %s", lowWatermarkUUID.String())

i.LowWatermarkUUID = lowWatermarkUUID

chunk := make([]abstract.ChangeItem, 0, i.limit)
Expand Down Expand Up @@ -98,6 +105,8 @@ func (i *IncrementalIterator) loadTablePart(ctx context.Context) ([]abstract.Cha
return nil, xerrors.Errorf("Failed to create watermark when selecting chunk")
}

i.logger.Infof("created high watermark, uuid: %s", highWatermarkUUID.String())

i.HighWatermarkUUID = highWatermarkUUID

return chunk, nil
Expand Down
44 changes: 31 additions & 13 deletions pkg/providers/postgres/dblog/signal_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dblog
import (
"context"
"fmt"
"strings"

"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
Expand All @@ -19,22 +20,27 @@ type (
)

const (
signalTableName = "__data_transfer_signal_table"
SignalTableName = "__data_transfer_signal_table"
tableSchemaColumnIndex = 0
tableNameColumnIndex = 1
tableTransferIDIndex = 2
markColumnIndex = 3
markTypeColumnIndex = 4
)

// SignalTableTableID returns the TableID of the dblog signal (watermark)
// table located in the given schema.
func SignalTableTableID(schemaName string) *abstract.TableID {
	return abstract.NewTableID(schemaName, SignalTableName)
}

type signalTable struct {
conn *pgxpool.Pool
logger log.Logger
transferID string
schemaName string
}

func buildSignalTableDDL() string {
query := `CREATE TABLE IF NOT EXISTS %s
func buildSignalTableDDL(schemaName string) string {
query := `CREATE TABLE IF NOT EXISTS "%s"."%s"
(
table_schema TEXT,
table_name TEXT,
Expand All @@ -45,20 +51,21 @@ func buildSignalTableDDL() string {
PRIMARY KEY (table_schema, table_name, transfer_id, mark_type)
);`

return fmt.Sprintf(query, signalTableName)
return fmt.Sprintf(query, schemaName, SignalTableName)
}

func newPgSignalTable(
func NewPgSignalTable(
ctx context.Context,
conn *pgxpool.Pool,
logger log.Logger,
transferID string,
schemaName string,
) (*signalTable, error) {

pgSignalTable := &signalTable{
conn: conn,
logger: logger,
transferID: transferID,
schemaName: schemaName,
}

if err := pgSignalTable.init(ctx); err != nil {
Expand All @@ -70,7 +77,7 @@ func newPgSignalTable(

func (s *signalTable) init(ctx context.Context) error {
return s.tx(ctx, func(tx pgx.Tx) error {
if _, err := tx.Exec(ctx, buildSignalTableDDL()); err != nil {
if _, err := tx.Exec(ctx, buildSignalTableDDL(s.schemaName)); err != nil {
return xerrors.Errorf("failed to ensure existence of the signal table service table: %w", err)
}
return nil
Expand Down Expand Up @@ -125,9 +132,20 @@ func (s *signalTable) CreateWatermark(
return xerrors.Errorf("unable to convert low bound array to string")
}

query := s.makeWatermarkQuery()
s.logger.Info(
fmt.Sprintf("CreateWatermark - query: %s", strings.ReplaceAll(query, "\n", "")),
log.String("tableID.Namespace", tableID.Namespace),
log.String("tableID.Name", tableID.Name),
log.String("s.transferID", s.transferID),
log.String("newUUID", newUUID.String()),
log.String("watermarkType", string(watermarkType)),
log.String("lowBoundStr", lowBoundStr),
)

_, err = tx.Exec(
ctx,
s.makeWatermarkQuery(),
query,
tableID.Namespace,
tableID.Name,
s.transferID,
Expand All @@ -151,7 +169,7 @@ func (s *signalTable) CreateWatermark(
}

func (s *signalTable) IsWatermark(item *abstract.ChangeItem, tableID abstract.TableID, markUUID uuid.UUID) (bool, dblog.WatermarkType) {
isWatermark := item.Table == signalTableName
isWatermark := item.Table == SignalTableName
if !isWatermark {
return false, dblog.BadWatermarkType
}
Expand Down Expand Up @@ -183,14 +201,14 @@ func (s *signalTable) IsWatermark(item *abstract.ChangeItem, tableID abstract.Ta
}

func (s *signalTable) makeWatermarkQuery() string {
query := `INSERT INTO %s (table_schema, table_name, transfer_id, mark, mark_type, low_bound)
query := `INSERT INTO "%s"."%s" (table_schema, table_name, transfer_id, mark, mark_type, low_bound)
VALUES (($1), ($2), ($3), ($4), ($5), ($6))
ON CONFLICT (table_schema, table_name, transfer_id, mark_type)
DO UPDATE
SET mark = EXCLUDED.mark,
low_bound = EXCLUDED.low_bound;`

return fmt.Sprintf(query, signalTableName)
return fmt.Sprintf(query, s.schemaName, SignalTableName)
}

func (s *signalTable) resolveLowBound(ctx context.Context, tableID abstract.TableID) []string {
Expand Down Expand Up @@ -220,11 +238,11 @@ func (s *signalTable) resolveLowBound(ctx context.Context, tableID abstract.Tabl
}

func (s *signalTable) resolveLowBoundQuery() string {
query := `SELECT low_bound FROM %s
query := `SELECT low_bound FROM "%s"."%s"
WHERE table_schema = ($1)
AND table_name = ($2)
AND transfer_id = ($3)
AND mark_type = ($4);`

return fmt.Sprintf(query, signalTableName)
return fmt.Sprintf(query, s.schemaName, SignalTableName)
}
21 changes: 19 additions & 2 deletions pkg/providers/postgres/dblog/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ package dblog
import (
"context"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/dblog"
"github.com/doublecloud/transfer/pkg/dblog/tablequery"
"github.com/jackc/pgx/v4/pgxpool"
"go.ytsaurus.tech/library/go/core/log"
)

type Storage struct {
logger log.Logger

src abstract.Source
pgStorage tablequery.StorageTableQueryable
conn *pgxpool.Pool
Expand All @@ -20,24 +22,29 @@ type Storage struct {

transferID string
represent dblog.ChangeItemConverter
keeperSchema string
betweenMarksOpts []func()
}

func NewStorage(
logger log.Logger,
src abstract.Source,
pgStorage tablequery.StorageTableQueryable,
conn *pgxpool.Pool,
chunkSize uint64,
transferID string,
keeperSchema string,
represent dblog.ChangeItemConverter,
betweenMarksOpts ...func(),
) (abstract.Storage, error) {
return &Storage{
logger: log.With(logger, log.Any("component", "dblog")),
src: src,
pgStorage: pgStorage,
conn: conn,
chunkSize: chunkSize,
transferID: transferID,
keeperSchema: keeperSchema,
represent: represent,
betweenMarksOpts: betweenMarksOpts,
}, nil
Expand All @@ -64,16 +71,23 @@ func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescri
if err != nil {
return xerrors.Errorf("unable to generate chunk size: %w", err)
}
s.logger.Infof("Storage::LoadTable - inferred chunkSize: %d", chunkSize)
} else {
s.logger.Infof("Storage::LoadTable - from config chunkSize: %d", chunkSize)
}

pgSignalTable, err := newPgSignalTable(ctx, s.conn, logger.Log, s.transferID)
pgSignalTable, err := NewPgSignalTable(ctx, s.conn, s.logger, s.transferID, s.keeperSchema)
if err != nil {
return xerrors.Errorf("unable to create signal table: %w", err)
}

tableQuery := tablequery.NewTableQuery(tableDescr.ID(), true, "", 0, chunkSize)
s.logger.Infof("Storage::LoadTable - tableQuery: %v", tableQuery)
lowBound := pgSignalTable.resolveLowBound(ctx, tableDescr.ID())
s.logger.Infof("Storage::LoadTable - lowBound: %v", lowBound)

iterator, err := dblog.NewIncrementalIterator(
s.logger,
s.pgStorage,
tableQuery,
pgSignalTable,
Expand All @@ -92,13 +106,16 @@ func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescri
return xerrors.Errorf("failed to do initial iteration: %w", err)
}

s.logger.Infof("Storage::LoadTable - first iteration done, extacted items: %d", len(items))

chunk, err := dblog.ResolveChunkMapFromArr(items, pkColNames, s.represent)
if err != nil {
return xerrors.Errorf("failed to resolve chunk: %w", err)
}

asyncSink := dblog.NewIncrementalAsyncSink(
ctx,
s.logger,
pgSignalTable,
tableDescr.ID(),
iterator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"testing"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/dblog"
"github.com/doublecloud/transfer/pkg/dblog/tablequery"
Expand Down Expand Up @@ -199,13 +200,15 @@ func TestIncrementalSnapshot(t *testing.T) {
tableQuery := tablequery.NewTableQuery(tableDescription.ID(), true, "", 0, defaultLimit)

iterator, err := dblog.NewIncrementalIterator(
logger.Log,
storage,
tableQuery,
signalTable,
postgres.Represent,
primaryKey,
nil,
defaultLimit)
defaultLimit,
)

require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestIncrementalSnapshot(t *testing.T) {
coordinator.NewFakeClient())
require.NoError(t, err)

storage, err := dblog.NewStorage(src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, pgsink.Represent, opt)
storage, err := dblog.NewStorage(logger.Log, src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, "public", pgsink.Represent, opt)
require.NoError(t, err)

sourceTables, err := storage.TableList(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestUpdateKey(t *testing.T) {
coordinator.NewFakeClient())
require.NoError(t, err)

storage, err := dblog.NewStorage(src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, pgsink.Represent, opt)
storage, err := dblog.NewStorage(logger.Log, src, pgStorage, pgStorage.Conn, incrementalLimit, Source.SlotID, "public", pgsink.Represent, opt)
require.NoError(t, err)

sourceTables, err := storage.TableList(nil)
Expand Down
Loading

0 comments on commit caa93eb

Please sign in to comment.