CBG-560 Remove unused DCP backfill status code #7532

Open · wants to merge 2 commits into base: main
258 changes: 0 additions & 258 deletions base/dcp_common.go
@@ -21,7 +21,6 @@ import (

sgbucket "github.com/couchbase/sg-bucket"
"github.com/google/uuid"
pkgerrors "github.com/pkg/errors"
)

// Number of non-checkpoint updates per vbucket required to trigger metadata persistence. Must be greater than zero to avoid
@@ -33,9 +32,6 @@ const kCheckpointThreshold = 1
// Only persist checkpoint once per kCheckpointTimeThreshold (per vbucket)
const kCheckpointTimeThreshold = 1 * time.Minute

// Persist backfill progress every 10s
const kBackfillPersistInterval = 10 * time.Second

// DCP Feed IDs are used to build unique DCP identifiers
const DCPCachingFeedID = "SG"
const DCPImportFeedID = "SGI"
@@ -54,7 +50,6 @@ type DCPCommon struct {
updatesSinceCheckpoint []uint64 // Number of updates since the last checkpoint. Used to avoid checkpoint persistence feedback loop
lastCheckpointTime []time.Time // Time of last checkpoint persistence, per vbucket. Used to manage checkpoint persistence volume
callback sgbucket.FeedEventCallbackFunc // Function to callback for mutation processing
backfill *backfillStatus // Backfill state and stats
feedID string // Unique feed ID, used for logging
loggingCtx context.Context // Logging context, prefixes feedID
checkpointPrefix string // DCP checkpoint key prefix
@@ -63,9 +58,6 @@ type DCPCommon struct {
// NewDCPCommon creates a new DCPCommon which manages updates coming from a cbgt-based DCP feed. The callback function receives events from the DCP feed. The bucket is the gocb bucket to stream events from. Checkpoints are stored in the metaStore collection, under key prefixes built from metaKeys + checkpointPrefix. The feed name starts with feedID, to which DCPCommon appends a unique string. DCP-specific stats are stored in expvars rather than SgwStats. The janitorRollback function is supplied by the global cbgt.PIndexImplType.New function for the initial opening of a partition index, and by cbgt.PIndexImplType.OpenUsing for reopening; it provides a way to pass cbgt.JANITOR_ROLLBACK_PINDEX to cbgt.Mgr.
func NewDCPCommon(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, bucket Bucket, metaStore DataStore,
maxVbNo uint16, persistCheckpoints bool, dbStats *expvar.Map, feedID, checkpointPrefix string, metaKeys *MetadataKeys) (*DCPCommon, error) {
newBackfillStatus := backfillStatus{
metaKeys: metaKeys,
}

couchbaseStore, ok := AsCouchbaseBucketStore(bucket)
if !ok {
@@ -85,7 +77,6 @@ func NewDCPCommon(ctx context.Context, callback sgbucket.FeedEventCallbackFunc,
updatesSinceCheckpoint: make([]uint64, maxVbNo),
callback: callback,
lastCheckpointTime: make([]time.Time, maxVbNo),
backfill: &newBackfillStatus,
feedID: feedID,
checkpointPrefix: checkpointPrefix,
}
@@ -104,13 +95,6 @@ func (c *DCPCommon) dataUpdate(seq uint64, event sgbucket.FeedEvent) {
}

func (c *DCPCommon) snapshotStart(vbNo uint16, snapStart, snapEnd uint64) {
// During initial backfill, we persist snapshot information to support resuming the DCP
// stream midway through a snapshot. This is primarily for import, when initially
// connecting to a populated bucket, to avoid restarting the import from
// zero if SG is terminated before completing processing of the initial snapshots.
if c.backfill.isActive() && c.backfill.isVbActive(vbNo) {
c.backfill.snapshotStart(vbNo, snapStart, snapEnd)
}
}

// setMetaData and getMetaData may be used internally by dcp clients. Expects send/receive of opaque
@@ -219,47 +203,6 @@ func (c *DCPCommon) InitVbMeta(vbNo uint16) {
c.m.Unlock()
}

func (c *DCPCommon) initMetadata(maxVbNo uint16) {
c.m.Lock()
defer c.m.Unlock()

// Check for persisted backfill sequences
backfillSeqs, err := c.backfill.loadBackfillSequences(c.loggingCtx, c.metaStore)
if err != nil {
// Backfill sequences not present or invalid - will use metadata only
backfillSeqs = nil
}

// Load persisted metadata
for i := uint16(0); i < maxVbNo; i++ {
metadata, snapStart, snapEnd, err := c.loadCheckpoint(i)
if err != nil {
WarnfCtx(c.loggingCtx, "Unexpected error attempting to load DCP checkpoint for vbucket %d. Will restart DCP for that vbucket from zero. Error: %v", i, err)
c.meta[i] = []byte{}
c.seqs[i] = 0
} else {
c.meta[i] = metadata
c.seqs[i] = snapStart
// Check whether we persisted a sequence midway through a previous incomplete backfill
if backfillSeqs != nil {
var partialBackfillSequence uint64
if backfillSeqs.Seqs[i] < backfillSeqs.SnapEnd[i] {
partialBackfillSequence = backfillSeqs.Seqs[i]
}
// If we have a backfill sequence later than the DCP checkpoint's snapStart, start from there
if partialBackfillSequence > snapStart {
InfofCtx(c.loggingCtx, KeyDCP, "Restarting vb %d using backfill sequence %d ([%d-%d])", i, partialBackfillSequence, backfillSeqs.SnapStart[i], backfillSeqs.SnapEnd[i])
c.seqs[i] = partialBackfillSequence
c.meta[i] = makeVbucketMetadata(c.vbuuids[i], partialBackfillSequence, backfillSeqs.SnapStart[i], backfillSeqs.SnapEnd[i])
} else {
InfofCtx(c.loggingCtx, KeyDCP, "Restarting vb %d using metadata sequence %d (backfill %d not in [%d-%d])", i, snapStart, partialBackfillSequence, snapStart, snapEnd)
}
}
}
}

}
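
For reference, a hedged sketch of the resume decision the removed initMetadata implemented; resumeSeq and its parameter names are hypothetical, not part of the Sync Gateway codebase:

```go
package main

import "fmt"

// resumeSeq sketches the removed restart logic: prefer a persisted
// mid-snapshot backfill sequence over the checkpoint's snapshot start
// when the backfill had progressed further. A backfill sequence equal to
// its snapshot end means the snapshot completed, so there is no partial
// progress to resume from.
func resumeSeq(checkpointSnapStart, backfillSeq, backfillSnapEnd uint64) uint64 {
	var partial uint64
	if backfillSeq < backfillSnapEnd {
		partial = backfillSeq
	}
	if partial > checkpointSnapStart {
		return partial // resume mid-snapshot
	}
	return checkpointSnapStart // fall back to the DCP checkpoint
}

func main() {
	// Checkpoint snapshot started at 1000; a backfill sequence of 1500 was
	// persisted inside snapshot [1000-2000], so the feed restarts at 1500
	// rather than re-importing from 1000.
	fmt.Println(resumeSeq(1000, 1500, 2000)) // 1500
}
```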

// TODO: Convert checkpoint persistence to an asynchronous batched process, since
//
// restarting w/ an older checkpoint:
@@ -288,207 +231,6 @@ func (c *DCPCommon) updateSeq(vbucketId uint16, seq uint64, warnOnLowerSeqNo boo
// Update c.seqs for use by GetMetaData()
c.seqs[vbucketId] = seq

// If in backfill, update backfill tracking
if c.backfill.isActive() {
c.backfill.updateStats(c.loggingCtx, vbucketId, previousSequence, c.seqs, c.metaStore)
}

}

// Initializes DCP Feed. Determines starting position based on feed type.
func (c *DCPCommon) initFeed(backfillType uint64) (highSeqnos map[uint16]uint64, err error) {

var statsUuids map[uint16]uint64
statsUuids, highSeqnos, err = c.couchbaseStore.GetStatsVbSeqno(c.maxVbNo, false)
if err != nil {
return nil, pkgerrors.Wrap(err, "Error retrieving stats-vbseqno - DCP not supported")
}

c.vbuuids = statsUuids

switch backfillType {
case sgbucket.FeedNoBackfill:
// For non-backfill, use vbucket uuids, high sequence numbers
DebugfCtx(c.loggingCtx, KeyDCP, "Initializing DCP with no backfill - seeding seqnos: %v", highSeqnos)
c.seedSeqnos(statsUuids, highSeqnos)
case sgbucket.FeedResume:
// For resume case, load previously persisted checkpoints from bucket
c.initMetadata(c.maxVbNo)
// Track backfill (from persisted checkpoints to current high seqno)
c.backfill.init(c.seqs, highSeqnos, c.maxVbNo, c.dbStatsExpvars)
DebugfCtx(c.loggingCtx, KeyDCP, "Initializing DCP feed based on persisted checkpoints")
default:
// Otherwise, start feed from zero
startSeqnos := make(map[uint16]uint64, c.maxVbNo)
vbuuids := make(map[uint16]uint64, c.maxVbNo)
c.seedSeqnos(vbuuids, startSeqnos)
// Track backfill (from zero to current high seqno)
c.backfill.init(c.seqs, highSeqnos, c.maxVbNo, c.dbStatsExpvars)
DebugfCtx(c.loggingCtx, KeyDCP, "Initializing DCP feed to start from zero")
}

return highSeqnos, nil
}

// Seeds the sequence numbers returned by GetMetadata to support starting DCP from a particular
// sequence.
func (c *DCPCommon) seedSeqnos(uuids map[uint16]uint64, seqs map[uint16]uint64) {
c.m.Lock()
defer c.m.Unlock()

// Set the high seqnos as-is
for vbNo, seq := range seqs {
c.seqs[vbNo] = seq
}

// For metadata, we need to do more work to build metadata based on uuid and map values. This
// isn't strictly in keeping with the design of cbdatasource.Receiver, which intends metadata to be
// opaque, but it is required for the BucketDataSource to start the UPRStream as needed.
// The implementation has been reviewed with the cbdatasource owners and they agree this is a
// reasonable approach, as the structure of VBucketMetaData is expected to rarely change.
for vbucketId, uuid := range uuids {
c.meta[vbucketId] = makeVbucketMetadataForSequence(uuid, seqs[vbucketId])
}
}

// BackfillStatus

// BackfillStatus manages tracking of DCP backfill progress, to provide diagnostics and mid-snapshot restart capability
type backfillStatus struct {
active bool // Whether this DCP feed is in backfill
vbActive []bool // Whether a vbucket is in backfill
receivedSequences uint64 // Number of backfill sequences received
expectedSequences uint64 // Expected number of sequences in backfill
endSeqs []uint64 // Backfill complete sequences, indexed by vbno
snapStart []uint64 // Start sequence of current backfill snapshot
snapEnd []uint64 // End sequence of current backfill snapshot
lastPersistTime time.Time // The last time backfill stats were emitted (log, expvar)
statsMap *expvar.Map // Stats map for backfill
metaKeys *MetadataKeys // MetadataKeys for backfill
}

func (b *backfillStatus) init(start []uint64, end map[uint16]uint64, maxVbNo uint16, statsMap *expvar.Map) {
b.vbActive = make([]bool, maxVbNo)
b.snapStart = make([]uint64, maxVbNo)
b.snapEnd = make([]uint64, maxVbNo)
b.endSeqs = make([]uint64, maxVbNo)
b.statsMap = statsMap

// Calculate total sequences in backfill
b.expectedSequences = 0
for vbNo := uint16(0); vbNo < maxVbNo; vbNo++ {
b.endSeqs[vbNo] = end[vbNo]
if end[vbNo] > start[vbNo] {
b.expectedSequences += end[vbNo] - start[vbNo]
b.vbActive[vbNo] = true
// Set backfill as active if any vb is in backfill
b.active = true
}
}

// Initialize backfill expvars
// NOTE: this is a legacy stat, but cannot be removed b/c there are unit tests that depend on these stats
totalVar := &expvar.Int{}
completedVar := &expvar.Int{}
totalVar.Set(int64(b.expectedSequences))
completedVar.Set(0)
statsMap.Set("dcp_backfill_expected", totalVar)
statsMap.Set("dcp_backfill_completed", completedVar)

}
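
A worked example of the expected-sequence calculation above, using hypothetical seqnos:

```go
package main

import "fmt"

func main() {
	starts := []uint64{0, 10, 50} // persisted checkpoint seqs, per vbucket
	ends := []uint64{100, 10, 40} // current high seqnos, per vbucket
	var expected uint64
	for vb := range starts {
		if ends[vb] > starts[vb] {
			expected += ends[vb] - starts[vb]
		}
	}
	// Only vb0 is in backfill: vb1 is caught up, and vb2's checkpoint is
	// already past the current high seqno.
	fmt.Println(expected) // 100
}
```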

func (b *backfillStatus) isActive() bool {
return b.active
}

func (b *backfillStatus) isVbActive(vbNo uint16) bool {
return b.vbActive[vbNo]
}

func (b *backfillStatus) snapshotStart(vbNo uint16, snapStart uint64, snapEnd uint64) {
b.snapStart[vbNo] = snapStart
b.snapEnd[vbNo] = snapEnd
}
func (b *backfillStatus) updateStats(ctx context.Context, vbno uint16, previousVbSequence uint64, currentSequences []uint64, datastore DataStore) {
if !b.vbActive[vbno] {
return
}

currentVbSequence := currentSequences[vbno]

// Update backfill progress. If this vbucket has run past the end of the backfill, only include up to
// the backfill target for progress tracking.
var backfillDelta uint64
if currentVbSequence >= b.endSeqs[vbno] {
backfillDelta = b.endSeqs[vbno] - previousVbSequence
b.vbActive[vbno] = false
} else {
backfillDelta = currentVbSequence - previousVbSequence
}

b.receivedSequences += backfillDelta

// NOTE: this is a legacy stat, but cannot be removed b/c there are unit tests that depend on these stats
b.statsMap.Add("dcp_backfill_completed", int64(backfillDelta))

// Check if it's time to persist and log backfill progress
if time.Since(b.lastPersistTime) > kBackfillPersistInterval {
b.lastPersistTime = time.Now()
err := b.persistBackfillSequences(datastore, currentSequences)
if err != nil {
WarnfCtx(ctx, "Error persisting back-fill sequences: %v", err)
}
b.logBackfillProgress(ctx)
}

// If backfill is complete, log and do backfill inactivation/cleanup
if b.receivedSequences >= b.expectedSequences {
InfofCtx(ctx, KeyDCP, "Backfill complete")
b.active = false
err := b.purgeBackfillSequences(datastore)
if err != nil {
WarnfCtx(ctx, "Error purging back-fill sequences: %v", err)
}
}
}
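
The arithmetic above clamps each vbucket's contribution at its backfill end sequence, so live (post-backfill) mutations don't inflate the progress count. A standalone sketch with hypothetical names:

```go
package main

import "fmt"

// backfillDelta returns how many newly received sequences count toward
// backfill progress for one vbucket, clamping at the backfill end
// sequence. Illustrative only; not Sync Gateway API.
func backfillDelta(prevSeq, currentSeq, endSeq uint64) (delta uint64, stillActive bool) {
	if currentSeq >= endSeq {
		return endSeq - prevSeq, false // vbucket has finished its backfill
	}
	return currentSeq - prevSeq, true
}

func main() {
	// prevSeq=95, currentSeq=105, endSeq=100: only 5 of the 10 received
	// sequences count toward backfill, and the vbucket leaves the
	// active backfill set.
	delta, active := backfillDelta(95, 105, 100)
	fmt.Println(delta, active) // 5 false
}
```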

// Logs current backfill progress. Expects the caller to hold the DCPCommon mutex (c.m)
func (b *backfillStatus) logBackfillProgress(ctx context.Context) {
if !b.active {
return
}
InfofCtx(ctx, KeyDCP, "Backfill in progress: %d%% (%d / %d)", int(b.receivedSequences*100/b.expectedSequences), b.receivedSequences, b.expectedSequences)
}

// BackfillSequences defines the format used to persist snapshot information to the _sync:dcp_backfill document
// to support mid-snapshot restart
type BackfillSequences struct {
Seqs []uint64
SnapStart []uint64
SnapEnd []uint64
}

func (b *backfillStatus) persistBackfillSequences(datastore DataStore, currentSeqs []uint64) error {
backfillSeqs := &BackfillSequences{
Seqs: currentSeqs,
SnapStart: b.snapStart,
SnapEnd: b.snapEnd,
}
return datastore.Set(b.metaKeys.DCPBackfillKey(), 0, nil, backfillSeqs)
}
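
For a rough picture of the persisted document, assuming Go's default JSON marshalling of BackfillSequences (the values and the standalone program are illustrative; the real document key comes from metaKeys.DCPBackfillKey()):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BackfillSequences mirrors the struct above; duplicated here only so the
// example is self-contained.
type BackfillSequences struct {
	Seqs      []uint64
	SnapStart []uint64
	SnapEnd   []uint64
}

func main() {
	doc := BackfillSequences{
		Seqs:      []uint64{150, 80},  // last persisted seq per vbucket
		SnapStart: []uint64{100, 0},   // current snapshot start per vbucket
		SnapEnd:   []uint64{200, 100}, // current snapshot end per vbucket
	}
	b, _ := json.Marshal(doc)
	fmt.Println(string(b))
	// {"Seqs":[150,80],"SnapStart":[100,0],"SnapEnd":[200,100]}
}
```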

func (b *backfillStatus) loadBackfillSequences(ctx context.Context, datastore DataStore) (*BackfillSequences, error) {
var backfillSeqs BackfillSequences
_, err := datastore.Get(b.metaKeys.DCPBackfillKey(), &backfillSeqs)
if err != nil {
return nil, err
}
InfofCtx(ctx, KeyDCP, "Previously persisted backfill sequences found - will resume")
return &backfillSeqs, nil
}

func (b *backfillStatus) purgeBackfillSequences(datastore DataStore) error {
return datastore.Delete(b.metaKeys.DCPBackfillKey())
}

// DCP-related utilities
5 changes: 0 additions & 5 deletions base/dcp_dest.go
@@ -29,7 +29,6 @@ func init() {

type SGDest interface {
cbgt.Dest
initFeed(backfillType uint64) (map[uint16]uint64, error)
}

// DCPDest implements SGDest (superset of cbgt.Dest) interface to manage updates coming from a
@@ -329,7 +328,3 @@ func (d *DCPLoggingDest) Query(pindex *cbgt.PIndex, req []byte, w io.Writer,
func (d *DCPLoggingDest) Stats(w io.Writer) error {
return d.dest.Stats(w)
}

func (d *DCPLoggingDest) initFeed(backfillType uint64) (map[uint16]uint64, error) {
return d.dest.initFeed(backfillType)
}
55 changes: 0 additions & 55 deletions rest/importtest/import_test.go
@@ -1747,61 +1747,6 @@ func TestImportRevisionCopyDisabled(t *testing.T) {
assert.Equal(t, 404, response.Code)
}

// Test DCP backfill stats
func TestDcpBackfill(t *testing.T) {

t.Skip("Test disabled pending CBG-560")

rt := rest.NewRestTester(t, nil)

log.Printf("Starting get bucket....")

dataStore := rt.GetSingleDataStore()

// Write enough documents directly to the bucket to ensure multiple docs per vbucket (on average)
docBody := make(map[string]interface{})
docBody["type"] = "sdk_write"
for i := 0; i < 2500; i++ {
err := dataStore.Set(fmt.Sprintf("doc_%d", i), 0, nil, docBody)
assert.NoError(t, err, fmt.Sprintf("error setting doc_%d", i))
}

// Close the previous test context
rt.Close()

log.Print("Creating new database context")

// Create a new context, with import docs enabled, to process backfill
newRtConfig := rest.RestTesterConfig{
DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{
AutoImport: true,
}},
}
newRt := rest.NewRestTester(t, &newRtConfig)
defer newRt.Close()
log.Printf("Poke the rest tester so it starts DCP processing:")

backfillComplete := false
var expectedBackfill, completedBackfill int
for i := 0; i < 20; i++ {
importFeedStats := newRt.GetDatabase().DbStats.Database().ImportFeedMapStats
expectedBackfill, _ = strconv.Atoi(importFeedStats.Get("dcp_backfill_expected").String())
completedBackfill, _ = strconv.Atoi(importFeedStats.Get("dcp_backfill_completed").String())
if expectedBackfill > 0 && completedBackfill >= expectedBackfill {
log.Printf("backfill complete: %d/%d", completedBackfill, expectedBackfill)
backfillComplete = true
break
} else {
log.Printf("backfill still in progress: %d/%d", completedBackfill, expectedBackfill)
time.Sleep(1 * time.Second)
}
}
assert.True(t, backfillComplete, fmt.Sprintf("Backfill didn't complete after 20s. Latest: %d/%d", completedBackfill, expectedBackfill))

log.Printf("done...%s (%d/%d)", newRt.ServerContext().Database(newRt.Context(), "db").Name, completedBackfill, expectedBackfill)

}

// Validate SG behaviour if there's an unexpected body on a tombstone
func TestUnexpectedBodyOnTombstone(t *testing.T) {
