Skip to content

Commit 2e1afdc

Browse files
committed
Merge branch 'pr/postgres-migrations' of https://github.com/yogesh1801/peerdb into pr/postgres-migrations
2 parents 7df39be + 4e884fb commit 2e1afdc

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

48 files changed

+1376
-1080
lines changed

flow/activities/flowable.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (a *FlowableActivity) SyncFlow(
319319
return a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
320320
}
321321

322-
if err := srcConn.SetupReplConn(ctx); err != nil {
322+
if err := srcConn.SetupReplConn(ctx, config.Env); err != nil {
323323
srcClose(ctx)
324324
return a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
325325
}
@@ -612,7 +612,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
612612
}
613613

614614
return func(partition *protos.QRepPartition) error {
615-
stream := model.NewQRecordStream(shared.FetchAndChannelSize)
615+
stream := model.NewQRecordStream(shared.QRepChannelSize)
616616
outstream := stream
617617

618618
if luaScript != nil {
@@ -641,7 +641,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
641641
}
642642

643643
return func(partition *protos.QRepPartition) error {
644-
stream := model.NewQObjectStream(shared.FetchAndChannelSize)
644+
stream := model.NewQObjectStream(shared.QRepChannelSize)
645645

646646
return replicateQRepPartition(ctx, a, srcConn, destConn, dstPeer.Type, config, partition, runUUID, stream, stream,
647647
connectors.QRepPullObjectsConnector.PullQRepObjects,
@@ -1534,7 +1534,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
15341534

15351535
switch config.System {
15361536
case protos.TypeSystem_Q:
1537-
stream := model.NewQRecordStream(shared.FetchAndChannelSize)
1537+
stream := model.NewQRecordStream(shared.QRepChannelSize)
15381538
return replicateXminPartition(ctx, a, config, partition, runUUID,
15391539
stream, stream,
15401540
(*connpostgres.PostgresConnector).PullXminRecordStream,

flow/activities/flowable_core.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
234234
defer dstClose(ctx)
235235

236236
syncState.Store(shared.Ptr("updating schema"))
237-
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
237+
if err := dstConn.ReplayTableSchemaDeltas(
238+
ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Version,
239+
); err != nil {
238240
return nil, fmt.Errorf("failed to sync schema: %w", err)
239241
}
240242

flow/alerting/classifier.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,11 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
497497
return ErrorNotifyPostgresSlotMemalloc, pgErrorInfo
498498
}
499499

500+
// Usually a single occurrence then reconnect immediately helps
501+
if strings.Contains(pgErr.Message, "pfree called with invalid pointer") {
502+
return ErrorRetryRecoverable, pgErrorInfo
503+
}
504+
500505
// Fall through for other internal errors
501506
return ErrorOther, pgErrorInfo
502507

@@ -608,8 +613,10 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
608613
return ErrorOther, myErrorInfo
609614
case 1146: // ER_NO_SUCH_TABLE
610615
return ErrorNotifySourceTableMissing, myErrorInfo
611-
case 1943:
616+
case 1943: // ER_DUPLICATE_GTID_DOMAIN (MariaDB)
612617
return ErrorNotifyBadGTIDSetup, myErrorInfo
618+
case 1317: // ER_QUERY_INTERRUPTED
619+
return ErrorRetryRecoverable, myErrorInfo
613620
default:
614621
return ErrorOther, myErrorInfo
615622
}

flow/alerting/classifier_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,23 @@ func TestPostgresMemoryAllocErrorShouldBeSlotMemalloc(t *testing.T) {
170170
}, errInfo, "Unexpected error info")
171171
}
172172

173+
func TestPostgresPfreeInvalidPointerErrorShouldBeRecoverable(t *testing.T) {
174+
// Simulate a Postgres pfree invalid pointer error
175+
err := &exceptions.PostgresWalError{
176+
Msg: &pgproto3.ErrorResponse{
177+
Severity: "ERROR",
178+
Code: pgerrcode.InternalError,
179+
Message: "pfree called with invalid pointer 0x400720764ed0 (header 0x0000400720825ae0) ",
180+
},
181+
}
182+
errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("error in WAL: %w", err))
183+
assert.Equal(t, ErrorRetryRecoverable, errorClass, "Unexpected error class")
184+
assert.Equal(t, ErrorInfo{
185+
Source: ErrorSourcePostgres,
186+
Code: pgerrcode.InternalError,
187+
}, errInfo, "Unexpected error info")
188+
}
189+
173190
func TestClickHouseAccessEntityNotFoundErrorShouldBeRecoverable(t *testing.T) {
174191
// Simulate a ClickHouse access entity not found error
175192
for idx, msg := range []string{

flow/connectors/bigquery/bigquery.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(
239239
flowJobName string,
240240
_ []*protos.TableMapping,
241241
schemaDeltas []*protos.TableSchemaDelta,
242+
_ uint32,
242243
) error {
243244
for _, schemaDelta := range schemaDeltas {
244245
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {

flow/connectors/bigquery/gcp.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,13 @@ func (p *gcsPath) Bucket() string {
7171
// It is used as the prefix to query objects in the bucket.
7272
// For example, if the path is "folder/subfolder", it returns "folder/subfolder/".
7373
// If the path is "folder/subfolder/", it also returns "folder/subfolder/".
74+
// If the path is empty or just "/", it returns an empty string.
7475
func (p *gcsPath) QueryPrefix() string {
75-
return strings.TrimPrefix(p.Path, "/") + "/"
76+
path := strings.TrimPrefix(p.Path, "/")
77+
if path == "" {
78+
return ""
79+
}
80+
return path + "/"
7681
}
7782

7883
func (p *gcsPath) JoinPath(elem ...string) *gcsPath {

flow/connectors/bigquery/qrep.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
8787
}
8888

8989
if err := c.ReplayTableSchemaDeltas(
90-
ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta},
90+
ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta}, config.Version,
9191
); err != nil {
9292
return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
9393
}

flow/connectors/bigquery/qrep_avro_sync.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ func (s *QRepAvroSyncMethod) SyncRecords(
9898
slog.String(string(shared.FlowNameKey), req.FlowJobName),
9999
slog.String("dstTableName", rawTableName))
100100

101-
if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
101+
if err := s.connector.ReplayTableSchemaDeltas(
102+
ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version,
103+
); err != nil {
102104
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
103105
}
104106

flow/connectors/bigquery/qrep_object_pull.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -397,22 +397,22 @@ func (c *BigQueryConnector) FinishExport(v any) error {
397397
return nil
398398
}
399399

400-
func (c *BigQueryConnector) SetupReplConn(_ context.Context) error {
400+
func (c *BigQueryConnector) SetupReplConn(context.Context, map[string]string) error {
401401
return nil
402402
}
403403

404-
func (c *BigQueryConnector) ReplPing(_ context.Context) error {
404+
func (c *BigQueryConnector) ReplPing(context.Context) error {
405405
return nil
406406
}
407407

408-
func (c *BigQueryConnector) UpdateReplStateLastOffset(_ context.Context, _ model.CdcCheckpoint) error {
408+
func (c *BigQueryConnector) UpdateReplStateLastOffset(context.Context, model.CdcCheckpoint) error {
409409
return nil
410410
}
411411

412-
func (c *BigQueryConnector) PullFlowCleanup(_ context.Context, _ string) error {
412+
func (c *BigQueryConnector) PullFlowCleanup(context.Context, string) error {
413413
return nil
414414
}
415415

416-
func (c *BigQueryConnector) SetupReplication(_ context.Context, _ *protos.SetupReplicationInput) (model.SetupReplicationResult, error) {
416+
func (c *BigQueryConnector) SetupReplication(context.Context, *protos.SetupReplicationInput) (model.SetupReplicationResult, error) {
417417
return model.SetupReplicationResult{}, nil
418418
}

flow/connectors/clickhouse/cdc.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
131131
}
132132
warnings := numericTruncator.Warnings()
133133

134-
if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
134+
if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil {
135135
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
136136
}
137137

@@ -165,6 +165,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
165165
flowJobName string,
166166
tableMappings []*protos.TableMapping,
167167
schemaDeltas []*protos.TableSchemaDelta,
168+
internalVersion uint32,
168169
) error {
169170
if len(schemaDeltas) == 0 {
170171
return nil
@@ -188,7 +189,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
188189
for _, addedColumn := range schemaDelta.AddedColumns {
189190
qvKind := types.QValueKind(addedColumn.Type)
190191
clickHouseColType, err := qvalue.ToDWHColumnType(
191-
ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled,
192+
ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled, internalVersion,
192193
)
193194
if err != nil {
194195
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)

0 commit comments

Comments (0)