
Commit 0d073f4

craig[bot], Yevgeniy Miretskiy, msbutler, and erikgrinaker committed
109439: changefeedccl: Emit span resolved event when end time reached r=miretskiy a=miretskiy

Changefeed supports a mode where the user wants to emit all events that occurred since some time in the past (`cursor`) and end the changefeed (`end_time`) at a time in the near future. In this mode, the rangefeed catchup scan starting from the `cursor` position could take some time -- maybe even a lot of time -- in which case the very first checkpoint the kvfeed observes will be after `end_time`. All events after `end_time`, including checkpoints, are skipped, as they should be. However, this meant that no changefeed checkpoint records could be produced until the entire changefeed completed. This PR ensures that once `end_time` is reached, we emit one resolved event for that span, so that the changefeed can produce a span-based checkpoint if needed.

Fixes #108464

Release note: None

110267: roachtest: during c2c/shutdown, shutdown main driver if shutdown executor fails r=stevendanna a=msbutler

During #110166, the c2c/shutdown test fataled while the job shutdown executor was running, yet the test kept running for quite a while because the goroutine that manages the c2c job had not realized the test failed. This patch refactors the c2c/shutdown tests such that when the job shutdown executor detects a failure, it cancels the context used by the goroutine managing the c2c job.

Informs #110166

Release note: none

110329: rangefeed: reuse annotated context in `ScheduledProcessor.process()` r=erikgrinaker a=erikgrinaker

Context construction is expensive enough to show up in CPU profiles. With 20k rangefeeds per node on an idle cluster, it made up 1% of overall CPU usage, or 4% of rangefeed scheduler CPU usage.

Epic: none

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
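To make the first and third changes concrete, two minimal sketches follow. Both use hypothetical names and simplified types; they illustrate the described behavior, not the actual kvfeed or rangefeed code.

```go
package main

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// onCheckpoint is a hypothetical handler for per-span checkpoint events.
// Checkpoints below end_time pass through as resolved events; at or past
// end_time, exactly one resolved event is emitted per span, pinned to
// end_time.Prev(), so span-based changefeed checkpoints stay possible
// even while post-end_time events are being skipped.
func onCheckpoint(
	sp roachpb.Span,
	ts, endTime hlc.Timestamp,
	emittedAtEndTime map[string]bool,
	emitResolved func(roachpb.Span, hlc.Timestamp),
) {
	if ts.Less(endTime) {
		emitResolved(sp, ts) // normal pre-end_time checkpoint
		return
	}
	if !emittedAtEndTime[sp.String()] {
		emittedAtEndTime[sp.String()] = true
		emitResolved(sp, endTime.Prev())
	}
}

func main() {}
```

The context-reuse idea from 110329, as an equally simplified sketch:

```go
package main

import "context"

// scheduledProcessor is a toy stand-in for rangefeed.ScheduledProcessor:
// taskCtx is constructed (and, in the real code, annotated with log tags)
// once at startup, then reused by every process() call instead of being
// rebuilt per event -- the construction cost was visible in CPU profiles.
type scheduledProcessor struct {
	taskCtx context.Context
}

func (p *scheduledProcessor) process(fn func(context.Context)) {
	fn(p.taskCtx) // previously: a freshly annotated context on each call
}

func main() {
	p := &scheduledProcessor{taskCtx: context.Background()}
	p.process(func(ctx context.Context) { _ = ctx })
}
```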
4 parents (dc0aaba + 526100a + 01db2d8 + 6d136d8); commit 0d073f4

16 files changed: +340 −81 lines changed


pkg/ccl/changefeedccl/cdcutils/throttle.go

Lines changed: 1 addition & 2 deletions
@@ -12,7 +12,6 @@ import (
     "context"
     "encoding/json"
     "fmt"
-    "math"
     "sync"
     "time"

@@ -69,7 +68,7 @@ func (t *Throttler) AcquireFlushQuota(ctx context.Context) error {
 func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) {
     setLimits := func(rl *quotapool.RateLimiter, rate, burst float64) {
         // set rateBudget to unlimited if rate is 0.
-        rateBudget := quotapool.Limit(math.MaxInt64)
+        rateBudget := quotapool.Inf()
         if rate > 0 {
             rateBudget = quotapool.Limit(rate)
         }
pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 45 additions & 26 deletions
@@ -85,6 +85,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/protoutil"
     "github.com/cockroachdb/cockroach/pkg/util/randident"
     "github.com/cockroachdb/cockroach/pkg/util/randutil"
+    "github.com/cockroachdb/cockroach/pkg/util/span"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
     "github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -7142,47 +7143,65 @@ func TestChangefeedEndTimeWithCursor(t *testing.T) {
     defer log.Scope(t).Close(t)

     testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
-        knobs := s.TestingKnobs.
-            DistSQL.(*execinfra.TestingKnobs).
-            Changefeed.(*TestingKnobs)
-        endTimeReached := make(chan struct{})
-        knobs.FeedKnobs.EndTimeReached = func() bool {
-            select {
-            case <-endTimeReached:
-                return true
-            default:
-                return false
-            }
-        }
-
         sqlDB := sqlutils.MakeSQLRunner(s.DB)

         sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
-        sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)")

         var tsCursor string
         sqlDB.QueryRow(t, "SELECT (cluster_logical_timestamp())").Scan(&tsCursor)
-        sqlDB.Exec(t, "INSERT INTO foo VALUES (4), (5), (6)")

-        fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
-        feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan", tsCursor, fakeEndTime)
-        defer closeFeed(t, feed)
+        // Insert 1k rows -- using separate statements to get different MVCC timestamps.
+        for i := 0; i < 1024; i++ {
+            sqlDB.Exec(t, "INSERT INTO foo VALUES ($1)", i)
+        }

-        assertPayloads(t, feed, []string{
-            `foo: [4]->{"after": {"a": 4}}`,
-            `foo: [5]->{"after": {"a": 5}}`,
-            `foo: [6]->{"after": {"a": 6}}`,
-        })
-        close(endTimeReached)
+        // Split table into multiple ranges to make things more interesting.
+        sqlDB.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (100), (200), (400), (800)")
+
+        knobs := s.TestingKnobs.
+            DistSQL.(*execinfra.TestingKnobs).
+            Changefeed.(*TestingKnobs)
+        fooSpan := func() roachpb.Span {
+            fooDesc := desctestutils.TestingGetPublicTableDescriptor(
+                s.Server.DB(), s.Codec, "d", "foo")
+            return fooDesc.PrimaryIndexSpan(s.Codec)
+        }()

+        // Capture resolved events emitted during changefeed. We expect
+        // every range to emit resolved event with end_time timestamp.
+        frontier, err := span.MakeFrontier(fooSpan)
+        require.NoError(t, err)
+        knobs.FilterSpanWithMutation = func(rs *jobspb.ResolvedSpan) (bool, error) {
+            _, err := frontier.Forward(rs.Span, rs.Timestamp)
+            return false, err
+        }
+
+        // endTime must be after creation time (5 seconds should be enough
+        // to reach create changefeed statement and process it).
+        endTime := s.Server.Clock().Now().AddDuration(5 * time.Second)
+        feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan",
+            tsCursor, eval.TimestampToDecimalDatum(endTime).String())
+        defer closeFeed(t, feed)
+
+        // Don't care much about the values emitted (tested elsewhere) -- all
+        // we want to make sure is that the feed terminates.
         testFeed := feed.(cdctest.EnterpriseTestFeed)
         require.NoError(t, testFeed.WaitForStatus(func(s jobs.Status) bool {
             return s == jobs.StatusSucceeded
         }))
+
+        // After changefeed completes, verify we have seen all ranges emit resolved
+        // event with end_time timestamp. That is: verify frontier.Frontier() is at end_time.
+        expectedFrontier := endTime.Prev()
+        testutils.SucceedsWithin(t, func() error {
+            if expectedFrontier.EqOrdering(frontier.Frontier()) {
+                return nil
+            }
+            return errors.Newf("still waiting for frontier to reach %s, current %s",
+                expectedFrontier, frontier.Frontier())
+        }, 5*time.Second)
     }

-    // TODO: Fix sinkless feeds not providing pre-close events if Next is called
-    // after the feed was closed
     cdcTest(t, testFn, feedTestEnterpriseSinks)
 }
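The test above leans on `util/span.MakeFrontier`, `Forward`, and `Frontier()` (all visible in the hunk) to prove that every range emitted a resolved event at `end_time`. A self-contained sketch of that frontier behavior, with made-up spans and timestamps:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/span"
)

func main() {
	whole := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}
	f, err := span.MakeFrontier(whole)
	if err != nil {
		panic(err)
	}
	// Forward only half the span: the frontier (the minimum resolved
	// timestamp across the whole tracked span) stays at zero.
	if _, err := f.Forward(
		roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")},
		hlc.Timestamp{WallTime: 10},
	); err != nil {
		panic(err)
	}
	fmt.Println(f.Frontier()) // still the zero timestamp: [m,z) unresolved
	// Once every sub-span reaches ts=10, the frontier advances -- just as
	// the test's frontier reaches endTime.Prev() only after all ranges
	// emit their resolved event.
	if _, err := f.Forward(
		roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("z")},
		hlc.Timestamp{WallTime: 10},
	); err != nil {
		panic(err)
	}
	fmt.Println(f.Frontier()) // now at WallTime 10
}
```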

pkg/ccl/changefeedccl/kvevent/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@ go_test(
         "//pkg/sql/rowenc/keyside",
         "//pkg/sql/sem/tree",
         "//pkg/sql/types",
+        "//pkg/testutils",
         "//pkg/util",
         "//pkg/util/ctxgroup",
         "//pkg/util/encoding",

pkg/ccl/changefeedccl/kvevent/blocking_buffer.go

Lines changed: 10 additions & 0 deletions
@@ -105,6 +105,7 @@ func newMemBuffer(

     b.qp = allocPool{
         AbstractPool: quotapool.New("changefeed", quota, opts...),
+        sv:           sv,
         metrics:      metrics,
     }

@@ -218,6 +219,7 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
     }

     b.metrics.BufferEntriesIn.Inc(1)
+    b.metrics.BufferEntriesByType[e.et.Index()].Inc(1)
     b.mu.queue.enqueue(e)

     select {
@@ -246,6 +248,7 @@ func (b *blockingBuffer) AcquireMemory(ctx context.Context, n int64) (alloc Allo
         return alloc, err
     }
     b.metrics.BufferEntriesMemAcquired.Inc(n)
+    b.metrics.AllocatedMem.Inc(n)
     return alloc, nil
 }

@@ -324,6 +327,7 @@ func (b *blockingBuffer) CloseWithReason(ctx context.Context, reason error) erro
         quota := r.(*memQuota)
         quota.closed = true
         quota.acc.Close(ctx)
+        b.metrics.AllocatedMem.Dec(quota.allocated)
         return false
     })

@@ -442,16 +446,22 @@ func (r *memRequest) ShouldWait() bool {
 type allocPool struct {
     *quotapool.AbstractPool
     metrics *Metrics
+    sv      *settings.Values
 }

 func (ap allocPool) Release(ctx context.Context, bytes, entries int64) {
+    if bytes < 0 {
+        logcrash.ReportOrPanic(ctx, ap.sv, "attempt to release negative bytes (%d) into pool", bytes)
+    }
+
     ap.AbstractPool.Update(func(r quotapool.Resource) (shouldNotify bool) {
         quota := r.(*memQuota)
         if quota.closed {
             return false
         }
         quota.acc.Shrink(ctx, bytes)
         quota.allocated -= bytes
+        ap.metrics.AllocatedMem.Dec(bytes)
         ap.metrics.BufferEntriesMemReleased.Inc(bytes)
         ap.metrics.BufferEntriesReleased.Inc(entries)
         return true
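These hunks thread an `AllocatedMem` gauge through acquire, release, and close, plus a guard against negative releases. A toy model of the invariant, with plain integers standing in for the real `metric.Gauge` and `logcrash.ReportOrPanic`:

```go
package main

import "fmt"

// toyPool mirrors the accounting rules from the hunks above: every acquire
// increments the gauge, every release decrements it, and close subtracts
// whatever is still outstanding, so a healthy shutdown ends at zero.
type toyPool struct {
	allocatedMem int64 // stands in for metrics.AllocatedMem
}

func (p *toyPool) acquire(n int64) { p.allocatedMem += n }

func (p *toyPool) release(n int64) {
	if n < 0 {
		// The real code calls logcrash.ReportOrPanic here instead.
		panic(fmt.Sprintf("attempt to release negative bytes (%d) into pool", n))
	}
	p.allocatedMem -= n
}

// close releases whatever remains allocated (quota.allocated in the diff).
func (p *toyPool) close() { p.allocatedMem = 0 }

func main() {
	p := &toyPool{}
	p.acquire(1024)
	p.release(512)
	p.close()
	fmt.Println(p.allocatedMem) // 0, matching the test's final assertion
}
```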

pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go

Lines changed: 60 additions & 7 deletions
@@ -15,13 +15,15 @@ import (
     "time"

     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
     "github.com/cockroachdb/cockroach/pkg/keys"
     "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/settings/cluster"
     "github.com/cockroachdb/cockroach/pkg/sql/randgen"
     "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
     "github.com/cockroachdb/cockroach/pkg/sql/types"
+    "github.com/cockroachdb/cockroach/pkg/testutils"
     "github.com/cockroachdb/cockroach/pkg/util"
     "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
     "github.com/cockroachdb/cockroach/pkg/util/encoding"
@@ -32,6 +34,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/quotapool"
     "github.com/cockroachdb/cockroach/pkg/util/randutil"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+    "github.com/cockroachdb/errors"
     "github.com/stretchr/testify/require"
 )

@@ -94,9 +97,6 @@ func TestBlockingBuffer(t *testing.T) {
     }
     st := cluster.MakeTestingClusterSettings()
     buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)
-    defer func() {
-        require.NoError(t, buf.CloseWithReason(context.Background(), nil))
-    }()

     producerCtx, stopProducers := context.WithCancel(context.Background())
     wg := ctxgroup.WithContext(producerCtx)
@@ -105,26 +105,79 @@ func TestBlockingBuffer(t *testing.T) {
     }()

     // Start adding KVs to the buffer until we block.
+    var numResolvedEvents, numKVEvents int
     wg.GoCtx(func(ctx context.Context) error {
         rnd, _ := randutil.NewTestRand()
         for {
-            err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0)))
-            if err != nil {
-                return err
+            if rnd.Int()%20 == 0 {
+                prefix := keys.SystemSQLCodec.TablePrefix(42)
+                sp := roachpb.Span{Key: prefix, EndKey: prefix.Next()}
+                if err := buf.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, hlc.Timestamp{}, jobspb.ResolvedSpan_BACKFILL)); err != nil {
+                    return err
+                }
+                numResolvedEvents++
+            } else {
+                if err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0))); err != nil {
+                    return err
+                }
+                numKVEvents++
             }
         }
     })

-    <-waitCh
+    require.NoError(t, timeutil.RunWithTimeout(
+        context.Background(), "wait", 10*time.Second, func(ctx context.Context) error {
+            select {
+            case <-ctx.Done():
+                return ctx.Err()
+            case <-waitCh:
+                return nil
+            }
+        }))

     // Keep consuming events until we get pushback metrics updated.
+    var numPopped, numFlush int
     for metrics.BufferPushbackNanos.Count() == 0 {
         e, err := buf.Get(context.Background())
         require.NoError(t, err)
         a := e.DetachAlloc()
         a.Release(context.Background())
+        numPopped++
+        if e.Type() == kvevent.TypeFlush {
+            numFlush++
+        }
     }
+
+    // Allocated memory gauge should be non-zero once we buffer some events.
+    testutils.SucceedsWithin(t, func() error {
+        if metrics.AllocatedMem.Value() > 0 {
+            return nil
+        }
+        return errors.New("waiting for allocated mem > 0")
+    }, 5*time.Second)
+
     stopProducers()
+    require.ErrorIs(t, wg.Wait(), context.Canceled)
+
+    require.EqualValues(t, numKVEvents+numResolvedEvents, metrics.BufferEntriesIn.Count())
+    require.EqualValues(t, numPopped, metrics.BufferEntriesOut.Count())
+    require.Greater(t, metrics.BufferEntriesMemReleased.Count(), int64(0))
+
+    // Flush events are special: they are ephemeral events that don't get
+    // counted when releasing (each is a 0-entry, 0-byte event).
+    require.EqualValues(t, numPopped-numFlush, metrics.BufferEntriesReleased.Count())
+
+    require.EqualValues(t, numKVEvents, metrics.BufferEntriesByType[kvevent.TypeKV].Count())
+    require.EqualValues(t, numResolvedEvents, metrics.BufferEntriesByType[kvevent.TypeResolved].Count())
+
+    // We might have seen numFlush events, but they are synthetic, and only
+    // explicitly enqueued flush events are counted.
+    require.EqualValues(t, 0, metrics.BufferEntriesByType[kvevent.TypeFlush].Count())
+
+    // After the buffer is closed, resources are released, and metrics are
+    // adjusted to reflect that.
+    require.NoError(t, buf.CloseWithReason(context.Background(), context.Canceled))
+
+    require.EqualValues(t, 0, metrics.AllocatedMem.Value())
 }

 func TestBlockingBufferNotifiesConsumerWhenOutOfMemory(t *testing.T) {

pkg/ccl/changefeedccl/kvevent/event.go

Lines changed: 25 additions & 4 deletions
@@ -77,13 +77,10 @@ type MemAllocator interface {
 type Type uint8

 const (
-    // TypeUnknown indicates the event could not be parsed. Will fail the feed.
-    TypeUnknown Type = iota
-
     // TypeFlush indicates a request to flush buffered data.
     // This request type is emitted by blocking buffer when it's blocked, waiting
     // for more memory.
-    TypeFlush
+    TypeFlush Type = iota

     // TypeKV indicates that the KV, PrevKeyValue, and BackfillTimestamp methods
     // on the Event meaningful.
@@ -98,6 +95,9 @@ const (
     // TypeResolved indicates that the Resolved method on the Event will be
     // meaningful.
     TypeResolved = resolvedNone
+
+    // number of event types.
+    numEventTypes = TypeResolved + 1
 )

 // Event represents an event emitted by a kvfeed. It is either a KV or a
@@ -120,6 +120,27 @@ func (e *Event) Type() Type {
     }
 }

+// Index returns numerical/ordinal type index suitable for indexing into arrays.
+func (t Type) Index() int {
+    switch t {
+    case TypeFlush:
+        return int(TypeFlush)
+    case TypeKV:
+        return int(TypeKV)
+    case TypeResolved, resolvedBackfill, resolvedRestart, resolvedExit:
+        return int(TypeResolved)
+    default:
+        log.Warningf(context.TODO(),
+            "returning TypeFlush boundary type for unknown event type %d", t)
+        return int(TypeFlush)
+    }
+}
+
+// Raw returns the underlying RangeFeedEvent.
+func (e *Event) Raw() *kvpb.RangeFeedEvent {
+    return e.ev
+}
+
 // ApproximateSize returns events approximate size in bytes.
 func (e *Event) ApproximateSize() int {
     if e.et == TypeFlush {
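The new `Type.Index()` is what lets `blocking_buffer.go` keep a fixed-size per-type counter array (`BufferEntriesByType`). A hedged usage sketch, with plain `int64` counters in place of the real metric objects:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
)

// entriesByType holds per-type counters indexed by Type.Index(). Three
// buckets mirror numEventTypes (TypeFlush, TypeKV, TypeResolved); the
// backfill/restart/exit resolved variants all fold into TypeResolved.
var entriesByType [3]int64

func record(e *kvevent.Event) {
	entriesByType[e.Type().Index()]++
}

func main() {
	fmt.Println(entriesByType)
}
```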
