@@ -100,9 +100,6 @@ func registerCDCBench(r registry.Registry) {
 				RequiresLicense: true,
 				Timeout:         2 * time.Hour, // catchup scans with 100k ranges can take >1 hour
 				Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-					if ranges == 100000 && scanType == cdcBenchCatchupScan {
-						t.Skip("fails to complete, see https://github.com/cockroachdb/cockroach/issues/108157")
-					}
 					runCDCBenchScan(ctx, t, c, scanType, rows, ranges, protocol, format)
 				},
 			})
@@ -173,6 +170,37 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {
 	// ranges aren't able to keep up.
 	settings.ClusterSettings["kv.rangefeed.range_stuck_threshold"] = "0"
 
+	// Checkpoint frequently. Some of the larger benchmarks might overload the
+	// cluster. Producing frequent span-level checkpoints helps with recovery.
+	settings.ClusterSettings["changefeed.frontier_checkpoint_frequency"] = "60s"
+	settings.ClusterSettings["changefeed.frontier_highwater_lag_checkpoint_threshold"] = "30s"
+
+	// Bump up the number of allowed catchup scans. Catchup for 100k ranges with the
+	// default configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per minute).
+	settings.ClusterSettings["kv.rangefeed.catchup_scan_concurrency"] = "16"
+	settings.ClusterSettings["kv.rangefeed.concurrent_catchup_iterators"] = "16"
+
+	// Give the changefeed more memory and slow down rangefeed checkpoints.
+	// When running large catchup scan benchmarks (100k ranges), as the benchmark
+	// nears completion, more and more ranges generate checkpoint events. When
+	// the rate of checkpoints is high (the default used to be 200ms), the changefeed
+	// begins to block on memory acquisition since the fan-in factor (~20k
+	// ranges/node) greatly exceeds the processing loop speed (1 goroutine).
+	// The current pipeline looks like this:
+	//   rangefeed ->
+	//     1 goroutine physicalKVFeed (acquires memory) ->
+	//     1 goroutine copyFromSourceToDestination (filters events) ->
+	//     1 goroutine changeAggregator.Next ->
+	//     N goroutines rest of the pipeline (encode and emit)
+	// The memory for the checkpoint events (even ones after end_time) must be allocated
+	// first; then these events are thrown away (many inefficiencies here -- but
+	// it's the only thing we can do without adding "end time" support to the rangefeed library).
+	// The rate of incoming events greatly exceeds the rate at which we consume them
+	// (and release allocations), resulting in a significant drop in completed-ranges throughput.
+	// The current default is 3s; if needed, increase this interval:
+	//   settings.ClusterSettings["kv.rangefeed.closed_timestamp_refresh_interval"] = "5s"
+	settings.ClusterSettings["changefeed.memory.per_changefeed_limit"] = "4G"
+
 	// Scheduled backups may interfere with performance, disable them.
 	opts.RoachprodOpts.ScheduleBackups = false
 
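The memory-acquisition stall described in the comment above is easier to see in isolation. Below is a minimal, self-contained Go sketch (all names are illustrative, not changefeed code): many producers must acquire from a small shared budget before handing an event to a single slow consumer, so once the budget is exhausted nearly all producers sit blocked, which is the same fan-in effect the comment describes.

package main

import (
	"fmt"
	"sync"
	"time"
)

// budget is a toy stand-in for the changefeed memory monitor: producers must
// acquire a slot before sending, and only the consumer releases slots.
type budget struct{ sem chan struct{} }

func (b budget) acquire() { b.sem <- struct{}{} }
func (b budget) release() { <-b.sem }

func main() {
	const producers = 64 // stands in for the ~20k ranges/node fanning in
	b := budget{sem: make(chan struct{}, 8)}
	events := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < producers; i++ {
		wg.Add(1)
		go func(id int) { // per-range checkpoint producer
			defer wg.Done()
			b.acquire() // blocks once the shared budget is exhausted
			events <- id
		}(i)
	}
	go func() { wg.Wait(); close(events) }()

	// Single consumer, i.e. the 1-goroutine processing loop. Because it drains
	// far slower than producers acquire, most producers sit blocked in acquire().
	for range events {
		time.Sleep(10 * time.Millisecond)
		b.release()
	}
	fmt.Println("drained; a larger budget or a longer checkpoint interval relieves the stall")
}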
@@ -181,6 +209,13 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {
 	// catchup scans.
 	settings.Env = append(settings.Env, "COCKROACH_RANGEFEED_SEND_TIMEOUT=0")
 
+	// If this benchmark experiences periodic changefeed restarts due to RPC errors
+	// (gRPC context canceled), consider increasing the network timeout.
+	// Under significant (rangefeed-induced) load, the timeout can easily be
+	// triggered by elevated goroutine scheduling latency.
+	// The current default is 4s, which should be sufficient:
+	//   settings.Env = append(settings.Env, "COCKROACH_NETWORK_TIMEOUT=6s")
+
 	return opts, settings
 }
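Since these are ordinary cluster settings, the same overrides can be applied by hand to a running cluster when reproducing outside roachtest. A hedged sketch over pgwire; the DSN, the driver choice, and the '4GiB' spelling of the byte-size value are assumptions:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol
)

func main() {
	// DSN is an assumption: a local insecure single-node cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Apply the same overrides the benchmark sets programmatically.
	for _, stmt := range []string{
		`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '60s'`,
		`SET CLUSTER SETTING kv.rangefeed.catchup_scan_concurrency = 16`,
		`SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '4GiB'`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}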
@@ -286,6 +321,11 @@ func runCDCBenchScan(
 	default:
 		t.Fatalf("unknown scan type %q", scanType)
 	}
+
+	// Lock the schema so that the changefeed's schema feed runs under the fast path.
+	_, err := conn.ExecContext(ctx, "ALTER TABLE kv.kv SET (schema_locked = true);")
+	require.NoError(t, err)
+
 	var jobID int
 	require.NoError(t, conn.QueryRowContext(ctx,
 		fmt.Sprintf(`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH %s`, sink, with)).
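One caveat on the schema_locked fast path worth keeping in mind: while the storage parameter is set, schema changes on the table are rejected, so any follow-up DDL needs it unset first. A hedged sketch of a hypothetical helper (withSchemaLocked is not part of this patch, and ALTER TABLE ... RESET is assumed to clear the parameter):

package cdcbenchutil // hypothetical package, for illustration only

import (
	"context"
	gosql "database/sql"
	"fmt"
)

// withSchemaLocked sets the schema_locked storage parameter for the duration
// of fn, then unsets it so that later schema changes aren't rejected.
// Hypothetical helper; the patch sets the parameter inline and never resets it.
func withSchemaLocked(ctx context.Context, conn *gosql.DB, table string, fn func() error) error {
	if _, err := conn.ExecContext(ctx,
		fmt.Sprintf("ALTER TABLE %s SET (schema_locked = true)", table)); err != nil {
		return err
	}
	defer func() {
		// Best-effort unlock; an error here only matters if later DDL is needed.
		_, _ = conn.ExecContext(ctx,
			fmt.Sprintf("ALTER TABLE %s RESET (schema_locked)", table))
	}()
	return fn()
}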
@@ -430,6 +470,11 @@ func runCDCBenchWorkload(
 	var done atomic.Value // time.Time
 	if cdcEnabled {
 		t.L().Printf("starting changefeed")
+
+		// Lock the schema so that the changefeed's schema feed runs under the fast path.
+		_, err := conn.ExecContext(ctx, "ALTER TABLE kv.kv SET (schema_locked = true);")
+		require.NoError(t, err)
+
 		require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf(
 			`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH format = '%s', initial_scan = 'no'`,
 			sink, format)).
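With the job ID in hand, progress of the no-initial-scan changefeed can be watched from the jobs introspection table. A hedged sketch; the helper is hypothetical, and it assumes crdb_internal.jobs exposes high_water_timestamp as a decimal of nanoseconds since epoch:

package cdcbenchutil // hypothetical package, for illustration only

import (
	"context"
	gosql "database/sql"
	"time"
)

// waitForHighWater polls crdb_internal.jobs until the changefeed's high-water
// timestamp passes target, or ctx is done. Hypothetical helper; the patch
// itself relies on existing test utilities for this.
func waitForHighWater(ctx context.Context, conn *gosql.DB, jobID int, target time.Time) error {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			var hw gosql.NullFloat64 // decimal nanos since epoch (assumption)
			if err := conn.QueryRowContext(ctx,
				`SELECT high_water_timestamp FROM crdb_internal.jobs WHERE job_id = $1`,
				jobID).Scan(&hw); err != nil {
				return err
			}
			if hw.Valid && hw.Float64 >= float64(target.UnixNano()) {
				return nil
			}
		}
	}
}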