diff --git a/pkg/crosscluster/logical/create_logical_replication_stmt.go b/pkg/crosscluster/logical/create_logical_replication_stmt.go index b357d4c16764..50c83a9efe65 100644 --- a/pkg/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/crosscluster/logical/create_logical_replication_stmt.go @@ -65,8 +65,8 @@ SELECT id AS job_id, crdb_internal.pb_to_json( 'cockroach.sql.jobs.jobspb.Payload', - payload)->'logicalReplicationDetails'->>'parentId' AS parent_id - FROM crdb_internal.system_jobs + payload)->'logicalReplicationDetails'->>'parentId' AS parent_id + FROM crdb_internal.system_jobs WHERE job_type = 'LOGICAL REPLICATION' ) AS t WHERE t.parent_id = $1 @@ -359,10 +359,12 @@ func (r *ResolvedDestObjects) TargetDescription() string { return targetDescription } +// TargetTableNames returns the fully qualified names of the resolved target +// tables. func (r *ResolvedDestObjects) TargetTableNames() []string { - var targetTableNames []string + targetTableNames := make([]string, len(r.TableNames)) for i := range r.TableNames { - targetTableNames = append(targetTableNames, r.TableNames[i].Table()) + targetTableNames[i] = r.TableNames[i].FQString() } return targetTableNames } diff --git a/pkg/crosscluster/logical/logical_replication_job_test.go b/pkg/crosscluster/logical/logical_replication_job_test.go index 1cf230efbd00..ccec056dedf5 100644 --- a/pkg/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/crosscluster/logical/logical_replication_job_test.go @@ -1727,12 +1727,12 @@ func GetReverseJobID( var jobID jobspb.JobID testutils.SucceedsSoon(t, func() error { err := db.DB.QueryRowContext(ctx, ` - SELECT id - FROM system.jobs - WHERE job_type = 'LOGICAL REPLICATION' + SELECT id + FROM system.jobs + WHERE job_type = 'LOGICAL REPLICATION' AND id != $1 AND created > $2 - ORDER BY created DESC + ORDER BY created DESC LIMIT 1`, parentID, created).Scan(&jobID) if err != nil { @@ -2905,3 +2905,96 @@ func TestGetWriterType(t *testing.T) { require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt) }) } + +func TestLogicalReplicationExternalConnWithoutDBName(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1) + defer server.Stopper().Stop(ctx) + + dbA.Exec(t, "CREATE TABLE a.public.foo (x INT PRIMARY KEY)") + dbA.Exec(t, "INSERT INTO a.public.foo SELECT * FROM generate_series(1, 10)") + dbA.Exec(t, "CREATE USER userA WITH PASSWORD '123'") + dbA.Exec(t, "GRANT REPLICATIONSOURCE, REPLICATIONDEST ON TABLE a.public.foo TO userA") + dbAURL := replicationtestutils.GetExternalConnectionURI( + t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userA", "123"), + ) + + dbB.Exec(t, "CREATE USER userB WITH PASSWORD '123'") + dbB.Exec(t, "GRANT CREATE ON DATABASE b TO userB") + dbBURL := replicationtestutils.GetExternalConnectionURI( + t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userB", "123"), + ) + + dbBAsUser := sqlutils.MakeSQLRunner(s.SQLConn( + t, + serverutils.DBName("b"), + serverutils.ClientCerts(false), + serverutils.UserPassword("userB", "123"), + )) + + var jobID jobspb.JobID + dbBAsUser.QueryRow( + t, + "CREATE LOGICALLY REPLICATED TABLE b.public.foo FROM TABLE a.public.foo ON $1 WITH BIDIRECTIONAL ON $2", + dbAURL.String(), + dbBURL.String(), + ).Scan(&jobID) + WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, jobID) + + reverseJobID := GetReverseJobID(ctx, t, dbA, jobID) + WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, reverseJobID) +} + +func TestLogicalReplicationCapitalTableName(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1) + defer server.Stopper().Stop(ctx) + + dbA.Exec(t, `CREATE TABLE a.public."Foo" (x INT PRIMARY KEY)`) + dbA.Exec(t, `INSERT INTO a.public."Foo" SELECT * FROM generate_series(1, 10)`) + dbA.Exec(t, "CREATE USER userA WITH PASSWORD '123'") + dbA.Exec(t, `GRANT REPLICATIONSOURCE, REPLICATIONDEST ON TABLE a.public."Foo" TO userA`) + dbAURL := replicationtestutils.GetExternalConnectionURI( + t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userA", "123"), serverutils.DBName("a"), + ) + + dbB.Exec(t, "CREATE USER userB WITH PASSWORD '123'") + dbB.Exec(t, "GRANT CREATE ON DATABASE b TO userB") + dbBURL := replicationtestutils.GetExternalConnectionURI( + t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userB", "123"), serverutils.DBName("b"), + ) + + dbBAsUser := sqlutils.MakeSQLRunner(s.SQLConn( + t, + serverutils.DBName("b"), + serverutils.ClientCerts(false), + serverutils.UserPassword("userB", "123"), + )) + + // Unidirectional into B. + dbBAsUser.Exec(t, `CREATE TABLE b.public."uFoo" (x INT PRIMARY KEY)`) + var uJobID jobspb.JobID + dbBAsUser.QueryRow(t, `CREATE LOGICAL REPLICATION STREAM FROM TABLE a.public."Foo" ON $1 INTO TABLE b.public."uFoo"`, dbAURL.String()).Scan(&uJobID) + WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, uJobID) + + // Bidirectional + var jobID jobspb.JobID + dbBAsUser.QueryRow( + t, + `CREATE LOGICALLY REPLICATED TABLE b.public."Foo" FROM TABLE a.public."Foo" ON $1 WITH BIDIRECTIONAL ON $2`, + dbAURL.String(), + dbBURL.String(), + ).Scan(&jobID) + WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, jobID) + + reverseJobID := GetReverseJobID(ctx, t, dbA, jobID) + WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, reverseJobID) +}