Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing indexes to duplicated columns #570

Merged
merged 7 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/.ledger
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@
46_alter_column_drop_default.json
47_add_table_foreign_key_constraint.json
48_drop_tickets_check.json
49_unset_not_null_on_indexed_column.json
14 changes: 14 additions & 0 deletions examples/49_unset_not_null_on_indexed_column.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "49_unset_not_null_on_indexed_column",
"operations": [
{
"alter_column": {
"table": "fruits",
"column": "name",
"nullable": true,
"up": "SELECT CASE WHEN name IS NULL THEN 'unknown fruit' ELSE name END",
"down": "name"
}
}
]
}
46 changes: 46 additions & 0 deletions pkg/migrations/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ func (d *Duplicator) Duplicate(ctx context.Context) error {
}
}

// Generate SQL to duplicate any indexes on the columns.
for _, sql := range d.stmtBuilder.duplicateIndexes(d.withoutConstraint, colNames...) {
if _, err := d.conn.ExecContext(ctx, sql); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -204,6 +211,45 @@ func (d *duplicatorStmtBuilder) duplicateForeignKeyConstraints(withoutConstraint
return stmts
}

func (d *duplicatorStmtBuilder) duplicateIndexes(withoutConstraint []string, colNames ...string) []string {
stmts := make([]string, 0, len(d.table.Indexes))
for _, idx := range d.table.Indexes {
if slices.Contains(withoutConstraint, idx.Name) {
continue
}
if _, ok := d.table.UniqueConstraints[idx.Name]; ok && idx.Unique {
// unique constraints are duplicated as unique indexes
continue
}

if duplicatedMember, columns := d.allConstraintColumns(idx.Columns, colNames...); duplicatedMember {
stmtFmt := "CREATE INDEX CONCURRENTLY %s ON %s"
if idx.Unique {
stmtFmt = "CREATE UNIQUE INDEX CONCURRENTLY %s ON %s"
}
stmt := fmt.Sprintf(stmtFmt, pq.QuoteIdentifier(DuplicationName(idx.Name)), pq.QuoteIdentifier(d.table.Name))
if idx.Method != "" {
stmt += fmt.Sprintf(" USING %s", string(idx.Method))
}

stmt += fmt.Sprintf(" (%s)", strings.Join(quoteColumnNames(columns), ", "))

if storageParamStart := strings.Index(idx.Definition, " WITH ("); storageParamStart != -1 {
end := strings.Index(idx.Definition[storageParamStart:], ")")
stmt += idx.Definition[storageParamStart : storageParamStart+end+1]
}

if idx.Predicate != nil {
pred := strings.Replace(*idx.Predicate, strings.Join(idx.Columns, ", "), strings.Join(quoteColumnNames(columns), ", "), 1)
stmt += fmt.Sprintf(" WHERE %s", pred)
}

stmts = append(stmts, stmt)
}
}
return stmts
}

// duplicatedConstraintColumns returns a new slice of constraint columns with
// the columns that are duplicated replaced with temporary names.
func (d *duplicatorStmtBuilder) duplicatedConstraintColumns(constraintColumns []string, duplicatedColumns ...string) []string {
Expand Down
60 changes: 60 additions & 0 deletions pkg/migrations/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ var table = &schema.Table{
"fk_city": {Name: "fk_city", Columns: []string{"city"}, ReferencedTable: "cities", ReferencedColumns: []string{"id"}, OnDelete: "NO ACTION"},
"fk_name_nick": {Name: "fk_name_nick", Columns: []string{"name", "nick"}, ReferencedTable: "users", ReferencedColumns: []string{"name", "nick"}, OnDelete: "CASCADE"},
},
Indexes: map[string]schema.Index{
"idx_no_kate": {
Name: "idx_no_kate",
Columns: []string{"name"},
Definition: `CREATE INDEX "idx_name" ON "test_table" USING hash ("name") WITH (fillfactor = 70) WHERE "name" != 'Kate'`,
Predicate: ptr("name != 'Kate'"),
Method: "hash",
},
"idx_email": {
Name: "idx_email",
Columns: []string{"email"},
},
"idx_name_city": {
Name: "idx_name_city",
Columns: []string{"name", "city"},
},
},
}

func TestDuplicateStmtBuilderCheckConstraints(t *testing.T) {
Expand Down Expand Up @@ -174,3 +191,46 @@ func TestDuplicateStmtBuilderForeignKeyConstraints(t *testing.T) {
})
}
}

