@@ -21,7 +21,6 @@ import (
21
21
22
22
sgbucket "github.com/couchbase/sg-bucket"
23
23
"github.com/google/uuid"
24
- pkgerrors "github.com/pkg/errors"
25
24
)
26
25
27
26
// Number of non-checkpoint updates per vbucket required to trigger metadata persistence. Must be greater than zero to avoid
@@ -33,9 +32,6 @@ const kCheckpointThreshold = 1
33
32
// Only persist checkpoint once per kCheckpointTimeThreshold (per vbucket)
34
33
const kCheckpointTimeThreshold = 1 * time .Minute
35
34
36
- // Persist backfill progress every 10s
37
- const kBackfillPersistInterval = 10 * time .Second
38
-
39
35
// DCP Feed IDs are used to build unique DCP identifiers
40
36
const DCPCachingFeedID = "SG"
41
37
const DCPImportFeedID = "SGI"
@@ -54,7 +50,6 @@ type DCPCommon struct {
54
50
updatesSinceCheckpoint []uint64 // Number of updates since the last checkpoint. Used to avoid checkpoint persistence feedback loop
55
51
lastCheckpointTime []time.Time // Time of last checkpoint persistence, per vbucket. Used to manage checkpoint persistence volume
56
52
callback sgbucket.FeedEventCallbackFunc // Function to callback for mutation processing
57
- backfill * backfillStatus // Backfill state and stats
58
53
feedID string // Unique feed ID, used for logging
59
54
loggingCtx context.Context // Logging context, prefixes feedID
60
55
checkpointPrefix string // DCP checkpoint key prefix
@@ -63,9 +58,6 @@ type DCPCommon struct {
63
58
// NewDCPCommon creates a new DCPCommon which manages updates coming from a cbgt-based DCP feed. The callback function will receive events from a DCP feed. The bucket is the gocb bucket to stream events from. It stores checkpoints in the metaStore collection prefixes from metaKeys + checkpointPrefix. The feed name will start with feedID and DCPCommon will add unique string. Specific stats for DCP are stored in expvars rather than SgwStats. The janitorRollback function is supplied by the global cbgt.PIndexImplType.New function, for initial opening of a partition index, and cbgt.PIndexImplType.OpenUsing for reopening of a partition index. The rollback function provides a way to pass cbgt.JANITOR_ROLLBACK_PINDEX to cbgt.Mgr and is supplied.
64
59
func NewDCPCommon (ctx context.Context , callback sgbucket.FeedEventCallbackFunc , bucket Bucket , metaStore DataStore ,
65
60
maxVbNo uint16 , persistCheckpoints bool , dbStats * expvar.Map , feedID , checkpointPrefix string , metaKeys * MetadataKeys ) (* DCPCommon , error ) {
66
- newBackfillStatus := backfillStatus {
67
- metaKeys : metaKeys ,
68
- }
69
61
70
62
couchbaseStore , ok := AsCouchbaseBucketStore (bucket )
71
63
if ! ok {
@@ -85,7 +77,6 @@ func NewDCPCommon(ctx context.Context, callback sgbucket.FeedEventCallbackFunc,
85
77
updatesSinceCheckpoint : make ([]uint64 , maxVbNo ),
86
78
callback : callback ,
87
79
lastCheckpointTime : make ([]time.Time , maxVbNo ),
88
- backfill : & newBackfillStatus ,
89
80
feedID : feedID ,
90
81
checkpointPrefix : checkpointPrefix ,
91
82
}
@@ -104,13 +95,6 @@ func (c *DCPCommon) dataUpdate(seq uint64, event sgbucket.FeedEvent) {
104
95
}
105
96
106
97
func (c * DCPCommon ) snapshotStart (vbNo uint16 , snapStart , snapEnd uint64 ) {
107
- // During initial backfill, we persist snapshot information to support resuming the DCP
108
- // stream midway through a snapshot. This is primarily for the import when initially
109
- // connection to a populated bucket, to avoid restarting the import from
110
- // zero if SG is terminated before completing processing of the initial snapshots.
111
- if c .backfill .isActive () && c .backfill .isVbActive (vbNo ) {
112
- c .backfill .snapshotStart (vbNo , snapStart , snapEnd )
113
- }
114
98
}
115
99
116
100
// setMetaData and getMetaData may used internally by dcp clients. Expects send/receive of opaque
@@ -219,47 +203,6 @@ func (c *DCPCommon) InitVbMeta(vbNo uint16) {
219
203
c .m .Unlock ()
220
204
}
221
205
222
- func (c * DCPCommon ) initMetadata (maxVbNo uint16 ) {
223
- c .m .Lock ()
224
- defer c .m .Unlock ()
225
-
226
- // Check for persisted backfill sequences
227
- backfillSeqs , err := c .backfill .loadBackfillSequences (c .loggingCtx , c .metaStore )
228
- if err != nil {
229
- // Backfill sequences not present or invalid - will use metadata only
230
- backfillSeqs = nil
231
- }
232
-
233
- // Load persisted metadata
234
- for i := uint16 (0 ); i < maxVbNo ; i ++ {
235
- metadata , snapStart , snapEnd , err := c .loadCheckpoint (i )
236
- if err != nil {
237
- WarnfCtx (c .loggingCtx , "Unexpected error attempting to load DCP checkpoint for vbucket %d. Will restart DCP for that vbucket from zero. Error: %v" , i , err )
238
- c .meta [i ] = []byte {}
239
- c .seqs [i ] = 0
240
- } else {
241
- c .meta [i ] = metadata
242
- c .seqs [i ] = snapStart
243
- // Check whether we persisted a sequence midway through a previous incomplete backfill
244
- if backfillSeqs != nil {
245
- var partialBackfillSequence uint64
246
- if backfillSeqs .Seqs [i ] < backfillSeqs .SnapEnd [i ] {
247
- partialBackfillSequence = backfillSeqs .Seqs [i ]
248
- }
249
- // If we have a backfill sequence later than the DCP checkpoint's snapStart, start from there
250
- if partialBackfillSequence > snapStart {
251
- InfofCtx (c .loggingCtx , KeyDCP , "Restarting vb %d using backfill sequence %d ([%d-%d])" , i , partialBackfillSequence , backfillSeqs .SnapStart [i ], backfillSeqs .SnapEnd [i ])
252
- c .seqs [i ] = partialBackfillSequence
253
- c .meta [i ] = makeVbucketMetadata (c .vbuuids [i ], partialBackfillSequence , backfillSeqs .SnapStart [i ], backfillSeqs .SnapEnd [i ])
254
- } else {
255
- InfofCtx (c .loggingCtx , KeyDCP , "Restarting vb %d using metadata sequence %d (backfill %d not in [%d-%d])" , i , snapStart , partialBackfillSequence , snapStart , snapEnd )
256
- }
257
- }
258
- }
259
- }
260
-
261
- }
262
-
263
206
// TODO: Convert checkpoint persistence to an asynchronous batched process, since
264
207
//
265
208
// restarting w/ an older checkpoint:
@@ -288,207 +231,6 @@ func (c *DCPCommon) updateSeq(vbucketId uint16, seq uint64, warnOnLowerSeqNo boo
288
231
// Update c.seqs for use by GetMetaData()
289
232
c .seqs [vbucketId ] = seq
290
233
291
- // If in backfill, update backfill tracking
292
- if c .backfill .isActive () {
293
- c .backfill .updateStats (c .loggingCtx , vbucketId , previousSequence , c .seqs , c .metaStore )
294
- }
295
-
296
- }
297
-
298
- // Initializes DCP Feed. Determines starting position based on feed type.
299
- func (c * DCPCommon ) initFeed (backfillType uint64 ) (highSeqnos map [uint16 ]uint64 , err error ) {
300
-
301
- var statsUuids map [uint16 ]uint64
302
- statsUuids , highSeqnos , err = c .couchbaseStore .GetStatsVbSeqno (c .maxVbNo , false )
303
- if err != nil {
304
- return nil , pkgerrors .Wrap (err , "Error retrieving stats-vbseqno - DCP not supported" )
305
- }
306
-
307
- c .vbuuids = statsUuids
308
-
309
- switch backfillType {
310
- case sgbucket .FeedNoBackfill :
311
- // For non-backfill, use vbucket uuids, high sequence numbers
312
- DebugfCtx (c .loggingCtx , KeyDCP , "Initializing DCP with no backfill - seeding seqnos: %v" , highSeqnos )
313
- c .seedSeqnos (statsUuids , highSeqnos )
314
- case sgbucket .FeedResume :
315
- // For resume case, load previously persisted checkpoints from bucket
316
- c .initMetadata (c .maxVbNo )
317
- // Track backfill (from persisted checkpoints to current high seqno)
318
- c .backfill .init (c .seqs , highSeqnos , c .maxVbNo , c .dbStatsExpvars )
319
- DebugfCtx (c .loggingCtx , KeyDCP , "Initializing DCP feed based on persisted checkpoints" )
320
- default :
321
- // Otherwise, start feed from zero
322
- startSeqnos := make (map [uint16 ]uint64 , c .maxVbNo )
323
- vbuuids := make (map [uint16 ]uint64 , c .maxVbNo )
324
- c .seedSeqnos (vbuuids , startSeqnos )
325
- // Track backfill (from zero to current high seqno)
326
- c .backfill .init (c .seqs , highSeqnos , c .maxVbNo , c .dbStatsExpvars )
327
- DebugfCtx (c .loggingCtx , KeyDCP , "Initializing DCP feed to start from zero" )
328
- }
329
-
330
- return highSeqnos , nil
331
- }
332
-
333
- // Seeds the sequence numbers returned by GetMetadata to support starting DCP from a particular
334
- // sequence.
335
- func (c * DCPCommon ) seedSeqnos (uuids map [uint16 ]uint64 , seqs map [uint16 ]uint64 ) {
336
- c .m .Lock ()
337
- defer c .m .Unlock ()
338
-
339
- // Set the high seqnos as-is
340
- for vbNo , seq := range seqs {
341
- c .seqs [vbNo ] = seq
342
- }
343
-
344
- // For metadata, we need to do more work to build metadata based on uuid and map values. This
345
- // isn't strictly to the design of cbdatasource.Receiver, which intends metadata to be opaque, but
346
- // is required in order to have the BucketDataSource start the UPRStream as needed.
347
- // The implementation has been reviewed with the cbdatasource owners and they agree this is a
348
- // reasonable approach, as the structure of VBucketMetaData is expected to rarely change.
349
- for vbucketId , uuid := range uuids {
350
- c .meta [vbucketId ] = makeVbucketMetadataForSequence (uuid , seqs [vbucketId ])
351
- }
352
- }
353
-
354
- // BackfillStatus
355
-
356
- // BackfillStatus manages tracking of DCP backfill progress, to provide diagnostics and mid-snapshot restart capability
357
- type backfillStatus struct {
358
- active bool // Whether this DCP feed is in backfill
359
- vbActive []bool // Whether a vbucket is in backfill
360
- receivedSequences uint64 // Number of backfill sequences received
361
- expectedSequences uint64 // Expected number of sequences in backfill
362
- endSeqs []uint64 // Backfill complete sequences, indexed by vbno
363
- snapStart []uint64 // Start sequence of current backfill snapshot
364
- snapEnd []uint64 // End sequence of current backfill snapshot
365
- lastPersistTime time.Time // The last time backfill stats were emitted (log, expvar)
366
- statsMap * expvar.Map // Stats map for backfill
367
- metaKeys * MetadataKeys // MetadataKeys for backfill
368
- }
369
-
370
- func (b * backfillStatus ) init (start []uint64 , end map [uint16 ]uint64 , maxVbNo uint16 , statsMap * expvar.Map ) {
371
- b .vbActive = make ([]bool , maxVbNo )
372
- b .snapStart = make ([]uint64 , maxVbNo )
373
- b .snapEnd = make ([]uint64 , maxVbNo )
374
- b .endSeqs = make ([]uint64 , maxVbNo )
375
- b .statsMap = statsMap
376
-
377
- // Calculate total sequences in backfill
378
- b .expectedSequences = 0
379
- for vbNo := uint16 (0 ); vbNo < maxVbNo ; vbNo ++ {
380
- b .endSeqs [vbNo ] = end [vbNo ]
381
- if end [vbNo ] > start [vbNo ] {
382
- b .expectedSequences += end [vbNo ] - start [vbNo ]
383
- b .vbActive [vbNo ] = true
384
- // Set backfill as active if any vb is in backfill
385
- b .active = true
386
- }
387
- }
388
-
389
- // Initialize backfill expvars
390
- // NOTE: this is a legacy stat, but cannot be removed b/c there are unit tests that depend on these stats
391
- totalVar := & expvar.Int {}
392
- completedVar := & expvar.Int {}
393
- totalVar .Set (int64 (b .expectedSequences ))
394
- completedVar .Set (0 )
395
- statsMap .Set ("dcp_backfill_expected" , totalVar )
396
- statsMap .Set ("dcp_backfill_completed" , completedVar )
397
-
398
- }
399
-
400
- func (b * backfillStatus ) isActive () bool {
401
- return b .active
402
- }
403
-
404
- func (b * backfillStatus ) isVbActive (vbNo uint16 ) bool {
405
- return b .vbActive [vbNo ]
406
- }
407
-
408
- func (b * backfillStatus ) snapshotStart (vbNo uint16 , snapStart uint64 , snapEnd uint64 ) {
409
- b .snapStart [vbNo ] = snapStart
410
- b .snapEnd [vbNo ] = snapEnd
411
- }
412
- func (b * backfillStatus ) updateStats (ctx context.Context , vbno uint16 , previousVbSequence uint64 , currentSequences []uint64 , datastore DataStore ) {
413
- if ! b .vbActive [vbno ] {
414
- return
415
- }
416
-
417
- currentVbSequence := currentSequences [vbno ]
418
-
419
- // Update backfill progress. If this vbucket has run past the end of the backfill, only include up to
420
- // the backfill target for progress tracking.
421
- var backfillDelta uint64
422
- if currentVbSequence >= b .endSeqs [vbno ] {
423
- backfillDelta = b .endSeqs [vbno ] - previousVbSequence
424
- b .vbActive [vbno ] = false
425
- } else {
426
- backfillDelta = currentVbSequence - previousVbSequence
427
- }
428
-
429
- b .receivedSequences += backfillDelta
430
-
431
- // NOTE: this is a legacy stat, but cannot be removed b/c there are unit tests that depend on these stats
432
- b .statsMap .Add ("dcp_backfill_completed" , int64 (backfillDelta ))
433
-
434
- // Check if it's time to persist and log backfill progress
435
- if time .Since (b .lastPersistTime ) > kBackfillPersistInterval {
436
- b .lastPersistTime = time .Now ()
437
- err := b .persistBackfillSequences (datastore , currentSequences )
438
- if err != nil {
439
- WarnfCtx (ctx , "Error persisting back-fill sequences: %v" , err )
440
- }
441
- b .logBackfillProgress (ctx )
442
- }
443
-
444
- // If backfill is complete, log and do backfill inactivation/cleanup
445
- if b .receivedSequences >= b .expectedSequences {
446
- InfofCtx (ctx , KeyDCP , "Backfill complete" )
447
- b .active = false
448
- err := b .purgeBackfillSequences (datastore )
449
- if err != nil {
450
- WarnfCtx (ctx , "Error purging back-fill sequences: %v" , err )
451
- }
452
- }
453
- }
454
-
455
- // Logs current backfill progress. Expects caller to have the lock on r.m
456
- func (b * backfillStatus ) logBackfillProgress (ctx context.Context ) {
457
- if ! b .active {
458
- return
459
- }
460
- InfofCtx (ctx , KeyDCP , "Backfill in progress: %d%% (%d / %d)" , int (b .receivedSequences * 100 / b .expectedSequences ), b .receivedSequences , b .expectedSequences )
461
- }
462
-
463
- // BackfillSequences defines the format used to persist snapshot information to the _sync:dcp_backfill document
464
- // to support mid-snapshot restart
465
- type BackfillSequences struct {
466
- Seqs []uint64
467
- SnapStart []uint64
468
- SnapEnd []uint64
469
- }
470
-
471
- func (b * backfillStatus ) persistBackfillSequences (datastore DataStore , currentSeqs []uint64 ) error {
472
- backfillSeqs := & BackfillSequences {
473
- Seqs : currentSeqs ,
474
- SnapStart : b .snapStart ,
475
- SnapEnd : b .snapEnd ,
476
- }
477
- return datastore .Set (b .metaKeys .DCPBackfillKey (), 0 , nil , backfillSeqs )
478
- }
479
-
480
- func (b * backfillStatus ) loadBackfillSequences (ctx context.Context , datastore DataStore ) (* BackfillSequences , error ) {
481
- var backfillSeqs BackfillSequences
482
- _ , err := datastore .Get (b .metaKeys .DCPBackfillKey (), & backfillSeqs )
483
- if err != nil {
484
- return nil , err
485
- }
486
- InfofCtx (ctx , KeyDCP , "Previously persisted backfill sequences found - will resume" )
487
- return & backfillSeqs , nil
488
- }
489
-
490
- func (b * backfillStatus ) purgeBackfillSequences (datastore DataStore ) error {
491
- return datastore .Delete (b .metaKeys .DCPBackfillKey ())
492
234
}
493
235
494
236
// DCP-related utilities
0 commit comments