Skip to content

Commit 1ded868

Browse files
craig[bot]andyyang890jbowens
committed
148617: changefeedccl: fix job progress checkpoint fields invariant violation r=asg0451 a=andyyang890 Fixes #148560 Fixes #148620 --- **changefeedccl: avoid creating checkpoint string all the time** This patch modifies the changefeed job progress saving code so that we only create a string for a span-level checkpoint when we actually need it for logging. Release note: None --- **changefeedccl: fix job progress checkpoint fields invariant violation** Release note (bug fix): A bug where a changefeed that was created before v25.2 could fail after upgrading to v25.2 with the error message `both legacy and current checkpoint set on change aggregator spec` has now been fixed. 148782: storage: add blob-rewrite compaction cluster settings r=jbowens a=jbowens Add two new cluster settings for configuring blob-rewrite compactions. Epic: CRDB-20379 Release note: None Co-authored-by: Andy Yang <[email protected]> Co-authored-by: Jackson Owens <[email protected]>
3 parents 79e136a + dcd914a + feb3805 commit 1ded868

File tree

9 files changed

+178
-33
lines changed

9 files changed

+178
-33
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,10 @@ func alterChangefeedPlanHook(
167167
if !changefeedProgress.Checkpoint.IsEmpty() {
168168
return errors.AssertionFailedf("both legacy and current checkpoint set on changefeed job progress")
169169
}
170-
changefeedProgress.Checkpoint = checkpoint.ConvertToLegacyCheckpoint(changefeedProgress.SpanLevelCheckpoint)
171-
changefeedProgress.SpanLevelCheckpoint = nil
170+
legacyCheckpoint := checkpoint.ConvertToLegacyCheckpoint(changefeedProgress.SpanLevelCheckpoint)
171+
if err := changefeedProgress.SetCheckpoint(legacyCheckpoint, nil); err != nil {
172+
return err
173+
}
172174
}
173175
}
174176
newChangefeedStmt.Targets = newTargets
@@ -856,21 +858,15 @@ func generateNewProgress(
856858
}
857859

858860
func removeSpansFromProgress(
859-
prevProgress jobspb.Progress, spansToRemove []roachpb.Span, statementTime hlc.Timestamp,
861+
progress jobspb.Progress, spansToRemove []roachpb.Span, statementTime hlc.Timestamp,
860862
) error {
861-
changefeedProgress := prevProgress.GetChangefeed()
862-
if changefeedProgress == nil {
863-
return nil
864-
}
865-
changefeedCheckpoint := changefeedProgress.Checkpoint
866-
if changefeedCheckpoint == nil {
867-
return nil
868-
}
869-
870-
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(prevProgress, statementTime)
863+
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress, statementTime)
871864
if err != nil {
872865
return err
873866
}
867+
if spanLevelCheckpoint == nil {
868+
return nil
869+
}
874870
checkpointSpansMap := make(map[hlc.Timestamp]roachpb.Spans)
875871
for ts, sp := range spanLevelCheckpoint.All() {
876872
var spanGroup roachpb.SpanGroup
@@ -880,7 +876,10 @@ func removeSpansFromProgress(
880876
checkpointSpansMap[ts] = spans
881877
}
882878
}
883-
changefeedProgress.SpanLevelCheckpoint = jobspb.NewTimestampSpansMap(checkpointSpansMap)
879+
spanLevelCheckpoint = jobspb.NewTimestampSpansMap(checkpointSpansMap)
880+
if err := progress.GetChangefeed().SetCheckpoint(nil, spanLevelCheckpoint); err != nil {
881+
return err
882+
}
884883

