Skip to content

Commit

Permalink
Allow configuration of batch delay (#410)
Browse files Browse the repository at this point in the history
You can now configure a delay after each batch has completed. It can be
configured in three ways:

* The `--backfill-batch-delay` CLI parameter
* The `PGROLL_BACKFILL_BATCH_DELAY` environment variable
* The `roll.WithBackfillBatchDelay` functional option

It defaults to 0 if not set and all values should be provided using the
Go [duration format](https://pkg.go.dev/time#ParseDuration).

Closes #168
  • Loading branch information
ryanslade authored Oct 16, 2024
1 parent ced761b commit 7d88c25
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 11 deletions.
4 changes: 4 additions & 0 deletions cmd/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package flags

import (
"time"

"github.com/spf13/viper"
)

Expand All @@ -24,6 +26,8 @@ func LockTimeout() int {

func BackfillBatchSize() int { return viper.GetInt("BACKFILL_BATCH_SIZE") }

func BackfillBatchDelay() time.Duration { return viper.GetDuration("BACKFILL_BATCH_DELAY") }

func Role() string {
return viper.GetString("ROLE")
}
4 changes: 4 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ func init() {
rootCmd.PersistentFlags().String("pgroll-schema", "pgroll", "Postgres schema to use for pgroll internal state")
rootCmd.PersistentFlags().Int("lock-timeout", 500, "Postgres lock timeout in milliseconds for pgroll DDL operations")
rootCmd.PersistentFlags().Int("backfill-batch-size", roll.DefaultBackfillBatchSize, "Number of rows backfilled in each batch")
rootCmd.PersistentFlags().Duration("backfill-batch-delay", roll.DefaultBackfillDelay, "Duration of delay between batch backfills (eg. 1s, 1000ms)")
rootCmd.PersistentFlags().String("role", "", "Optional postgres role to set when executing migrations")

viper.BindPFlag("PG_URL", rootCmd.PersistentFlags().Lookup("postgres-url"))
viper.BindPFlag("SCHEMA", rootCmd.PersistentFlags().Lookup("schema"))
viper.BindPFlag("STATE_SCHEMA", rootCmd.PersistentFlags().Lookup("pgroll-schema"))
viper.BindPFlag("LOCK_TIMEOUT", rootCmd.PersistentFlags().Lookup("lock-timeout"))
viper.BindPFlag("BACKFILL_BATCH_SIZE", rootCmd.PersistentFlags().Lookup("backfill-batch-size"))
viper.BindPFlag("BACKFILL_BATCH_DELAY", rootCmd.PersistentFlags().Lookup("backfill-batch-delay"))
viper.BindPFlag("ROLE", rootCmd.PersistentFlags().Lookup("role"))
}

Expand All @@ -48,6 +50,7 @@ func NewRoll(ctx context.Context) (*roll.Roll, error) {
lockTimeout := flags.LockTimeout()
role := flags.Role()
backfillBatchSize := flags.BackfillBatchSize()
backfillBatchDelay := flags.BackfillBatchDelay()

state, err := state.New(ctx, pgURL, stateSchema)
if err != nil {
Expand All @@ -58,6 +61,7 @@ func NewRoll(ctx context.Context) (*roll.Roll, error) {
roll.WithLockTimeoutMs(lockTimeout),
roll.WithRole(role),
roll.WithBackfillBatchSize(backfillBatchSize),
roll.WithBackfillBatchDelay(backfillBatchDelay),
)
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/migrations/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"database/sql"
"errors"
"fmt"
"time"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/db"
Expand All @@ -19,7 +20,7 @@ import (
// 2. Get the first batch of rows from the table, ordered by the primary key.
// 3. Update each row in the batch, setting the value of the primary key column to itself.
// 4. Repeat steps 2 and 3 until no more rows are returned.
func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize int, cbs ...CallbackFn) error {
func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize int, batchDelay time.Duration, cbs ...CallbackFn) error {
// get the backfill column
identityColumn := getIdentityColumn(table)
if identityColumn == nil {
Expand All @@ -32,6 +33,7 @@ func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize in
identityColumn: identityColumn,
lastValue: nil,
batchSize: batchSize,
batchDelay: batchDelay,
}

// Update each batch of rows, invoking callbacks for each one.
Expand All @@ -46,6 +48,8 @@ func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize in
}
return err
}

time.Sleep(b.batchDelay)
}

return nil
Expand Down Expand Up @@ -84,6 +88,7 @@ type batcher struct {
identityColumn *schema.Column
lastValue *string
batchSize int
batchDelay time.Duration
}

func (b *batcher) updateBatch(ctx context.Context, conn db.DB) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/roll/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (m *Roll) ensureView(ctx context.Context, version, name string, table schem

func (m *Roll) performBackfills(ctx context.Context, tables []*schema.Table, cbs ...migrations.CallbackFn) error {
for _, table := range tables {
if err := migrations.Backfill(ctx, m.pgConn, table, m.backfillBatchSize, cbs...); err != nil {
if err := migrations.Backfill(ctx, m.pgConn, table, m.backfillBatchSize, m.backfillBatchDelay, cbs...); err != nil {
errRollback := m.Rollback(ctx)

return errors.Join(
Expand Down
12 changes: 12 additions & 0 deletions pkg/roll/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package roll

import (
"time"

"github.com/xataio/pgroll/pkg/migrations"
)

Expand All @@ -28,6 +30,9 @@ type options struct {
// the number of rows to backfill in each batch
backfillBatchSize int

// the duration to delay after each batch is run
backfillBatchDelay time.Duration

migrationHooks MigrationHooks
}

Expand Down Expand Up @@ -109,3 +114,10 @@ func WithBackfillBatchSize(batchSize int) Option {
o.backfillBatchSize = batchSize
}
}

// WithBackfillBatchDelay sets the delay after each batch is run.
func WithBackfillBatchDelay(delay time.Duration) Option {
return func(o *options) {
o.backfillBatchDelay = delay
}
}
23 changes: 14 additions & 9 deletions pkg/roll/roll.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"database/sql"
"fmt"
"strings"
"time"

"github.com/lib/pq"

Expand All @@ -18,8 +19,9 @@ import (
type PGVersion int

const (
PGVersion15 PGVersion = 15
DefaultBackfillBatchSize int = 1000
PGVersion15 PGVersion = 15
DefaultBackfillBatchSize int = 1000
DefaultBackfillDelay time.Duration = 0
)

type Roll struct {
Expand All @@ -34,11 +36,13 @@ type Roll struct {
// disable creation of version schema for raw SQL migrations
noVersionSchemaForRawSQL bool

migrationHooks MigrationHooks
state *state.State
pgVersion PGVersion
sqlTransformer migrations.SQLTransformer
backfillBatchSize int
migrationHooks MigrationHooks
state *state.State
pgVersion PGVersion
sqlTransformer migrations.SQLTransformer

backfillBatchSize int
backfillBatchDelay time.Duration
}

// New creates a new Roll instance
Expand Down Expand Up @@ -73,12 +77,13 @@ func New(ctx context.Context, pgURL, schema string, state *state.State, opts ...
pgConn: &db.RDB{DB: conn},
schema: schema,
state: state,
pgVersion: PGVersion(pgMajorVersion),
pgVersion: pgMajorVersion,
sqlTransformer: sqlTransformer,
disableVersionSchemas: rollOpts.disableVersionSchemas,
noVersionSchemaForRawSQL: rollOpts.noVersionSchemaForRawSQL,
migrationHooks: rollOpts.migrationHooks,
sqlTransformer: sqlTransformer,
backfillBatchSize: rollOpts.backfillBatchSize,
backfillBatchDelay: rollOpts.backfillBatchDelay,
}, nil
}

Expand Down

0 comments on commit 7d88c25

Please sign in to comment.