Skip to content

Commit aaa9a22

Browse files
committed
changefeedccl: checkpoint on initial scan for core changefeeds
Previously we would only checkpoint during backfills for enterprise changefeeds, even though we had a test asserting otherwise. Adding checkpointing prevents us from emitting further duplicates when core changefeeds encounter transient errors during an initial scan. Epic: none Fixes: #143153 Release note: None
1 parent 7898c83 commit aaa9a22

File tree

2 files changed

+4
-2
lines changed

2 files changed

+4
-2
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -951,8 +951,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
951951

952952
// At a lower frequency, we checkpoint specific spans in the job progress
953953
// either in backfills or if the highwater mark is excessively lagging behind.
954-
checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
955-
(ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
954+
checkpointSpans := (ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
956955
canCheckpointSpans(sv, ca.lastSpanFlush)
957956

958957
if checkpointSpans {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8825,9 +8825,11 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
88258825
context.Background(), &s.Server.ClusterSettings().SV, 100<<20)
88268826

88278827
emittedCount := 0
8828+
errorCount := 0
88288829
knobs.RaiseRetryableError = func() error {
88298830
emittedCount++
88308831
if emittedCount%200 == 0 {
8832+
errorCount++
88318833
return errors.New("test transient error")
88328834
}
88338835
return nil
@@ -8841,6 +8843,7 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
88418843
payloads[i] = fmt.Sprintf(`foo: [%d]->{"after": {"a": %d}}`, i, i)
88428844
}
88438845
assertPayloads(t, foo, payloads)
8846+
require.GreaterOrEqual(t, errorCount, 1)
88448847
}
88458848

88468849
cdcTest(t, testFn, feedTestForceSink("sinkless"))

0 commit comments

Comments
 (0)