func TestDuplicateStmtBuilderIndexes(t *testing.T) {
d := &duplicatorStmtBuilder{table}
for name, testCases := range map[string]struct {
columns []string
expectedStmts []string
}{
"single column duplicated": {
columns: []string{"nick"},
expectedStmts: []string{},
},
"single-column index with single column duplicated": {
columns: []string{"email"},
expectedStmts: []string{`CREATE INDEX CONCURRENTLY "_pgroll_dup_idx_email" ON "test_table" ("_pgroll_new_email")`},
},
"single-column index with multiple column duplicated": {
columns: []string{"email", "description"},
expectedStmts: []string{`CREATE INDEX CONCURRENTLY "_pgroll_dup_idx_email" ON "test_table" ("_pgroll_new_email")`},
},
"multi-column index with single column duplicated": {
columns: []string{"name"},
expectedStmts: []string{`CREATE INDEX CONCURRENTLY "_pgroll_dup_idx_name_city" ON "test_table" ("_pgroll_new_name", "city")`, `CREATE INDEX CONCURRENTLY "_pgroll_dup_idx_no_kate" ON "test_table" USING hash ("_pgroll_new_name") WITH (fillfactor = 70) WHERE "_pgroll_new_name" != 'Kate'`},
},
"multi-column index with multiple unrelated column duplicated": {
columns: []string{"name", "description"},
expectedStmts: []string{`CREATE INDEX CONCURRENTLY "_pgroll_dup_idx_name_city" ON "test_table" ("_pgroll_new_name", "city")`, `CREATE INDEX CONCURRENTLY "_pgroll_dup_idx_no_kate" ON "test_table" USING hash ("_pgroll_new_name") WITH (fillfactor = 70) WHERE "_pgroll_new_name" != 'Kate'`},
},
"multi-column index with multiple columns": {
columns: []string{"name", "city"},
expectedStmts: []string{`CREATE INDEX CONCURRENTLY "_pgroll_dup_idx_name_city" ON "test_table" ("_pgroll_new_name", "_pgroll_new_city")`, `CREATE INDEX CONCURRENTLY "_pgroll_dup_idx_no_kate" ON "test_table" USING hash ("_pgroll_new_name") WITH (fillfactor = 70) WHERE "_pgroll_new_name" != 'Kate'`},
},
} {
t.Run(name, func(t *testing.T) {
stmts := d.duplicateIndexes(nil, testCases.columns...)
assert.Equal(t, len(testCases.expectedStmts), len(stmts))
for _, stmt := range stmts {
assert.True(t, slices.Contains(testCases.expectedStmts, stmt))
}
})
}
}

func ptr[T any](x T) *T { return &x }
32 changes: 16 additions & 16 deletions pkg/migrations/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,42 +120,42 @@ func RenameDuplicatedColumn(ctx context.Context, conn db.DB, table *schema.Table
}
}

// Rename any `UNIQUE` indexes on the duplicated column and use them to
// Rename any indexes on the duplicated column and use unique indexes to
// create `UNIQUE` constraints.
for _, ui := range table.Indexes {
if !IsDuplicatedName(ui.Name) {
continue
}
if !ui.Unique {
for _, idx := range table.Indexes {
if !IsDuplicatedName(idx.Name) {
continue
}

if slices.Contains(ui.Columns, TemporaryName(column.Name)) {
// Rename the unique index to its original name
if slices.Contains(idx.Columns, TemporaryName(column.Name)) {
// Rename the index to its original name
renameIndexSQL := fmt.Sprintf(cRenameIndexSQL,
pq.QuoteIdentifier(ui.Name),
pq.QuoteIdentifier(StripDuplicationPrefix(ui.Name)),
pq.QuoteIdentifier(idx.Name),
pq.QuoteIdentifier(StripDuplicationPrefix(idx.Name)),
)

_, err = conn.ExecContext(ctx, renameIndexSQL)
if err != nil {
return fmt.Errorf("failed to rename unique index %q: %w", ui.Name, err)
return fmt.Errorf("failed to rename index %q: %w", idx.Name, err)
}
}

if _, ok := table.UniqueConstraints[StripDuplicationPrefix(idx.Name)]; idx.Unique && ok {
// Create a unique constraint using the unique index
createUniqueConstraintSQL := fmt.Sprintf(cCreateUniqueConstraintSQL,
pq.QuoteIdentifier(table.Name),
pq.QuoteIdentifier(StripDuplicationPrefix(ui.Name)),
pq.QuoteIdentifier(StripDuplicationPrefix(ui.Name)),
pq.QuoteIdentifier(StripDuplicationPrefix(idx.Name)),
pq.QuoteIdentifier(StripDuplicationPrefix(idx.Name)),
)

_, err = conn.ExecContext(ctx, createUniqueConstraintSQL)
if err != nil {
return fmt.Errorf("failed to create unique constraint from index %q: %w", ui.Name, err)
return fmt.Errorf("failed to create unique constraint from index %q: %w", idx.Name, err)
}
// Index no longer exists, remove it from the table
delete(table.Indexes, ui.Name)
}

// Index no longer exists, remove it from the table
delete(table.Indexes, idx.Name)
}

return nil
Expand Down