Skip to content

Commit d020717

Browse files
craig[bot]Rui Huerikgrinaker
committed
110208: roachtest: backup-restore/mixed-version use system db when querying job status r=rhu713 a=rhu713 Previously in `waitForJobSuccess()` in the backup-restore/mixed-version test, if the job was a full cluster restore that dropped `defaultdb`, the query to the jobs table would fail with `pq: database "defaultdb" is offline: restoring` as our connection was to `defaultdb`, which is dropped during full cluster restore. We fix this by adding an option to connect to a different database in roachtests and connecting to the system database in `waitForJobSuccess()` instead. Fixes #110165 Release note: None 110333: rangefeed: omit txn push attempt when intent queue is empty r=erikgrinaker a=erikgrinaker `hlc.Clock.Now()` mutex acquisition during rangefeed txn push attempts has been seen to be a significant source of contention when running with many rangefeeds. This patch short-circuits the txn push attempt if we're not tracking any intents, which is the common case. Epic: none Release note: None Co-authored-by: Rui Hu <[email protected]> Co-authored-by: Erik Grinaker <[email protected]>
3 parents 7bac4be + 1d53954 + 2a8275d commit d020717

File tree

5 files changed

+45
-15
lines changed

5 files changed

+45
-15
lines changed

pkg/cmd/roachtest/cluster.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2497,15 +2497,20 @@ func (c *clusterImpl) ConnE(
24972497
return nil, err
24982498
}
24992499

2500-
dataSourceName := urls[0]
2500+
u, err := url.Parse(urls[0])
2501+
if err != nil {
2502+
return nil, err
2503+
}
2504+
25012505
if connOptions.User != "" {
2502-
u, err := url.Parse(urls[0])
2503-
if err != nil {
2504-
return nil, err
2505-
}
25062506
u.User = url.User(connOptions.User)
2507-
dataSourceName = u.String()
25082507
}
2508+
2509+
if connOptions.DBName != "" {
2510+
u.Path = connOptions.DBName
2511+
}
2512+
dataSourceName := u.String()
2513+
25092514
if len(connOptions.Options) > 0 {
25102515
vals := make(url.Values)
25112516
for k, v := range connOptions.Options {

pkg/cmd/roachtest/option/connection_options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
type ConnOption struct {
1919
User string
20+
DBName string
2021
TenantName string
2122
Options map[string]string
2223
}
@@ -49,3 +50,9 @@ func ConnectTimeout(t time.Duration) func(*ConnOption) {
4950
}
5051
return ConnectionOption("connect_timeout", fmt.Sprintf("%d", sec))
5152
}
53+
54+
func DBName(dbName string) func(*ConnOption) {
55+
return func(option *ConnOption) {
56+
option.DBName = dbName
57+
}
58+
}

pkg/cmd/roachtest/tests/mixed_version_backup.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/cockroachdb/cockroach/pkg/util/retry"
4646
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
4747
"github.com/cockroachdb/cockroach/pkg/util/version"
48+
"github.com/cockroachdb/errors"
4849
"golang.org/x/sync/errgroup"
4950
)
5051

@@ -1361,19 +1362,30 @@ func (mvb *mixedVersionBackup) backupName(
13611362
// error if the job doesn't succeed within the attempted retries.
13621363
func (mvb *mixedVersionBackup) waitForJobSuccess(
13631364
ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper, jobID int,
1364-
) error {
1365+
) (resErr error) {
13651366
var lastErr error
1366-
node, db := h.RandomDB(rng, mvb.roachNodes)
1367+
node := h.RandomNode(rng, mvb.roachNodes)
13671368
l.Printf("querying job status through node %d", node)
13681369

1370+
db, err := mvb.cluster.ConnE(ctx, l, node, option.DBName("system"))
1371+
if err != nil {
1372+
l.Printf("error connecting to node %d: %v", node, err)
1373+
return err
1374+
}
1375+
defer func() {
1376+
err := db.Close()
1377+
resErr = errors.CombineErrors(resErr, err)
1378+
}()
1379+
13691380
jobsQuery := "system.jobs WHERE id = $1"
13701381
if hasInternalSystemJobs(h) {
13711382
jobsQuery = fmt.Sprintf("(%s)", jobutils.InternalSystemJobsBaseQuery)
13721383
}
1373-
for r := retry.StartWithCtx(ctx, backupCompletionRetryOptions); r.Next(); {
1384+
r := retry.StartWithCtx(ctx, backupCompletionRetryOptions)
1385+
for r.Next() {
13741386
var status string
13751387
var payloadBytes []byte
1376-
err := db.QueryRow(
1388+
err := db.QueryRowContext(ctx,
13771389
fmt.Sprintf(`SELECT status, payload FROM %s`, jobsQuery), jobID,
13781390
).Scan(&status, &payloadBytes)
13791391
if err != nil {
@@ -1404,7 +1416,11 @@ func (mvb *mixedVersionBackup) waitForJobSuccess(
14041416
return nil
14051417
}
14061418

1407-
return fmt.Errorf("waiting for job to finish: %w", lastErr)
1419+
if r.CurrentAttempt() >= backupCompletionRetryOptions.MaxRetries {
1420+
return fmt.Errorf("exhausted all %d retries waiting for job %d to finish, last err: %w", backupCompletionRetryOptions.MaxRetries, jobID, lastErr)
1421+
}
1422+
1423+
return fmt.Errorf("error waiting for job to finish: %w", lastErr)
14081424
}
14091425

14101426
// computeTableContents will generate a list of `tableContents`

pkg/kv/kvserver/rangefeed/processor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -462,9 +462,9 @@ func (p *LegacyProcessor) run(
462462

463463
// Check whether any unresolved intents need a push.
464464
case <-txnPushTickerC:
465-
// Don't perform transaction push attempts until the resolved
466-
// timestamp has been initialized.
467-
if !p.rts.IsInit() {
465+
// Don't perform transaction push attempts until the resolved timestamp
466+
// has been initialized, or if we're not tracking any intents.
467+
if !p.rts.IsInit() || p.rts.intentQ.Len() == 0 {
468468
continue
469469
}
470470

pkg/kv/kvserver/rangefeed/scheduled_processor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,9 @@ func (p *ScheduledProcessor) processEvents(ctx context.Context) {
178178
}
179179

180180
func (p *ScheduledProcessor) processPushTxn(ctx context.Context) {
181-
if !p.txnPushActive && p.rts.IsInit() {
181+
// NB: Len() check avoids hlc.Clock.Now() mutex acquisition in the common
182+
// case, which can be a significant source of contention.
183+
if !p.txnPushActive && p.rts.IsInit() && p.rts.intentQ.Len() > 0 {
182184
now := p.Clock.Now()
183185
before := now.Add(-p.PushTxnsAge.Nanoseconds(), 0)
184186
oldTxns := p.rts.intentQ.Before(before)

0 commit comments

Comments
 (0)