diff --git a/pkg/experiment/metastore/compaction/compactor/compaction_queue.go b/pkg/experiment/metastore/compaction/compactor/compaction_queue.go index 72be959aa5..792dc62c1e 100644 --- a/pkg/experiment/metastore/compaction/compactor/compaction_queue.go +++ b/pkg/experiment/metastore/compaction/compactor/compaction_queue.go @@ -385,10 +385,6 @@ type blockIter struct { } func newBlockIter() *blockIter { - // Assuming that block IDs (16b ULID) are globally unique. - // We could achieve the same with more efficiency by marking visited - // batches. However, marking visited blocks seems to be more robust, - // and the size of the map is expected to be small. visited := make(map[string]struct{}, 64) visited[zeroBlockEntry.id] = struct{}{} return &blockIter{visited: visited} diff --git a/pkg/experiment/metastore/compaction/compactor/compactor.go b/pkg/experiment/metastore/compaction/compactor/compactor.go index 886860e2de..92aead218c 100644 --- a/pkg/experiment/metastore/compaction/compactor/compactor.go +++ b/pkg/experiment/metastore/compaction/compactor/compactor.go @@ -75,12 +75,7 @@ func (c *Compactor) NewPlan(cmd *raft.Log) compaction.Plan { now := cmd.AppendedAt.UnixNano() before := cmd.AppendedAt.Add(-c.config.CleanupDelay) tombstones := c.tombstones.ListTombstones(before) - return &plan{ - compactor: c, - tombstones: tombstones, - blocks: newBlockIter(), - now: now, - } + return newPlan(c, tombstones, now) } func (c *Compactor) UpdatePlan(tx *bbolt.Tx, plan *raft_log.CompactionPlanUpdate) error { diff --git a/pkg/experiment/metastore/compaction/compactor/plan.go b/pkg/experiment/metastore/compaction/compactor/plan.go index 6fdfe0335b..6b20e0e824 100644 --- a/pkg/experiment/metastore/compaction/compactor/plan.go +++ b/pkg/experiment/metastore/compaction/compactor/plan.go @@ -24,9 +24,20 @@ type plan struct { compactor *Compactor batches *batchIter blocks *blockIter + visited map[compactionKey]struct{} now int64 } +func newPlan(c *Compactor, t iter.Iterator[*metastorev1.Tombstones], now int64) *plan { + return &plan{ + compactor: c, + tombstones: t, + blocks: newBlockIter(), + visited: make(map[compactionKey]struct{}), + now: now, + } +} + func (p *plan) CreateJob() (*raft_log.CompactionJobPlan, error) { planned := p.nextJob() if planned == nil { @@ -80,6 +91,11 @@ func (p *plan) nextJob() *jobPlan { continue } + if _, ok = p.visited[b.staged.key]; ok { + // We've already checked all the blocks with this compaction key. + continue + } + // We've found the oldest batch, it's time to plan a job. // Job levels are zero based: L0 job means that it includes blocks // with compaction level 0. This can be altered (1-based levels): @@ -130,6 +146,7 @@ func (p *plan) nextJob() *jobPlan { // The job plan is canceled for the compaction key, and we need to // continue with the next compaction key, or level. + p.visited[job.compactionKey] = struct{}{} } return nil diff --git a/pkg/experiment/metastore/compaction/compactor/plan_test.go b/pkg/experiment/metastore/compaction/compactor/plan_test.go index 0933632f8c..22dd5e892e 100644 --- a/pkg/experiment/metastore/compaction/compactor/plan_test.go +++ b/pkg/experiment/metastore/compaction/compactor/plan_test.go @@ -61,7 +61,7 @@ func TestPlan_same_level(t *testing.T) { }, } - p := &plan{compactor: c, blocks: newBlockIter()} + p := newPlan(c, nil, 0) planned := make([]*jobPlan, 0, len(expected)) for j := p.nextJob(); j != nil; j = p.nextJob() { planned = append(planned, j) @@ -95,7 +95,7 @@ func TestPlan_same_level(t *testing.T) { i++ } - p = &plan{compactor: c, blocks: newBlockIter()} + p = newPlan(c, nil, 0) planned = planned[:0] // Old jobs should be re-planned. for j := p.nextJob(); j != nil; j = p.nextJob() { planned = append(planned, j) @@ -134,7 +134,7 @@ func TestPlan_level_priority(t *testing.T) { }, } - p := &plan{compactor: c, blocks: newBlockIter()} + p := newPlan(c, nil, 0) planned := make([]*jobPlan, 0, len(expected)) for j := p.nextJob(); j != nil; j = p.nextJob() { planned = append(planned, j) @@ -146,7 +146,7 @@ func TestPlan_level_priority(t *testing.T) { func TestPlan_empty_queue(t *testing.T) { c := NewCompactor(testConfig, nil, nil, nil) - p := &plan{compactor: c, blocks: newBlockIter()} + p := newPlan(c, nil, 0) assert.Nil(t, p.nextJob()) c.enqueue(compaction.BlockEntry{ @@ -159,7 +159,7 @@ func TestPlan_empty_queue(t *testing.T) { // L0 queue is empty. // L1 queue has one block. - p = &plan{compactor: c, blocks: newBlockIter()} + p = newPlan(c, nil, 0) assert.Nil(t, p.nextJob()) c.enqueue(compaction.BlockEntry{ @@ -172,7 +172,7 @@ func TestPlan_empty_queue(t *testing.T) { // L0 queue is empty. // L2 has blocks for a job. - p = &plan{compactor: c, blocks: newBlockIter()} + p = newPlan(c, nil, 0) assert.NotNil(t, p.nextJob()) } @@ -190,9 +190,7 @@ func TestPlan_deleted_blocks(t *testing.T) { } { e.Index = uint64(i) e.ID = strconv.Itoa(i) - if !c.enqueue(e) { - t.Errorf("failed to enqueue: %v", e) - } + c.enqueue(e) i++ } @@ -217,7 +215,7 @@ func TestPlan_deleted_blocks(t *testing.T) { }, } - p := &plan{compactor: c, blocks: newBlockIter()} + p := newPlan(c, nil, 0) planned := make([]*jobPlan, 0, len(expected)) for j := p.nextJob(); j != nil; j = p.nextJob() { planned = append(planned, j) @@ -245,7 +243,7 @@ func TestPlan_deleted_blocks(t *testing.T) { }, }, expected...) - p = &plan{compactor: c, blocks: newBlockIter()} + p = newPlan(c, nil, 0) planned = planned[:0] for j := p.nextJob(); j != nil; j = p.nextJob() { planned = append(planned, j) @@ -264,7 +262,7 @@ func TestPlan_deleted_batch(t *testing.T) { remove(c.queue.levels[0], compactionKey{}, "0", "1", "2") - p := &plan{compactor: c, blocks: newBlockIter()} + p := newPlan(c, nil, 0) assert.Nil(t, p.nextJob()) } @@ -299,12 +297,7 @@ func TestPlan_compact_by_time(t *testing.T) { }, } - p := &plan{ - compactor: c, - blocks: newBlockIter(), - now: 40, - } - + p := newPlan(c, nil, 40) planned := make([]*jobPlan, 0, len(expected)) for j := p.nextJob(); j != nil; j = p.nextJob() { planned = append(planned, j) @@ -347,12 +340,7 @@ func TestPlan_time_split(t *testing.T) { c.enqueue(e) } - p := &plan{ - compactor: c, - blocks: newBlockIter(), - now: now.UnixNano(), - } - + p := newPlan(c, nil, now.UnixNano()) var i int var n int for j := p.nextJob(); j != nil; j = p.nextJob() {