Skip to content

Commit

Permalink
Move estimate into function and fix a test
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanslade committed Jan 3, 2025
1 parent c8a6ba0 commit 150e1c9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
42 changes: 27 additions & 15 deletions pkg/migrations/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,9 @@ func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize in
return BackfillNotPossibleError{Table: table.Name}
}

var total int64

// Try and get estimated row count
row := conn.QueryRowContext(ctx, `SELECT n_live_tup AS estimate
FROM pg_stat_user_tables
WHERE relname = $1`, table.Name)
if err := row.Scan(&total); err != nil {
return fmt.Errorf("scanning row count estimate for %q: %w", table.Name, err)
}
// If the estimate is zero, fall back to full count
if total == 0 {
row = conn.QueryRowContext(ctx, fmt.Sprintf(`SELECT count(*) from %s`, table.Name))
if err := row.Scan(&total); err != nil {
return fmt.Errorf("scanning row count for %q: %w", table.Name, err)
}
total, err := getRowCount(ctx, conn, table.Name)
if err != nil {
return fmt.Errorf("get row count for %q: %w", table.Name, err)
}

// Create a batcher for the table.
Expand Down Expand Up @@ -72,6 +60,30 @@ func Backfill(ctx context.Context, conn db.DB, table *schema.Table, batchSize in
return nil
}

// getRowCount will attempt to get the row count for the given table. It first attempts to get an
// estimate and if that is zero, falls back to a full table scan.
func getRowCount(ctx context.Context, conn db.DB, tableName string) (int64, error) {
var total int64
// Try and get estimated row count
row := conn.QueryRowContext(ctx, `
SELECT n_live_tup AS estimate
FROM pg_stat_user_tables
WHERE relname = $1`, tableName)
if err := row.Scan(&total); err != nil {
return 0, fmt.Errorf("scanning row count estimate for %q: %w", tableName, err)
}
if total > 0 {
return total, nil
}

// If the estimate is zero, fall back to full count
row = conn.QueryRowContext(ctx, fmt.Sprintf(`SELECT count(*) from %s`, tableName))
if err := row.Scan(&total); err != nil {
return 0, fmt.Errorf("scanning row count for %q: %w", tableName, err)
}
return total, nil
}

// checkBackfill will return an error if the backfill operation is not supported.
func checkBackfill(table *schema.Table) error {
cols := getIdentityColumns(table)
Expand Down
2 changes: 1 addition & 1 deletion pkg/roll/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func TestCallbacksAreInvokedOnMigrationStart(t *testing.T) {

// Define a mock callback
invoked := false
cb := func(n int64) { invoked = true }
cb := func(n, total int64) { invoked = true }

// Start a migration that requires a backfill
err = mig.Start(ctx, &migrations.Migration{
Expand Down

0 comments on commit 150e1c9

Please sign in to comment.