Skip to content

Commit c536370

Browse files
authored
Merge pull request #155737 from kev-cao/blathers/backport-release-25.4-155292
release-25.4: logical: external conn should not require db name
2 parents fea34e8 + e6900c0 commit c536370

File tree

2 files changed

+53
-8
lines changed

2 files changed

+53
-8
lines changed

pkg/crosscluster/logical/create_logical_replication_stmt.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ SELECT
6565
id AS job_id,
6666
crdb_internal.pb_to_json(
6767
'cockroach.sql.jobs.jobspb.Payload',
68-
payload)->'logicalReplicationDetails'->>'parentId' AS parent_id
69-
FROM crdb_internal.system_jobs
68+
payload)->'logicalReplicationDetails'->>'parentId' AS parent_id
69+
FROM crdb_internal.system_jobs
7070
WHERE job_type = 'LOGICAL REPLICATION'
7171
) AS t
7272
WHERE t.parent_id = $1
@@ -359,10 +359,12 @@ func (r *ResolvedDestObjects) TargetDescription() string {
359359
return targetDescription
360360
}
361361

362+
// TargetTableNames returns the fully qualified names of the resolved target
363+
// tables.
362364
func (r *ResolvedDestObjects) TargetTableNames() []string {
363-
var targetTableNames []string
365+
targetTableNames := make([]string, len(r.TableNames))
364366
for i := range r.TableNames {
365-
targetTableNames = append(targetTableNames, r.TableNames[i].Table())
367+
targetTableNames[i] = r.TableNames[i].FQString()
366368
}
367369
return targetTableNames
368370
}

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1727,12 +1727,12 @@ func GetReverseJobID(
17271727
var jobID jobspb.JobID
17281728
testutils.SucceedsSoon(t, func() error {
17291729
err := db.DB.QueryRowContext(ctx, `
1730-
SELECT id
1731-
FROM system.jobs
1732-
WHERE job_type = 'LOGICAL REPLICATION'
1730+
SELECT id
1731+
FROM system.jobs
1732+
WHERE job_type = 'LOGICAL REPLICATION'
17331733
AND id != $1
17341734
AND created > $2
1735-
ORDER BY created DESC
1735+
ORDER BY created DESC
17361736
LIMIT 1`,
17371737
parentID, created).Scan(&jobID)
17381738
if err != nil {
@@ -2905,3 +2905,46 @@ func TestGetWriterType(t *testing.T) {
29052905
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
29062906
})
29072907
}
2908+
2909+
func TestLogicalReplicationExternalConnWithoutDBName(t *testing.T) {
2910+
defer leaktest.AfterTest(t)()
2911+
defer log.Scope(t).Close(t)
2912+
2913+
ctx := context.Background()
2914+
2915+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
2916+
defer server.Stopper().Stop(ctx)
2917+
2918+
dbA.Exec(t, "CREATE TABLE a.public.foo (x INT PRIMARY KEY)")
2919+
dbA.Exec(t, "INSERT INTO a.public.foo SELECT * FROM generate_series(1, 10)")
2920+
dbA.Exec(t, "CREATE USER userA WITH PASSWORD '123'")
2921+
dbA.Exec(t, "GRANT REPLICATIONSOURCE, REPLICATIONDEST ON TABLE a.public.foo TO userA")
2922+
dbAURL := replicationtestutils.GetExternalConnectionURI(
2923+
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userA", "123"),
2924+
)
2925+
2926+
dbB.Exec(t, "CREATE USER userB WITH PASSWORD '123'")
2927+
dbB.Exec(t, "GRANT CREATE ON DATABASE b TO userB")
2928+
dbBURL := replicationtestutils.GetExternalConnectionURI(
2929+
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userB", "123"),
2930+
)
2931+
2932+
dbBAsUser := sqlutils.MakeSQLRunner(s.SQLConn(
2933+
t,
2934+
serverutils.DBName("b"),
2935+
serverutils.ClientCerts(false),
2936+
serverutils.UserPassword("userB", "123"),
2937+
))
2938+
2939+
var jobID jobspb.JobID
2940+
dbBAsUser.QueryRow(
2941+
t,
2942+
"CREATE LOGICALLY REPLICATED TABLE b.public.foo FROM TABLE a.public.foo ON $1 WITH BIDIRECTIONAL ON $2",
2943+
dbAURL.String(),
2944+
dbBURL.String(),
2945+
).Scan(&jobID)
2946+
WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, jobID)
2947+
2948+
reverseJobID := GetReverseJobID(ctx, t, dbA, jobID)
2949+
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, reverseJobID)
2950+
}

0 commit comments

Comments
 (0)