885884
return nil
886885
}

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,16 +261,33 @@ func startDistChangefeed(
261261
dsp := execCtx.DistSQLPlanner()
262262

263263
//lint:ignore SA1019 deprecated usage
264-
var checkpoint *jobspb.ChangefeedProgress_Checkpoint
264+
var legacyCheckpoint *jobspb.ChangefeedProgress_Checkpoint
265265
if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil {
266-
checkpoint = progress.Checkpoint
266+
legacyCheckpoint = progress.Checkpoint
267267
}
268268
var spanLevelCheckpoint *jobspb.TimestampSpansMap
269269
if progress := localState.progress.GetChangefeed(); progress != nil && progress.SpanLevelCheckpoint != nil {
270270
spanLevelCheckpoint = progress.SpanLevelCheckpoint
271271
}
272+
if legacyCheckpoint != nil && spanLevelCheckpoint != nil {
273+
if legacyCheckpoint.Timestamp.After(spanLevelCheckpoint.MinTimestamp()) {
274+
// We should never be writing the legacy checkpoint again once we
275+
// start writing the new checkpoint format. If we do, that signals
276+
// a missing or incorrect version gate check somewhere.
277+
return errors.AssertionFailedf("both legacy and current checkpoint set on " +
278+
"changefeed job progress and legacy checkpoint has later timestamp")
279+
}
280+
// This should be an assertion failure but unfortunately due to a bug
281+
// that was included in earlier versions of 25.2 (#148620), we may fail
282+
// to clear the legacy checkpoint when we start writing the new one.
283+
// We instead discard the legacy checkpoint here and it will eventually be
284+
// cleared once the cluster is running a newer patch release with the fix.
285+
log.Warningf(ctx, "both legacy and current checkpoint set on changefeed job progress; "+
286+
"discarding legacy checkpoint")
287+
legacyCheckpoint = nil
288+
}
272289
p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater,
273-
trackedSpans, checkpoint, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
290+
trackedSpans, legacyCheckpoint, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
274291
if err != nil {
275292
return err
276293
}

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,11 +1148,11 @@ func (cs *cachedState) SetHighwater(frontier hlc.Timestamp) {
11481148
}
11491149

11501150
// SetCheckpoint implements the eval.ChangefeedState interface.
1151-
func (cs *cachedState) SetCheckpoint(checkpoint *jobspb.TimestampSpansMap) {
1152-
// NB: It's not necessary to set the legacy checkpoint field because this
1151+
func (cs *cachedState) SetCheckpoint(checkpoint *jobspb.TimestampSpansMap) error {
1152+
// It's not necessary to do a version gate check here because this
11531153
// copy of the checkpoint is only used in-memory on a coordinator node that
11541154
// knows about the new field.
1155-
cs.progress.Details.(*jobspb.Progress_Changefeed).Changefeed.SpanLevelCheckpoint = checkpoint
1155+
return cs.progress.Details.(*jobspb.Progress_Changefeed).Changefeed.SetCheckpoint(nil, checkpoint)
11561156
}
11571157

11581158
// AggregatorFrontierSpans returns an iterator over the spans in the aggregator
@@ -1846,7 +1846,8 @@ func (cf *changeFrontier) checkpointJobProgress(
18461846
cf.metrics.FrontierUpdates.Inc(1)
18471847
if cf.js.job != nil {
18481848
var ptsUpdated bool
1849-
var checkpointStr string
1849+
//lint:ignore SA1019 deprecated usage
1850+
var legacyCheckpoint *jobspb.ChangefeedProgress_Checkpoint
18501851
if err := cf.js.job.DebugNameNoTxn(changefeedJobProgressTxnName).Update(cf.Ctx(), func(
18511852
txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
18521853
) error {
@@ -1863,12 +1864,14 @@ func (cf *changeFrontier) checkpointJobProgress(
18631864

18641865
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
18651866
if cv.IsActive(cf.Ctx(), clusterversion.V25_2) {
1866-
changefeedProgress.SpanLevelCheckpoint = spanLevelCheckpoint
1867-
checkpointStr = spanLevelCheckpoint.String()
1867+
if err := changefeedProgress.SetCheckpoint(nil, spanLevelCheckpoint); err != nil {
1868+
return err
1869+
}
18681870
} else {
1869-
legacyCheckpoint := checkpoint.ConvertToLegacyCheckpoint(spanLevelCheckpoint)
1870-
changefeedProgress.Checkpoint = legacyCheckpoint
1871-
checkpointStr = legacyCheckpoint.String()
1871+
legacyCheckpoint = checkpoint.ConvertToLegacyCheckpoint(spanLevelCheckpoint)
1872+
if err := changefeedProgress.SetCheckpoint(legacyCheckpoint, nil); err != nil {
1873+
return err
1874+
}
18721875
}
18731876

18741877
if ptsUpdated, err = cf.manageProtectedTimestamps(ctx, txn, changefeedProgress); err != nil {
@@ -1890,13 +1893,20 @@ func (cf *changeFrontier) checkpointJobProgress(
18901893
cf.lastProtectedTimestampUpdate = timeutil.Now()
18911894
}
18921895
if log.V(2) {
1893-
log.Infof(cf.Ctx(), "change frontier persisted highwater=%s and checkpoint=%s",
1894-
frontier, checkpointStr)
1896+
if cv.IsActive(cf.Ctx(), clusterversion.V25_2) {
1897+
log.Infof(cf.Ctx(), "change frontier persisted highwater=%s and checkpoint=%s",
1898+
frontier, spanLevelCheckpoint)
1899+
} else {
1900+
log.Infof(cf.Ctx(), "change frontier persisted highwater=%s and checkpoint=%s",
1901+
frontier, legacyCheckpoint)
1902+
}
18951903
}
18961904
}
18971905

18981906
cf.localState.SetHighwater(frontier)
1899-
cf.localState.SetCheckpoint(spanLevelCheckpoint)
1907+
if err := cf.localState.SetCheckpoint(spanLevelCheckpoint); err != nil {
1908+
return false, err
1909+
}
19001910

19011911
return true, nil
19021912
}

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1642,7 +1642,9 @@ func reconcileJobStateWithLocalState(
16421642
if updateHW {
16431643
localState.SetHighwater(sf.Frontier())
16441644
}
1645-
localState.SetCheckpoint(checkpoint)
1645+
if err := localState.SetCheckpoint(checkpoint); err != nil {
1646+
return err
1647+
}
16461648
if log.V(1) {
16471649
log.Infof(ctx, "Applying checkpoint to job record: hw=%v, cf=%v",
16481650
localState.progress.GetHighWater(), localState.progress.GetChangefeed())

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11713,3 +11713,83 @@ func TestCloudstorageParallelCompression(t *testing.T) {
1171311713
}
1171411714
})
1171511715
}
11716+
11717+
// TestChangefeedResumeWithBothLegacyAndCurrentCheckpoint is a regression
11718+
// test for #148620, which was a bug where the legacy checkpoint was not
11719+
// being cleared after the cluster was upgraded to 25.2 and subsequently
11720+
// causing an assertion error when resuming.
11721+
func TestChangefeedResumeWithBothLegacyAndCurrentCheckpoint(t *testing.T) {
11722+
defer leaktest.AfterTest(t)()
11723+
defer log.Scope(t).Close(t)
11724+
11725+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
11726+
ctx := context.Background()
11727+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
11728+
11729+
// Create a table with 11 ranges.
11730+
const numRows = 10
11731+
const numRanges = numRows + 1
11732+
sqlDB.ExecMultiple(t,
11733+
`CREATE TABLE foo (a INT PRIMARY KEY)`,
11734+
fmt.Sprintf(`INSERT INTO foo SELECT * FROM generate_series(1, %d)`, numRows),
11735+
fmt.Sprintf(`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d))`, numRows),
11736+
)
11737+
fooSpan := desctestutils.
11738+
TestingGetPublicTableDescriptor(s.Server.DB(), s.Codec, "d", "foo").
11739+
PrimaryIndexSpan(s.Codec)
11740+
ranges, _, err := s.Server.
11741+
DistSenderI().(*kvcoord.DistSender).
11742+
AllRangeSpans(ctx, []roachpb.Span{fooSpan})
11743+
require.NoError(t, err)
11744+
require.Len(t, ranges, numRanges)
11745+
11746+
cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH no_initial_scan`)
11747+
defer closeFeed(t, cf)
11748+
11749+
jobFeed, ok := cf.(cdctest.EnterpriseTestFeed)
11750+
require.True(t, ok)
11751+
11752+
sqlDB.Exec(t, `PAUSE JOB $1`, jobFeed.JobID())
11753+
waitForJobState(sqlDB, t, jobFeed.JobID(), jobs.StatePaused)
11754+
hw, err := jobFeed.HighWaterMark()
11755+
require.NoError(t, err)
11756+
11757+
registry := s.Server.JobRegistry().(*jobs.Registry)
11758+
11759+
// Manually insert both a legacy and current checkpoint containing
11760+
// a random range from the table.
11761+
rnd, _ := randutil.NewTestRand()
11762+
randomRange := ranges[rnd.Intn(len(ranges))]
11763+
err = registry.UpdateJobWithTxn(ctx, jobFeed.JobID(), nil,
11764+
func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
11765+
checkpointTS := hw.Add(int64(time.Nanosecond), 0)
11766+
progress := md.Progress
11767+
progress.Details = jobspb.WrapProgressDetails(jobspb.ChangefeedProgress{
11768+
//lint:ignore SA1019 deprecated usage
11769+
Checkpoint: &jobspb.ChangefeedProgress_Checkpoint{
11770+
Spans: []roachpb.Span{randomRange},
11771+
Timestamp: checkpointTS,
11772+
},
11773+
SpanLevelCheckpoint: jobspb.NewTimestampSpansMap(
11774+
map[hlc.Timestamp]roachpb.Spans{
11775+
checkpointTS: []roachpb.Span{randomRange},
11776+
},
11777+
),
11778+
})
11779+
ju.UpdateProgress(progress)
11780+
return nil
11781+
})
11782+
require.NoError(t, err)
11783+
11784+
sqlDB.Exec(t, `RESUME JOB $1`, jobFeed.JobID())
11785+
waitForJobState(sqlDB, t, jobFeed.JobID(), jobs.StateRunning)
11786+
11787+
// Wait for highwater to advance past the current time.
11788+
var tsStr string
11789+
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&tsStr)
11790+
ts := parseTimeToHLC(t, tsStr)
11791+
require.NoError(t, jobFeed.WaitForHighWaterMark(ts))
11792+
}
11793+
11794+
cdcTest(t, testFn, feedTestEnterpriseSinks)
11795+
}

pkg/jobs/jobspb/jobs.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1717
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
1818
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
19+
"github.com/cockroachdb/errors"
1920
)
2021

2122
// JobID is the ID of a job.
@@ -152,6 +153,20 @@ func (m *ChangefeedProgress_Checkpoint) IsEmpty() bool {
152153
return m == nil || (len(m.Spans) == 0 && m.Timestamp.IsEmpty())
153154
}
154155

156+
// SetCheckpoint is a setter for the checkpoint fields in ChangefeedProgress.
157+
// It enforces the invariant that at most one of them can be non-nil.
158+
func (m *ChangefeedProgress) SetCheckpoint(
159+
legacyCheckpoint *ChangefeedProgress_Checkpoint, spanLevelCheckpoint *TimestampSpansMap,
160+
) error {
161+
if legacyCheckpoint != nil && spanLevelCheckpoint != nil {
162+
return errors.AssertionFailedf(
163+
"attempting to set both legacy and current checkpoint on changefeed job progress")
164+
}
165+
m.Checkpoint = legacyCheckpoint
166+
m.SpanLevelCheckpoint = spanLevelCheckpoint
167+
return nil
168+
}
169+
155170
func (r RestoreDetails) OnlineImpl() bool {
156171
return r.ExperimentalCopy || r.ExperimentalOnline
157172
}

pkg/jobs/jobspb/jobs.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1217,6 +1217,10 @@ message ChangefeedProgress {
12171217
// changefeed. It consists of a list of spans and a single timestamp
12181218
// representing the minimum resolved timestamp of the spans.
12191219
// It is now deprecated in favor of SpanLevelCheckpoint.
1220+
//
1221+
// Invariant: At most one of Checkpoint and SpanLevelCheckpoint should be
1222+
// non-nil at any time.
1223+
//
12201224
// TODO(#139734): Delete this field and mark field number as reserved.
12211225
Checkpoint checkpoint = 4 [deprecated=true];
12221226

@@ -1243,6 +1247,9 @@ message ChangefeedProgress {
12431247
// checkpoint to its corresponding resolved timestamp, which will be higher
12441248
// than the overall resolved timestamp and thus allow us to do less work.
12451249
// This is especially useful during backfills or if some spans are lagging.
1250+
//
1251+
// Invariant: At most one of Checkpoint and SpanLevelCheckpoint should be
1252+
// non-nil at any time.
12461253
TimestampSpansMap span_level_checkpoint = 5;
12471254
}
12481255

pkg/sql/sem/eval/deps.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ type ChangefeedState interface {
665665
SetHighwater(frontier hlc.Timestamp)
666666

667667
// SetCheckpoint sets the checkpoint for the changefeed.
668-
SetCheckpoint(checkpoint *jobspb.TimestampSpansMap)
668+
SetCheckpoint(checkpoint *jobspb.TimestampSpansMap) error
669669
}
670670

671671
// TenantOperator is capable of interacting with tenant state, allowing SQL

pkg/storage/pebble.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,21 @@ var (
374374
int64(metamorphic.ConstantWithTestRange("storage.value_separation.max_reference_depth", 10 /* default */, 2, 20)),
375375
settings.IntWithMinimum(2),
376376
)
377+
valueSeparationRewriteMinimumAge = settings.RegisterDurationSetting(
378+
settings.SystemVisible,
379+
"storage.value_separation.rewrite_minimum_age",
380+
"the minimum age of a blob file before it is eligible for a rewrite compaction",
381+
5*time.Minute, // 5 minutes
382+
settings.DurationWithMinimum(0),
383+
)
384+
valueSeparationCompactionGarbageThreshold = settings.RegisterIntSetting(
385+
settings.SystemVisible,
386+
"storage.value_separation.compaction_garbage_threshold",
387+
"the max garbage threshold configures the percentage of unreferenced value "+
388+
"bytes that trigger blob-file rewrite compactions; 100 disables these compactions",
389+
100, /* default; disables blob-file rewrites */
390+
settings.IntInRange(1, 100),
391+
)
377392
)
378393

379394
// EngineComparer is a pebble.Comparer object that implements MVCC-specific
@@ -848,8 +863,8 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
848863
Enabled: true,
849864
MinimumSize: int(valueSeparationMinimumSize.Get(&cfg.settings.SV)),
850865
MaxBlobReferenceDepth: int(valueSeparationMaxReferenceDepth.Get(&cfg.settings.SV)),
851-
RewriteMinimumAge: time.Minute,
852-
TargetGarbageRatio: 1.0, // Disable blob file rewrites
866+
RewriteMinimumAge: valueSeparationRewriteMinimumAge.Get(&cfg.settings.SV),
867+
TargetGarbageRatio: float64(valueSeparationCompactionGarbageThreshold.Get(&cfg.settings.SV)) / 100.0,
853868
}
854869
}
855870

0 commit comments

Comments
 (0)