Skip to content

Commit

Permalink
Add a 'set replica identity' operation (#201)
Browse files Browse the repository at this point in the history
Add support for a new **set replica identity** operation type. The new
operation looks like this:

```json
{
  "name": "29_set_replica_identity",
  "operations": [
    {
      "set_replica_identity": {
        "table": "fruits",
        "identity": {
          "type": "full"
        }
      }
    }
  ]
}
```

This sets the replica identity for the `fruits` table to `FULL`. The
other supported operation types are 'nothing', 'default', and 'index'.
If the replica identity is being set to an index, the operation looks
like:

```json
{
  "name": "29_set_replica_identity",
  "operations": [
    {
      "set_replica_identity": {
        "table": "fruits",
        "identity": {
          "type": "index",
          "index": "some_index_name"
        }
      }
    }
  ]
}
```

The replica identity is set directly on the underlying table on
operation start. This means that both versions of the table exposed in
the new and old versioned views will have the new replica identity set.

The docs have been updated with the details of the new operation
[here](https://github.com/xataio/pgroll/blob/set-replica-identity/docs/README.md#set-replica-identity).
  • Loading branch information
andrew-farries authored Nov 8, 2023
1 parent edc78ac commit 4f4d549
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 10 deletions.
26 changes: 26 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [Drop table](#drop-table)
* [Raw SQL](#raw-sql)
* [Rename table](#rename-table)
* [Set replica identity](#set-replica-identity)

## Concepts

Expand Down Expand Up @@ -674,6 +675,7 @@ See the [examples](../examples) directory for examples of each kind of operation
* [Drop table](#drop-table)
* [Raw SQL](#raw-sql)
* [Rename table](#rename-table)
* [Set replica identity](#set-replica-identity)

### Add column

Expand Down Expand Up @@ -1048,3 +1050,27 @@ A rename table operation renames a table.
Example **rename table** migrations:

* [04_rename_table.json](../examples/04_rename_table.json)

### Set replica identity

A set replica identity operation sets the replica identity for a table.

**set replica identity** operations have this structure:

```json
{
"set_replica_identity": {
"table": "name of the table",
"identity": {
"type": "full | default | nothing | index"
"index": "name of the index, if type is 'index'"
}
}
}
```

:warning: A **set replica identity** operation is applied directly to the underlying table on migration start. This means that both versions of the table exposed in the old and new version schemas will have the new replica identity set. :warning:

Example **set replica identity** migrations:

* [29_set_replica_identity.json](../examples/29_set_replica_identity.json)
14 changes: 14 additions & 0 deletions examples/29_set_replica_identity.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "29_set_replica_identity",
"operations": [
{
"set_replica_identity": {
"table": "fruits",
"identity": {
"type": "index",
"index": "_pgroll_new_fruits_pkey"
}
}
}
]
}
9 changes: 9 additions & 0 deletions pkg/migrations/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,12 @@ type InvalidPrimaryKeyError struct {
func (e InvalidPrimaryKeyError) Error() string {
return fmt.Sprintf("primary key on table %q must be defined on exactly one column, found %d", e.Table, e.Fields)
}

type InvalidReplicaIdentityError struct {
Table string
Identity string
}

func (e InvalidReplicaIdentityError) Error() string {
return fmt.Sprintf("replica identity on table %q must be one of 'NOTHING', 'DEFAULT', 'INDEX' or 'FULL', found %q", e.Table, e.Identity)
}
27 changes: 17 additions & 10 deletions pkg/migrations/op_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ import (
type OpName string

const (
OpNameCreateTable OpName = "create_table"
OpNameRenameTable OpName = "rename_table"
OpNameDropTable OpName = "drop_table"
OpNameAddColumn OpName = "add_column"
OpNameDropColumn OpName = "drop_column"
OpNameAlterColumn OpName = "alter_column"
OpNameCreateIndex OpName = "create_index"
OpNameDropIndex OpName = "drop_index"
OpNameDropConstraint OpName = "drop_constraint"
OpRawSQLName OpName = "sql"
OpNameCreateTable OpName = "create_table"
OpNameRenameTable OpName = "rename_table"
OpNameDropTable OpName = "drop_table"
OpNameAddColumn OpName = "add_column"
OpNameDropColumn OpName = "drop_column"
OpNameAlterColumn OpName = "alter_column"
OpNameCreateIndex OpName = "create_index"
OpNameDropIndex OpName = "drop_index"
OpNameDropConstraint OpName = "drop_constraint"
OpNameSetReplicaIdentity OpName = "set_replica_identity"
OpRawSQLName OpName = "sql"

// Internal operation types used by `alter_column`
OpNameRenameColumn OpName = "rename_column"
Expand Down Expand Up @@ -95,6 +96,9 @@ func (v *Operations) UnmarshalJSON(data []byte) error {
case OpNameDropConstraint:
item = &OpDropConstraint{}

case OpNameSetReplicaIdentity:
item = &OpSetReplicaIdentity{}

case OpNameAlterColumn:
item = &OpAlterColumn{}

Expand Down Expand Up @@ -172,6 +176,9 @@ func OperationName(op Operation) OpName {
case *OpDropConstraint:
return OpNameDropConstraint

case *OpSetReplicaIdentity:
return OpNameSetReplicaIdentity

case *OpAlterColumn:
return OpNameAlterColumn

Expand Down
21 changes: 21 additions & 0 deletions pkg/migrations/op_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,27 @@ func IndexMustNotExist(t *testing.T, db *sql.DB, schema, table, index string) {
}
}

func ReplicaIdentityMustBe(t *testing.T, db *sql.DB, schema, table, replicaIdentity string) {
t.Helper()

var actualReplicaIdentity string
err := db.QueryRow(`
SELECT c.relreplident
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r' -- regular table
AND n.nspname = $1
AND c.relname = $2;
`, schema, table).Scan(&actualReplicaIdentity)
if err != nil {
t.Fatal(err)
}

if replicaIdentity != actualReplicaIdentity {
t.Fatalf("Expected replica identity to be %q, got %q", replicaIdentity, actualReplicaIdentity)
}
}

func indexExists(t *testing.T, db *sql.DB, schema, table, index string) bool {
t.Helper()

Expand Down
70 changes: 70 additions & 0 deletions pkg/migrations/op_set_replica_identity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// SPDX-License-Identifier: Apache-2.0

package migrations

import (
"context"
"database/sql"
"fmt"
"slices"
"strings"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/schema"
)

type OpSetReplicaIdentity struct {
Table string `json:"table"`
Identity ReplicaIdentity `json:"identity"`
}

type ReplicaIdentity struct {
Type string `json:"type"`
Index string `json:"index"`
}

var _ Operation = (*OpSetReplicaIdentity)(nil)

func (o *OpSetReplicaIdentity) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
// build the correct form of the `SET REPLICA IDENTITY` statement based on the`identity type
identitySQL := strings.ToUpper(o.Identity.Type)
if identitySQL == "INDEX" {
identitySQL = fmt.Sprintf("USING INDEX %s", pq.QuoteIdentifier(o.Identity.Index))
}

// set the replica identity on the underlying table
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY %s", o.Table, identitySQL))
return err
}

func (o *OpSetReplicaIdentity) Complete(ctx context.Context, conn *sql.DB) error {
// No-op
return nil
}

func (o *OpSetReplicaIdentity) Rollback(ctx context.Context, conn *sql.DB) error {
// No-op
return nil
}

func (o *OpSetReplicaIdentity) Validate(ctx context.Context, s *schema.Schema) error {
identityType := strings.ToUpper(o.Identity.Type)

table := s.GetTable(o.Table)
if table == nil {
return TableDoesNotExistError{Name: o.Table}
}

identities := []string{"NOTHING", "DEFAULT", "INDEX", "FULL"}
if !slices.Contains(identities, identityType) {
return InvalidReplicaIdentityError{Table: o.Table, Identity: o.Identity.Type}
}

if identityType == "INDEX" {
if _, ok := table.Indexes[o.Identity.Index]; !ok {
return IndexDoesNotExistError{Name: o.Identity.Index}
}
}

return nil
}
Loading

0 comments on commit 4f4d549

Please sign in to comment.