
Commit 694c72d

changefeedccl: add cluster setting for changefeed max retry backoff
When changefeeds enter a high-level retry loop, e.g. as part of a rolling restart, an exponential backoff is applied. The default max backoff was 10m, but due to considerations in #146448 a lower 1m max was considered for some rolling-restart cases. This PR makes the max backoff configurable via the non-public cluster setting changefeed.max_retry_backoff, so that most users can keep the old value of 10m, which is better for degenerate scenarios where changefeeds might retry frequently due to cluster instability. It also adds a separate cluster setting, changefeed.retry_backoff_reset, which is the amount of time between retries before the backoff timer resets. Both settings default to 10m.

Epic: none
Fixes: #148467
Release note: None
1 parent aed749a commit 694c72d
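
To make the retry policy described above concrete, here is a small, self-contained Go sketch (illustrative only, not the changefeed code; simulateBackoff and its inputs are hypothetical) that models a backoff starting at 1s with multiplier 2, capped at changefeed.max_retry_backoff, and reset to the initial value once a changefeed has run without errors for longer than changefeed.retry_backoff_reset:

package main

import (
	"fmt"
	"time"
)

// simulateBackoff models the policy described in the commit message: the
// wait before each retry starts at 1s, doubles per retry, never exceeds
// maxBackoff, and resets to 1s after the feed ran error-free for longer
// than backoffReset.
func simulateBackoff(runDurations []time.Duration, maxBackoff, backoffReset time.Duration) []time.Duration {
	const initial = time.Second
	backoff := initial
	var waits []time.Duration
	for _, ran := range runDurations {
		if ran > backoffReset {
			backoff = initial // long stable run: start the schedule over
		}
		waits = append(waits, backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	return waits
}

func main() {
	// Five quick failures, one long stable run, then another failure.
	runs := []time.Duration{
		5 * time.Second, 5 * time.Second, 5 * time.Second,
		5 * time.Second, 5 * time.Second, 30 * time.Minute, 5 * time.Second,
	}
	fmt.Println(simulateBackoff(runs, 10*time.Minute, 10*time.Minute))
	// Output: [1s 2s 4s 8s 16s 1s 2s]
}

With the defaults, a changefeed that keeps failing quickly backs off up to 10m between attempts, while a feed that runs cleanly for more than 10m starts again from a 1s backoff.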

File tree: 3 files changed (+34, -10 lines)

pkg/ccl/changefeedccl/changefeed_stmt.go (6 additions, 2 deletions)

@@ -411,7 +411,9 @@ func coreChangefeed(
 	p.ExtendedEvalContext().ChangefeedState = localState
 	knobs, _ := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs)

-	for r := getRetry(ctx); ; {
+	maxBackoff := changefeedbase.MaxRetryBackoff.Get(&p.ExecCfg().Settings.SV)
+	backoffReset := changefeedbase.RetryBackoffReset.Get(&p.ExecCfg().Settings.SV)
+	for r := getRetry(ctx, maxBackoff, backoffReset); ; {
 		if !r.Next() {
 			// Retry loop exits when context is canceled.
 			log.Infof(ctx, "core changefeed retry loop exiting: %s", ctx.Err())
@@ -1441,7 +1443,9 @@ func (b *changefeedResumer) resumeWithRetries(
 		b.mu.perNodeAggregatorStats[componentID] = *meta
 	}

-	for r := getRetry(ctx); r.Next(); {
+	maxBackoff := changefeedbase.MaxRetryBackoff.Get(&execCfg.Settings.SV)
+	backoffReset := changefeedbase.RetryBackoffReset.Get(&execCfg.Settings.SV)
+	for r := getRetry(ctx, maxBackoff, backoffReset); r.Next(); {
 		flowErr := maybeUpgradePreProductionReadyExpression(ctx, jobID, details, jobExec)

 		if flowErr == nil {
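
Note that both settings are read once, just before the retry loop is constructed, and captured by getRetry; changing either setting therefore takes effect the next time a changefeed builds its retry loop (e.g. when a job resumes), not in the middle of an existing loop.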

pkg/ccl/changefeedccl/changefeedbase/settings.go (20 additions, 0 deletions)

@@ -349,3 +349,23 @@ var Quantize = settings.RegisterDurationSettingWithExplicitUnit(
 	time.Duration(metamorphic.ConstantWithTestRange("changefeed.resolved_timestamp.granularity", 1, 0, 10))*time.Second,
 	settings.DurationWithMinimum(0),
 )
+
+// MaxRetryBackoff is the maximum time a changefeed will backoff when in
+// a top-level retry loop, for example during rolling restarts.
+var MaxRetryBackoff = settings.RegisterDurationSettingWithExplicitUnit(
+	settings.ApplicationLevel,
+	"changefeed.max_retry_backoff",
+	"the maximum time a changefeed will backoff when retrying after a restart and how long between retries before backoff resets",
+	10*time.Minute, /* defaultValue */
+	settings.DurationInRange(1*time.Second, 1*time.Hour),
+)
+
+// RetryBackoffReset is the time between changefeed retries before the
+// backoff timer resets.
+var RetryBackoffReset = settings.RegisterDurationSettingWithExplicitUnit(
+	settings.ApplicationLevel,
+	"changefeed.retry_backoff_reset",
+	"the time between changefeed retries before the backoff timer resets",
+	10*time.Minute, /* defaultValue */
+	settings.DurationInRange(1*time.Second, 1*time.Hour),
+)
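
As a usage sketch, the new settings can be read (as the changefeed code above does) or tightened in a test via the cluster-settings API. The test below is hypothetical and not part of this commit; it assumes the standard helpers cluster.MakeTestingClusterSettings and DurationSetting.Override, which are not shown in this diff.

package changefeedbase_test

import (
	"context"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

func TestRetryBackoffSettingDefaults(t *testing.T) {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()

	// Both settings default to 10m.
	if got := changefeedbase.MaxRetryBackoff.Get(&st.SV); got != 10*time.Minute {
		t.Fatalf("unexpected default max backoff: %s", got)
	}
	if got := changefeedbase.RetryBackoffReset.Get(&st.SV); got != 10*time.Minute {
		t.Fatalf("unexpected default backoff reset: %s", got)
	}

	// A deployment sensitive to rolling restarts might lower the cap to 1m.
	changefeedbase.MaxRetryBackoff.Override(ctx, &st.SV, time.Minute)
	if got := changefeedbase.MaxRetryBackoff.Get(&st.SV); got != time.Minute {
		t.Fatalf("override not applied: %s", got)
	}
}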

pkg/ccl/changefeedccl/retry.go (8 additions, 8 deletions)

@@ -18,11 +18,11 @@ var useFastRetry = envutil.EnvOrDefaultBool(
 	"COCKROACH_CHANGEFEED_TESTING_FAST_RETRY", false)

 // getRetry returns retry object for changefeed.
-func getRetry(ctx context.Context) Retry {
+func getRetry(ctx context.Context, maxBackoff, backoffReset time.Duration) Retry {
 	opts := retry.Options{
 		InitialBackoff: 1 * time.Second,
 		Multiplier:     2,
-		MaxBackoff:     1 * time.Minute,
+		MaxBackoff:     maxBackoff,
 	}

 	if useFastRetry {
@@ -33,7 +33,8 @@ func getRetry(ctx context.Context) Retry {
 		}
 	}

-	return Retry{Retry: retry.StartWithCtx(ctx, opts)}
+	return Retry{Retry: retry.StartWithCtx(ctx, opts),
+		resetRetryAfter: backoffReset}
 }

 func testingUseFastRetry() func() {
@@ -43,16 +44,15 @@ func testingUseFastRetry() func() {
 	}
 }

-// reset retry state after changefeed ran for that much time
-// without errors.
-const resetRetryAfter = 10 * time.Minute
-
 // Retry is a thin wrapper around retry.Retry which
 // resets retry state if changefeed been running for sufficiently
 // long time.
 type Retry struct {
 	retry.Retry
 	lastRetry time.Time
+	// reset retry state after changefeed ran for that much time
+	// without errors.
+	resetRetryAfter time.Duration
 }

 // Next returns whether the retry loop should continue, and blocks for the
@@ -63,7 +63,7 @@ func (r *Retry) Next() bool {
 	defer func() {
 		r.lastRetry = timeutil.Now()
 	}()
-	if timeutil.Since(r.lastRetry) > resetRetryAfter {
+	if timeutil.Since(r.lastRetry) > r.resetRetryAfter {
 		r.Reset()
 	}
 	return r.Retry.Next()