changefeedccl: enable frontier quantization #138328
base: master
Conversation
Force-pushed from ecc531b to 8eae6cf.
@@ -128,7 +130,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
 		}
 		stop()
 	case *kvpb.RangeFeedCheckpoint:
-		if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) {
+		ev := e.RangeFeedEvent
while this is a pretty simple implementation, would you mind investigating how much work it would be to switch to using the higher-level rangefeed client implementation that LDR uses? it already includes an implementation of quantization and more things that may prove useful. see https://github.com/asg0451/cockroach/blob/ed5ed37ea91b04fba061084ba73fe9225c018288/pkg/crosscluster/producer/event_stream.go#L147 and/or talk to me about it
if it's too much work / too incompatible an api, then maybe don't. however i think it would be beneficial if it's low to moderate effort.
Can we try merging this PR anyway, since it's pretty straightforward and already up? I will look into the refactoring as part of the follow-up work.
I put up #138346 and assigned myself.
thanks. lmk how it goes
I would also be in favor of using the already-built frontier quantization rangefeed client code if it's not too complicated. Maybe you could try prototyping it and seeing how involved it would be?
Ack, I'm exploring this option now to see how much work it turns out to be. I would still prefer to drive this PR forward given that it's already up, but I'm actively prototyping the refactoring.
Force-pushed from 8eae6cf to d165a8c.
looks good but i have some questions about the tests.
	require.NoError(t, err)
}
frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) span.OpResult {
	t.Logf("span: %v, ts: %v\n", sp, ts)
nit rm
I thought it might be worth leaving it in since it was helpful for debugging test failures. WDYT?
const quantize = time.Duration(10)
for _, e := range tc.events {
	if tc.withFrontierQuantization {
		e.Checkpoint.ResolvedTS = mockQuantization(e.Checkpoint.ResolvedTS, quantize)
i'm not really sure what this test is testing. just your mockQuantization function? can you make sure it tests the actual implementation?
Good idea. I refactored the actual quantization into a helper function and made the tests call the helper directly instead.
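A minimal sketch of what that refactor might look like, assuming the helper mirrors the mockQuantization logic shown below and is named quantizeTS as in the later diff; the package placement, test name, and values here are hypothetical:

```go
package changefeedccl // placement illustrative

import (
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/stretchr/testify/require"
)

// quantizeTS rounds ts down to the nearest multiple of the granularity and
// clears the logical component (same arithmetic as mockQuantization below).
func quantizeTS(ts hlc.Timestamp, granularity time.Duration) hlc.Timestamp {
	ts.Logical = 0
	ts.WallTime -= ts.WallTime % int64(granularity)
	return ts
}

// TestQuantizeTS exercises the real helper rather than a test-local mock.
func TestQuantizeTS(t *testing.T) {
	got := quantizeTS(hlc.Timestamp{WallTime: 27, Logical: 3}, 10*time.Nanosecond)
	require.Equal(t, hlc.Timestamp{WallTime: 20}, got)
}
```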
mockQuantization := func(ts hlc.Timestamp, quantize time.Duration) hlc.Timestamp {
	ts.Logical = 0
	ts.WallTime -= ts.WallTime % int64(quantize)
	return ts
}

makeCheckpointEvent := func(key []byte, endKey []byte, ts int, logical int32) *kvpb.RangeFeedEvent {
	return &kvpb.RangeFeedEvent{
		Checkpoint: &kvpb.RangeFeedCheckpoint{
			Span:       roachpb.Span{Key: key, EndKey: endKey},
			ResolvedTS: hlc.Timestamp{WallTime: int64(ts), Logical: logical},
		},
	}
}
nit: these can be pulled out and shared between these two tests
Done.
quantizedFrontier, err := span.MakeFrontier([]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}...)
require.NoError(t, err)
for _, e := range events {
	quantizedTs := mockQuantization(e.Checkpoint.ResolvedTS, quantize)
ditto on the "what does this test" question
I added more comments for these tests.
Force-pushed from cd1cba8 to 8bcc47f.
Force-pushed from 64820b0 to 3f5a7ef.
Nice work! Left some comments.
if quantize := p.cfg.WithFrontierQuantize; quantize != 0 {
	ev.Checkpoint.ResolvedTS = quantizeTS(ev.Checkpoint.ResolvedTS, quantize)
}
if resolvedTs := ev.Checkpoint.ResolvedTS; resolvedTs.IsEmpty() && resolvedTs.Less(p.cfg.Frontier) {
This code used to check that the timestamp wasn't empty. Is the change intentional?
Good catch, done.
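For reference, a sketch of the restored condition as a standalone predicate, since the surrounding control flow is elided in the diff; the function name and placement are hypothetical:

```go
package kvfeed // placement illustrative

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// checkpointBehindFrontier reports whether a (possibly quantized) resolved
// timestamp is non-empty yet still behind the frontier, matching the original
// !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) check.
func checkpointBehindFrontier(resolved, frontier hlc.Timestamp) bool {
	return !resolved.IsEmpty() && resolved.Less(frontier)
}
```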
@@ -126,6 +126,7 @@ func Run(ctx context.Context, cfg Config) error {
 		cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp,
 		cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
 		cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
+		changefeedbase.Quantize.Get(&cfg.Settings.SV),
I wonder if we should pass the quantization factor in the processor specs so that we don't end up in a situation where multiple change aggregators have different quantization factors because of a race condition between changing the cluster setting and (re)starting a changefeed. WDYT?
Good point. At first glance, having different granularities across aggregators doesn't seem like a major issue; it might reduce the effectiveness of quantization, but that may not justify the complexity of passing it through the processor specs. Let's discuss this further offline; I'm looking into it now.
var Quantize = settings.RegisterDurationSettingWithExplicitUnit(
	settings.ApplicationLevel,
	"changefeed.frontier.timestamp_granularity",
I'd prefer changefeed.resolved_timestamp.granularity, since ultimately this will be surfaced to the user via quantized resolved timestamps. WDYT?
Done.
_, err := buf.Get(ctx)
for eventIdx := 0; eventIdx < tc.expEventsCount; eventIdx++ {
	e, err := buf.Get(ctx)
	if tc.expEvents != nil {
nit: this if branch should go under the assert.NoError
Done.
frontier, err := span.MakeFrontier([]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}...)
require.NoError(t, err)
for _, e := range events {
	_, err := frontier.Forward(e.Checkpoint.Span, e.Checkpoint.ResolvedTS)
nit: create both frontiers at the same time and have a single loop where you forward the events to both frontiers
Done.
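For illustration, a sketch of the combined structure, reusing the fixtures from the test snippets above (events, quantize, and the quantization helper); exact names may differ in the final code:

```go
// Build both frontiers over the same keyspace up front.
frontier, err := span.MakeFrontier([]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}...)
require.NoError(t, err)
quantizedFrontier, err := span.MakeFrontier([]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}...)
require.NoError(t, err)

// Forward every event to both frontiers in a single pass: the exact resolved
// timestamp to one, the quantized timestamp to the other.
for _, e := range events {
	_, err := frontier.Forward(e.Checkpoint.Span, e.Checkpoint.ResolvedTS)
	require.NoError(t, err)
	_, err = quantizedFrontier.Forward(
		e.Checkpoint.Span, quantizeTS(e.Checkpoint.ResolvedTS, quantize))
	require.NoError(t, err)
}
```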
Force-pushed from 3f5a7ef to bfe3ed5.
Force-pushed from bfe3ed5 to 30a63fc.
Force-pushed from 30a63fc to 592c7b6.
Previously, all checkpoint timestamps were forwarded precisely to the resolved
timestamp. To improve merging of adjacent frontier spans, this patch introduces
a frontier quantization mode. In this mode, resolved timestamps from rangefeeds
are rounded down to the nearest multiple of the quantization granularity.
Rounding down ensures no progress is missed and that we consistently track the
lowest possible resolved timestamp.
Resolves: #125723
Release note: changefeed.frontier.timestamp_granularity (default off)
has been introduced to enable more efficient tracking of resolved timestamps.
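To make the rounding concrete, here is a small self-contained example; the helper mirrors the quantizeTS function from the diff, and the timestamps and granularity are illustrative:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// quantizeTS rounds a resolved timestamp down to the nearest multiple of the
// granularity and clears the logical component, as described above.
func quantizeTS(ts hlc.Timestamp, granularity time.Duration) hlc.Timestamp {
	ts.Logical = 0
	ts.WallTime -= ts.WallTime % int64(granularity)
	return ts
}

func main() {
	const granularity = 10 * time.Nanosecond
	// Checkpoints for two adjacent spans arrive with slightly different
	// resolved timestamps...
	a := hlc.Timestamp{WallTime: 23, Logical: 4}
	b := hlc.Timestamp{WallTime: 27}
	// ...but both round down to WallTime 20 (and Logical 0), so the frontier
	// can track the adjacent spans at one common timestamp rather than two
	// nearly identical ones, which is the merging benefit described above.
	fmt.Println(quantizeTS(a, granularity))
	fmt.Println(quantizeTS(b, granularity))
}
```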