Skip to content

Commit e6900c0

Browse files
committed
logical: external conn should not require db name
LDR currently erroneously requires the db name in the external connection with setting up a bidirectional connection. This was determined to be caused by the fact that the privilege lookup on the source cluster was using unqualified table names during the reverse stream setup. This commit fixes the issue by using fully qualified table names. Fixes: #152395 Release note: LDR no longer requires the database name to be specified in the external connection URI when setting up a bidirectional stream.
1 parent fea34e8 commit e6900c0

File tree

2 files changed

+53
-8
lines changed

2 files changed

+53
-8
lines changed

pkg/crosscluster/logical/create_logical_replication_stmt.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ SELECT
6565
id AS job_id,
6666
crdb_internal.pb_to_json(
6767
'cockroach.sql.jobs.jobspb.Payload',
68-
payload)->'logicalReplicationDetails'->>'parentId' AS parent_id
69-
FROM crdb_internal.system_jobs
68+
payload)->'logicalReplicationDetails'->>'parentId' AS parent_id
69+
FROM crdb_internal.system_jobs
7070
WHERE job_type = 'LOGICAL REPLICATION'
7171
) AS t
7272
WHERE t.parent_id = $1
@@ -359,10 +359,12 @@ func (r *ResolvedDestObjects) TargetDescription() string {
359359
return targetDescription
360360
}
361361

362+
// TargetTableNames returns the fully qualified names of the resolved target
363+
// tables.
362364
func (r *ResolvedDestObjects) TargetTableNames() []string {
363-
var targetTableNames []string
365+
targetTableNames := make([]string, len(r.TableNames))
364366
for i := range r.TableNames {
365-
targetTableNames = append(targetTableNames, r.TableNames[i].Table())
367+
targetTableNames[i] = r.TableNames[i].FQString()
366368
}
367369
return targetTableNames
368370
}

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1727,12 +1727,12 @@ func GetReverseJobID(
17271727
var jobID jobspb.JobID
17281728
testutils.SucceedsSoon(t, func() error {
17291729
err := db.DB.QueryRowContext(ctx, `
1730-
SELECT id
1731-
FROM system.jobs
1732-
WHERE job_type = 'LOGICAL REPLICATION'
1730+
SELECT id
1731+
FROM system.jobs
1732+
WHERE job_type = 'LOGICAL REPLICATION'
17331733
AND id != $1
17341734
AND created > $2
1735-
ORDER BY created DESC
1735+
ORDER BY created DESC
17361736
LIMIT 1`,
17371737
parentID, created).Scan(&jobID)
17381738
if err != nil {
@@ -2905,3 +2905,46 @@ func TestGetWriterType(t *testing.T) {
29052905
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
29062906
})
29072907
}
2908+
2909+
func TestLogicalReplicationExternalConnWithoutDBName(t *testing.T) {
2910+
defer leaktest.AfterTest(t)()
2911+
defer log.Scope(t).Close(t)
2912+
2913+
ctx := context.Background()
2914+
2915+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
2916+
defer server.Stopper().Stop(ctx)
2917+
2918+
dbA.Exec(t, "CREATE TABLE a.public.foo (x INT PRIMARY KEY)")
2919+
dbA.Exec(t, "INSERT INTO a.public.foo SELECT * FROM generate_series(1, 10)")
2920+
dbA.Exec(t, "CREATE USER userA WITH PASSWORD '123'")
2921+
dbA.Exec(t, "GRANT REPLICATIONSOURCE, REPLICATIONDEST ON TABLE a.public.foo TO userA")
2922+
dbAURL := replicationtestutils.GetExternalConnectionURI(
2923+
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userA", "123"),
2924+
)
2925+
2926+
dbB.Exec(t, "CREATE USER userB WITH PASSWORD '123'")
2927+
dbB.Exec(t, "GRANT CREATE ON DATABASE b TO userB")
2928+
dbBURL := replicationtestutils.GetExternalConnectionURI(
2929+
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userB", "123"),
2930+
)
2931+
2932+
dbBAsUser := sqlutils.MakeSQLRunner(s.SQLConn(
2933+
t,
2934+
serverutils.DBName("b"),
2935+
serverutils.ClientCerts(false),
2936+
serverutils.UserPassword("userB", "123"),
2937+
))
2938+
2939+
var jobID jobspb.JobID
2940+
dbBAsUser.QueryRow(
2941+
t,
2942+
"CREATE LOGICALLY REPLICATED TABLE b.public.foo FROM TABLE a.public.foo ON $1 WITH BIDIRECTIONAL ON $2",
2943+
dbAURL.String(),
2944+
dbBURL.String(),
2945+
).Scan(&jobID)
2946+
WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, jobID)
2947+
2948+
reverseJobID := GetReverseJobID(ctx, t, dbA, jobID)
2949+
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, reverseJobID)
2950+
}

0 commit comments

Comments
 (0)