Skip to content

Commit b452213

Browse files
craig[bot]aerfreibghal
committed
148245: changefeedccl: checkpoint on initial scan for core changefeeds r=asg0451,andyyang890 a=aerfrei Previously we would only checkpoint during backfills for enterprise changefeeds, even though we had a test asserting otherwise. Adding checkpointing prevents us from emitting further duplicates when core changefeeds encounter transient errors during an initial scan. Epic: none Fixes: #143153 Release note: None 148747: general: ignore vscode configuration r=bghal a=bghal Epic: none Co-authored-by: Aerin Freilich <[email protected]> Co-authored-by: Brendan Gerrity <[email protected]>
3 parents e9631b4 + aaa9a22 + 79b838b commit b452213

File tree

3 files changed

+5
-2
lines changed

3 files changed

+5
-2
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,4 @@ pkg/testutils/serverutils/*_generated.go
6565

6666
.DS_Store
6767
.idea/
68+
.vscode/

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -951,8 +951,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
951951

952952
// At a lower frequency, we checkpoint specific spans in the job progress
953953
// either in backfills or if the highwater mark is excessively lagging behind.
954-
checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
955-
(ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
954+
checkpointSpans := (ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
956955
canCheckpointSpans(sv, ca.lastSpanFlush)
957956

958957
if checkpointSpans {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8829,9 +8829,11 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
88298829
context.Background(), &s.Server.ClusterSettings().SV, 100<<20)
88308830

88318831
emittedCount := 0
8832+
errorCount := 0
88328833
knobs.RaiseRetryableError = func() error {
88338834
emittedCount++
88348835
if emittedCount%200 == 0 {
8836+
errorCount++
88358837
return errors.New("test transient error")
88368838
}
88378839
return nil
@@ -8845,6 +8847,7 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
88458847
payloads[i] = fmt.Sprintf(`foo: [%d]->{"after": {"a": %d}}`, i, i)
88468848
}
88478849
assertPayloads(t, foo, payloads)
8850+
require.GreaterOrEqual(t, errorCount, 1)
88488851
}
88498852

88508853
cdcTest(t, testFn, feedTestForceSink("sinkless"))

0 commit comments

Comments
 (0)