diff --git a/cmd/flags/flags.go b/cmd/flags/flags.go index 587ea75a..705a2e58 100644 --- a/cmd/flags/flags.go +++ b/cmd/flags/flags.go @@ -3,6 +3,8 @@ package flags import ( + "time" + "github.com/spf13/viper" ) @@ -24,6 +26,8 @@ func LockTimeout() int { func BackfillBatchSize() int { return viper.GetInt("BACKFILL_BATCH_SIZE") } +func BackfillBatchDelay() time.Duration { return viper.GetDuration("BACKFILL_BATCH_DELAY") } + func Role() string { return viper.GetString("ROLE") } diff --git a/cmd/root.go b/cmd/root.go index 3eb1b4f7..63db7d1d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -25,6 +25,7 @@ func init() { rootCmd.PersistentFlags().String("pgroll-schema", "pgroll", "Postgres schema to use for pgroll internal state") rootCmd.PersistentFlags().Int("lock-timeout", 500, "Postgres lock timeout in milliseconds for pgroll DDL operations") rootCmd.PersistentFlags().Int("backfill-batch-size", roll.DefaultBackfillBatchSize, "Number of rows backfilled in each batch") + rootCmd.PersistentFlags().Duration("backfill-batch-delay", roll.DefaultBackfillDelay, "Duration of delay between batch backfills (eg. 1s, 1000ms)") rootCmd.PersistentFlags().String("role", "", "Optional postgres role to set when executing migrations") viper.BindPFlag("PG_URL", rootCmd.PersistentFlags().Lookup("postgres-url")) @@ -32,6 +33,7 @@ func init() { viper.BindPFlag("STATE_SCHEMA", rootCmd.PersistentFlags().Lookup("pgroll-schema")) viper.BindPFlag("LOCK_TIMEOUT", rootCmd.PersistentFlags().Lookup("lock-timeout")) viper.BindPFlag("BACKFILL_BATCH_SIZE", rootCmd.PersistentFlags().Lookup("backfill-batch-size")) + viper.BindPFlag("BACKFILL_BATCH_DELAY", rootCmd.PersistentFlags().Lookup("backfill-batch-delay")) viper.BindPFlag("ROLE", rootCmd.PersistentFlags().Lookup("role")) } @@ -48,6 +50,7 @@ func NewRoll(ctx context.Context) (*roll.Roll, error) { lockTimeout := flags.LockTimeout() role := flags.Role() backfillBatchSize := flags.BackfillBatchSize() + backfillBatchDelay := flags.BackfillBatchDelay() state, err := state.New(ctx, pgURL, stateSchema) if err != nil { @@ -58,6 +61,7 @@ func NewRoll(ctx context.Context) (*roll.Roll, error) { roll.WithLockTimeoutMs(lockTimeout), roll.WithRole(role), roll.WithBackfillBatchSize(backfillBatchSize), + roll.WithBackfillBatchDelay(backfillBatchDelay), ) } diff --git a/pkg/migrations/backfill.go b/pkg/migrations/backfill.go index 006f78ff..1e0e5120 100644 --- a/pkg/migrations/backfill.go +++ b/pkg/migrations/backfill.go @@ -7,6 +7,7 @@ import ( "database/sql" "errors" "fmt" + "time" "github.com/lib/pq" "github.com/xataio/pgroll/pkg/db" @@ -19,7 +20,7 @@ import ( // 2. Get the first batch of rows from the table, ordered by the primary key. // 3. Update each row in the batch, setting the value of the primary key column to itself. // 4. Repeat steps 2 and 3 until no more rows are returned. -func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize int, cbs ...CallbackFn) error { +func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize int, batchDelay time.Duration, cbs ...CallbackFn) error { // get the backfill column identityColumn := getIdentityColumn(table) if identityColumn == nil { @@ -32,6 +33,7 @@ func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize in identityColumn: identityColumn, lastValue: nil, batchSize: batchSize, + batchDelay: batchDelay, } // Update each batch of rows, invoking callbacks for each one. @@ -46,6 +48,8 @@ func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize in } return err } + + time.Sleep(b.batchDelay) } return nil @@ -84,6 +88,7 @@ type batcher struct { identityColumn *schema.Column lastValue *string batchSize int + batchDelay time.Duration } func (b *batcher) updateBatch(ctx context.Context, conn db.DB) error { diff --git a/pkg/roll/execute.go b/pkg/roll/execute.go index 50efa5aa..3e5f57f4 100644 --- a/pkg/roll/execute.go +++ b/pkg/roll/execute.go @@ -277,7 +277,7 @@ func (m *Roll) ensureView(ctx context.Context, version, name string, table schem func (m *Roll) performBackfills(ctx context.Context, tables []*schema.Table, cbs ...migrations.CallbackFn) error { for _, table := range tables { - if err := migrations.Backfill(ctx, m.pgConn, table, m.backfillBatchSize, cbs...); err != nil { + if err := migrations.Backfill(ctx, m.pgConn, table, m.backfillBatchSize, m.backfillBatchDelay, cbs...); err != nil { errRollback := m.Rollback(ctx) return errors.Join( diff --git a/pkg/roll/options.go b/pkg/roll/options.go index 344081f5..efd08ce4 100644 --- a/pkg/roll/options.go +++ b/pkg/roll/options.go @@ -3,6 +3,8 @@ package roll import ( + "time" + "github.com/xataio/pgroll/pkg/migrations" ) @@ -28,6 +30,9 @@ type options struct { // the number of rows to backfill in each batch backfillBatchSize int + // the duration to delay after each batch is run + backfillBatchDelay time.Duration + migrationHooks MigrationHooks } @@ -109,3 +114,10 @@ func WithBackfillBatchSize(batchSize int) Option { o.backfillBatchSize = batchSize } } + +// WithBackfillBatchDelay sets the delay after each batch is run. +func WithBackfillBatchDelay(delay time.Duration) Option { + return func(o *options) { + o.backfillBatchDelay = delay + } +} diff --git a/pkg/roll/roll.go b/pkg/roll/roll.go index 942027a2..e559119b 100644 --- a/pkg/roll/roll.go +++ b/pkg/roll/roll.go @@ -7,6 +7,7 @@ import ( "database/sql" "fmt" "strings" + "time" "github.com/lib/pq" @@ -18,8 +19,9 @@ import ( type PGVersion int const ( - PGVersion15 PGVersion = 15 - DefaultBackfillBatchSize int = 1000 + PGVersion15 PGVersion = 15 + DefaultBackfillBatchSize int = 1000 + DefaultBackfillDelay time.Duration = 0 ) type Roll struct { @@ -34,11 +36,13 @@ type Roll struct { // disable creation of version schema for raw SQL migrations noVersionSchemaForRawSQL bool - migrationHooks MigrationHooks - state *state.State - pgVersion PGVersion - sqlTransformer migrations.SQLTransformer - backfillBatchSize int + migrationHooks MigrationHooks + state *state.State + pgVersion PGVersion + sqlTransformer migrations.SQLTransformer + + backfillBatchSize int + backfillBatchDelay time.Duration } // New creates a new Roll instance @@ -73,12 +77,13 @@ func New(ctx context.Context, pgURL, schema string, state *state.State, opts ... pgConn: &db.RDB{DB: conn}, schema: schema, state: state, - pgVersion: PGVersion(pgMajorVersion), + pgVersion: pgMajorVersion, + sqlTransformer: sqlTransformer, disableVersionSchemas: rollOpts.disableVersionSchemas, noVersionSchemaForRawSQL: rollOpts.noVersionSchemaForRawSQL, migrationHooks: rollOpts.migrationHooks, - sqlTransformer: sqlTransformer, backfillBatchSize: rollOpts.backfillBatchSize, + backfillBatchDelay: rollOpts.backfillBatchDelay, }, nil }