Commit 553aa73

craig[bot], wenyihu6, rharding6373, and sravotto committed
148241: kvserver: enable kv.rangefeed.buffered_sender.enabled by default r=stevendanna a=wenyihu6

This patch enables kv.rangefeed.buffered_sender.enabled by default. We've gained confidence in the feature through scale testing and by enabling it metamorphically since v25.2. We also don't currently believe enabling this feature would cause any performance regressions or other negative impact.

Epic: none
Release note: none

148698: changefeedccl: add cluster setting for changefeed max retry backoff r=rharding6373 a=rharding6373

When changefeeds enter a high-level retry loop, e.g. as part of a rolling restart, an exponential backoff is applied. The default max backoff was 10m, but due to considerations in #146448 a lower 1m max was considered for some rolling-restart cases. This PR makes the max backoff configurable via the non-public cluster setting changefeed.max_retry_backoff, so that most users can keep the old setting of 10m, which is better for degenerate scenarios in which changefeeds might retry frequently due to cluster instability. It also adds a separate cluster setting, changefeed.retry_backoff_reset, which is the amount of time between retries before the backoff timer resets. Both settings have a default of 10m.

Epic: none
Fixes: #148467
Release note: None

148756: roachtest: adding defensive code in ceph/reef test r=sravotto a=sravotto

We have seen sporadic failures in the ceph tests due to failures in creating users in the ceph object gateway. To address this, we add code that checks the gateway is up by submitting a read-only request before attempting to add the user.

Epic: none
Fixes: #148731
Release note: None

Co-authored-by: wenyihu6 <[email protected]>
Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: Silvano Ravotto <[email protected]>
4 parents f7b60e4 + fa5320c + 694c72d + d598226 commit 553aa73
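
To make the retry behavior described in 148698 concrete, the following is a minimal standalone sketch in plain Go: exponential backoff capped at a configurable maximum, with the backoff state starting over once the changefeed has run without errors for longer than a quiet period. It is illustrative only; backoffState, next, and the field names are invented for this example, while the actual patch wires the two cluster settings into the pkg/util/retry helpers shown in the diffs below.

package main

import (
	"fmt"
	"time"
)

// backoffState mimics the described behavior: exponential backoff capped at a
// configurable maximum (changefeed.max_retry_backoff), reset once the
// changefeed has run error-free for longer than a quiet period
// (changefeed.retry_backoff_reset).
type backoffState struct {
	initial    time.Duration
	multiplier float64
	maxBackoff time.Duration
	resetAfter time.Duration
	current    time.Duration
	lastRetry  time.Time
}

// next returns how long to wait before the following retry attempt.
func (b *backoffState) next(now time.Time) time.Duration {
	// If the previous retry was long enough ago, start the backoff over.
	if !b.lastRetry.IsZero() && now.Sub(b.lastRetry) > b.resetAfter {
		b.current = 0
	}
	b.lastRetry = now
	if b.current == 0 {
		b.current = b.initial
	} else {
		b.current = time.Duration(float64(b.current) * b.multiplier)
	}
	if b.current > b.maxBackoff {
		b.current = b.maxBackoff
	}
	return b.current
}

func main() {
	b := backoffState{
		initial:    time.Second,
		multiplier: 2,
		maxBackoff: 10 * time.Minute, // default of changefeed.max_retry_backoff
		resetAfter: 10 * time.Minute, // default of changefeed.retry_backoff_reset
	}
	now := time.Now()
	for i := 0; i < 12; i++ {
		wait := b.next(now)
		fmt.Printf("retry %2d: wait %v\n", i, wait)
		now = now.Add(wait) // assume the next failure happens right after the wait
	}
	// After a long error-free stretch, the backoff starts over at 1s.
	fmt.Println("after quiet period:", b.next(now.Add(time.Hour)))
}

Running the sketch shows the wait growing 1s, 2s, 4s, and so on until it is pinned at the 10m cap, then dropping back to 1s after a long error-free stretch.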

5 files changed: +58 additions, -12 deletions

pkg/ccl/changefeedccl/changefeed_stmt.go (6 additions, 2 deletions)

@@ -411,7 +411,9 @@ func coreChangefeed(
     p.ExtendedEvalContext().ChangefeedState = localState
     knobs, _ := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs)

-    for r := getRetry(ctx); ; {
+    maxBackoff := changefeedbase.MaxRetryBackoff.Get(&p.ExecCfg().Settings.SV)
+    backoffReset := changefeedbase.RetryBackoffReset.Get(&p.ExecCfg().Settings.SV)
+    for r := getRetry(ctx, maxBackoff, backoffReset); ; {
         if !r.Next() {
             // Retry loop exits when context is canceled.
             log.Infof(ctx, "core changefeed retry loop exiting: %s", ctx.Err())
@@ -1441,7 +1443,9 @@ func (b *changefeedResumer) resumeWithRetries(
         b.mu.perNodeAggregatorStats[componentID] = *meta
     }

-    for r := getRetry(ctx); r.Next(); {
+    maxBackoff := changefeedbase.MaxRetryBackoff.Get(&execCfg.Settings.SV)
+    backoffReset := changefeedbase.RetryBackoffReset.Get(&execCfg.Settings.SV)
+    for r := getRetry(ctx, maxBackoff, backoffReset); r.Next(); {
         flowErr := maybeUpgradePreProductionReadyExpression(ctx, jobID, details, jobExec)

         if flowErr == nil {

pkg/ccl/changefeedccl/changefeedbase/settings.go (20 additions, 0 deletions)

@@ -349,3 +349,23 @@ var Quantize = settings.RegisterDurationSettingWithExplicitUnit(
     time.Duration(metamorphic.ConstantWithTestRange("changefeed.resolved_timestamp.granularity", 1, 0, 10))*time.Second,
     settings.DurationWithMinimum(0),
 )
+
+// MaxRetryBackoff is the maximum time a changefeed will backoff when in
+// a top-level retry loop, for example during rolling restarts.
+var MaxRetryBackoff = settings.RegisterDurationSettingWithExplicitUnit(
+    settings.ApplicationLevel,
+    "changefeed.max_retry_backoff",
+    "the maximum time a changefeed will backoff when retrying after a restart and how long between retries before backoff resets",
+    10*time.Minute, /* defaultValue */
+    settings.DurationInRange(1*time.Second, 1*time.Hour),
+)
+
+// RetryBackoffReset is the time between changefeed retries before the
+// backoff timer resets.
+var RetryBackoffReset = settings.RegisterDurationSettingWithExplicitUnit(
+    settings.ApplicationLevel,
+    "changefeed.retry_backoff_reset",
+    "the time between changefeed retries before the backoff timer resets",
+    10*time.Minute, /* defaultValue */
+    settings.DurationInRange(1*time.Second, 1*time.Hour),
+)

pkg/ccl/changefeedccl/retry.go (8 additions, 8 deletions)

@@ -18,11 +18,11 @@ var useFastRetry = envutil.EnvOrDefaultBool(
     "COCKROACH_CHANGEFEED_TESTING_FAST_RETRY", false)

 // getRetry returns retry object for changefeed.
-func getRetry(ctx context.Context) Retry {
+func getRetry(ctx context.Context, maxBackoff, backoffReset time.Duration) Retry {
     opts := retry.Options{
         InitialBackoff: 1 * time.Second,
         Multiplier:     2,
-        MaxBackoff:     1 * time.Minute,
+        MaxBackoff:     maxBackoff,
     }

     if useFastRetry {
@@ -33,7 +33,8 @@ func getRetry(ctx context.Context) Retry {
         }
     }

-    return Retry{Retry: retry.StartWithCtx(ctx, opts)}
+    return Retry{Retry: retry.StartWithCtx(ctx, opts),
+        resetRetryAfter: backoffReset}
 }

 func testingUseFastRetry() func() {
@@ -43,16 +44,15 @@ func testingUseFastRetry() func() {
     }
 }

-// reset retry state after changefeed ran for that much time
-// without errors.
-const resetRetryAfter = 10 * time.Minute
-
 // Retry is a thin wrapper around retry.Retry which
 // resets retry state if changefeed been running for sufficiently
 // long time.
 type Retry struct {
     retry.Retry
     lastRetry time.Time
+    // reset retry state after changefeed ran for that much time
+    // without errors.
+    resetRetryAfter time.Duration
 }

 // Next returns whether the retry loop should continue, and blocks for the
@@ -63,7 +63,7 @@ func (r *Retry) Next() bool {
     defer func() {
         r.lastRetry = timeutil.Now()
     }()
-    if timeutil.Since(r.lastRetry) > resetRetryAfter {
+    if timeutil.Since(r.lastRetry) > r.resetRetryAfter {
         r.Reset()
     }
     return r.Retry.Next()

pkg/cmd/roachtest/tests/s3_microceph.go (23 additions, 1 deletion)

@@ -10,11 +10,14 @@ import (
     "fmt"
     "net/url"
     "path/filepath"
+    "time"

     "github.com/cockroachdb/cockroach/pkg/cloud/amazon"
     "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
     "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
     "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+    "github.com/cockroachdb/cockroach/pkg/roachprod/install"
+    "github.com/cockroachdb/cockroach/pkg/util/retry"
 )

 // cephDisksScript creates 3 4GB loop devices, e.g. virtual block devices that allows
@@ -124,7 +127,9 @@ func (m cephManager) install(ctx context.Context) {
         rgwCmd = rgwCmd + ` --ssl-certificate="$(base64 -w0 certs/node.crt)" --ssl-private-key="$(base64 -w0 certs/node.key)"`
     }
     m.run(ctx, `starting object gateway`, rgwCmd)
-
+    // We have seen occasional failures in creating users, so we
+    // wait until a read only request succeeds before proceeding.
+    m.checkRGW(ctx)
     m.run(ctx, `creating backup user`,
         `sudo radosgw-admin user create --uid=backup --display-name=backup`)
     m.run(ctx, `add keys to the user`,
@@ -166,3 +171,20 @@ func (m cephManager) run(ctx context.Context, msg string, cmd ...string) {
     m.c.Run(ctx, option.WithNodes(m.cephNodes), cmd...)
     m.t.Status(msg, " done")
 }
+
+// checkRGW verifies that the Ceph Object Gateway is up.
+func (m cephManager) checkRGW(ctx context.Context) {
+    m.t.Status("waiting for Ceph Object Gateway...")
+    if err := m.c.RunE(ctx,
+        option.WithNodes(m.cephNodes).
+            WithRetryOpts(retry.Options{
+                InitialBackoff: 2 * time.Second,
+                MaxBackoff:     30 * time.Second,
+                MaxRetries:     10,
+            }).
+            WithShouldRetryFn(func(*install.RunResultDetails) bool { return true }),
+        `sudo radosgw-admin user list`,
+    ); err != nil {
+        m.t.Error("failed to connect to Ceph Object Gateway", err)
+    }
+}
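
As a side note on the pattern 148756 adds, here is a standalone sketch of probing a service with a cheap read-only request under capped exponential backoff before issuing the mutating command. It uses only the Go standard library; waitUntilReady, checkReady, and the timing constants are illustrative stand-ins for the roachtest and roachprod helpers used in checkRGW above.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitUntilReady retries a cheap read-only health check with capped
// exponential backoff before the caller proceeds to mutating operations.
func waitUntilReady(ctx context.Context, checkReady func(context.Context) error) error {
	backoff := 2 * time.Second
	const maxBackoff = 30 * time.Second
	const maxAttempts = 10
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = checkReady(ctx); err == nil {
			return nil // the service answered a read-only request
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	return fmt.Errorf("service not ready after %d attempts: %w", maxAttempts, err)
}

func main() {
	// Example: pretend the service becomes ready on the third probe.
	calls := 0
	err := waitUntilReady(context.Background(), func(context.Context) error {
		if calls++; calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	})
	fmt.Println("ready:", err == nil)
}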

pkg/kv/kvserver/replica_rangefeed.go (1 addition, 1 deletion)

@@ -90,7 +90,7 @@ var RangefeedUseBufferedSender = settings.RegisterBoolSetting(
     "kv.rangefeed.buffered_sender.enabled",
     "use buffered sender for all range feeds instead of buffering events "+
         "separately per client per range",
-    metamorphic.ConstantWithTestBool("kv.rangefeed.buffered_sender.enabled", false),
+    metamorphic.ConstantWithTestBool("kv.rangefeed.buffered_sender.enabled", true),
 )

 func init() {
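
For readers unfamiliar with metamorphic defaults: as I understand it, release builds take the stated default (now true), while test builds may randomize the constant so both code paths are exercised, which is how the buffered sender was "enabled metamorphically since v25.2". The sketch below is a plain-Go illustration of that idea, not CockroachDB's metamorphic package; constantWithTestBool and isTestBuild are invented names.

package main

import (
	"fmt"
	"math/rand"
)

// constantWithTestBool illustrates the metamorphic-default idea: ordinary
// builds get the stated default, while test builds may randomize the value so
// both code paths get exercised in CI.
func constantWithTestBool(name string, defaultVal, isTestBuild bool) bool {
	if isTestBuild {
		v := rand.Intn(2) == 0
		fmt.Printf("metamorphic override: %s = %t\n", name, v)
		return v
	}
	return defaultVal
}

func main() {
	// With this commit, ordinary builds now default the setting to true.
	enabled := constantWithTestBool("kv.rangefeed.buffered_sender.enabled", true, false)
	fmt.Println("buffered sender enabled by default:", enabled)
}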
