Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Move tombstones to its own package.
Browse files Browse the repository at this point in the history
Signed-off-by: Callum Styan <[email protected]>
  • Loading branch information
cstyan committed Jun 7, 2019
1 parent 3dc6155 commit 0c895bd
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 163 deletions.
24 changes: 12 additions & 12 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
"github.com/prometheus/tsdb/tombstones"
)

// IndexWriter serializes the index for a block of series data.
Expand Down Expand Up @@ -137,7 +137,7 @@ type BlockReader interface {
Chunks() (ChunkReader, error)

// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() (record.TombstoneReader, error)
Tombstones() (tombstones.TombstoneReader, error)

// MinTime returns the min time of the block.
MinTime() int64
Expand Down Expand Up @@ -283,7 +283,7 @@ type Block struct {

chunkr ChunkReader
indexr IndexReader
tombstones record.TombstoneReader
tombstones tombstones.TombstoneReader

logger log.Logger
}
Expand Down Expand Up @@ -320,7 +320,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
}
closers = append(closers, ir)

tr, tsr, err := record.ReadTombstones(dir)
tr, tsr, err := tombstones.ReadTombstones(dir)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func (pb *Block) Chunks() (ChunkReader, error) {
}

// Tombstones returns a new TombstoneReader against the block data.
func (pb *Block) Tombstones() (record.TombstoneReader, error) {
func (pb *Block) Tombstones() (tombstones.TombstoneReader, error) {
if err := pb.startRead(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -488,7 +488,7 @@ func (r blockIndexReader) Close() error {
}

type blockTombstoneReader struct {
record.TombstoneReader
tombstones.TombstoneReader
b *Block
}

Expand Down Expand Up @@ -524,7 +524,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr

// Choose only valid postings which have chunks in the time-range.
stones := record.NewMemTombstones()
stones := tombstones.NewMemTombstones()

var lset labels.Labels
var chks []chunks.Meta
Expand All @@ -540,7 +540,7 @@ Outer:
if chk.OverlapsClosedInterval(mint, maxt) {
// Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones.AddInterval(p.At(), record.Interval{tmin, tmax})
stones.AddInterval(p.At(), tombstones.Interval{tmin, tmax})
continue Outer
}
}
Expand All @@ -550,7 +550,7 @@ Outer:
return p.Err()
}

err = pb.tombstones.Iter(func(id uint64, ivs record.Intervals) error {
err = pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error {
for _, iv := range ivs {
stones.AddInterval(id, iv)
}
Expand All @@ -562,7 +562,7 @@ Outer:
pb.tombstones = stones
pb.meta.Stats.NumTombstones = pb.tombstones.Total()

if err := record.WriteTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil {
if err := tombstones.WriteTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil {
return err
}
return writeMetaFile(pb.logger, pb.dir, &pb.meta)
Expand All @@ -573,7 +573,7 @@ Outer:
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
numStones := 0

if err := pb.tombstones.Iter(func(id uint64, ivs record.Intervals) error {
if err := pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error {
numStones += len(ivs)
return nil
}); err != nil {
Expand Down Expand Up @@ -608,7 +608,7 @@ func (pb *Block) Snapshot(dir string) error {
for _, fname := range []string{
metaFilename,
indexFilename,
record.TombstoneFilename,
tombstones.TombstoneFilename,
} {
if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
return errors.Wrapf(err, "create snapshot %s", fname)
Expand Down
18 changes: 9 additions & 9 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
"github.com/prometheus/tsdb/tombstones"
)

// ExponentialBlockRanges returns the time ranges based on the stepSize.
Expand Down Expand Up @@ -606,7 +606,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}

// Create an empty tombstones file.
if err := record.WriteTombstoneFile(c.logger, tmp, record.NewMemTombstones()); err != nil {
if err := tombstones.WriteTombstoneFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}

Expand Down Expand Up @@ -849,15 +849,15 @@ type compactionSeriesSet struct {
p index.Postings
index IndexReader
chunks ChunkReader
tombstones record.TombstoneReader
tombstones tombstones.TombstoneReader

l labels.Labels
c []chunks.Meta
intervals record.Intervals
intervals tombstones.Intervals
err error
}

func newCompactionSeriesSet(i IndexReader, c ChunkReader, t record.TombstoneReader, p index.Postings) *compactionSeriesSet {
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.TombstoneReader, p index.Postings) *compactionSeriesSet {
return &compactionSeriesSet{
index: i,
chunks: c,
Expand Down Expand Up @@ -887,7 +887,7 @@ func (c *compactionSeriesSet) Next() bool {
if len(c.intervals) > 0 {
chks := make([]chunks.Meta, 0, len(c.c))
for _, chk := range c.c {
if !(record.Interval{chk.MinTime, chk.MaxTime}.IsSubrange(c.intervals)) {
if !(tombstones.Interval{chk.MinTime, chk.MaxTime}.IsSubrange(c.intervals)) {
chks = append(chks, chk)
}
}
Expand Down Expand Up @@ -915,7 +915,7 @@ func (c *compactionSeriesSet) Err() error {
return c.p.Err()
}

func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, record.Intervals) {
func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
return c.l, c.c, c.intervals
}

Expand All @@ -925,7 +925,7 @@ type compactionMerger struct {
aok, bok bool
l labels.Labels
c []chunks.Meta
intervals record.Intervals
intervals tombstones.Intervals
}

func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
Expand Down Expand Up @@ -1002,6 +1002,6 @@ func (c *compactionMerger) Err() error {
return c.b.Err()
}

func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, record.Intervals) {
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
return c.l, c.c, c.intervals
}
4 changes: 2 additions & 2 deletions compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tombstones"
)

func TestSplitByRange(t *testing.T) {
Expand Down Expand Up @@ -458,7 +458,7 @@ type erringBReader struct{}

func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
func (erringBReader) Tombstones() (record.TombstoneReader, error) {
func (erringBReader) Tombstones() (tombstones.TombstoneReader, error) {
return nil, errors.New("tombstones")
}
func (erringBReader) MinTime() int64 { return 0 }
Expand Down
29 changes: 15 additions & 14 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tombstones"
"github.com/prometheus/tsdb/tsdbutil"
"github.com/prometheus/tsdb/wal"
)
Expand Down Expand Up @@ -247,27 +248,27 @@ func TestDeleteSimple(t *testing.T) {
numSamples := int64(10)

cases := []struct {
intervals record.Intervals
intervals tombstones.Intervals
remaint []int64
}{
{
intervals: record.Intervals{{0, 3}},
intervals: tombstones.Intervals{{0, 3}},
remaint: []int64{4, 5, 6, 7, 8, 9},
},
{
intervals: record.Intervals{{1, 3}},
intervals: tombstones.Intervals{{1, 3}},
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
},
{
intervals: record.Intervals{{1, 3}, {4, 7}},
intervals: tombstones.Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
{
intervals: record.Intervals{{1, 3}, {4, 700}},
intervals: tombstones.Intervals{{1, 3}, {4, 700}},
remaint: []int64{0},
},
{ // This case is to ensure that labels and symbols are deleted.
intervals: record.Intervals{{0, 9}},
intervals: tombstones.Intervals{{0, 9}},
remaint: []int64{},
},
}
Expand Down Expand Up @@ -509,11 +510,11 @@ func TestDB_SnapshotWithDelete(t *testing.T) {

testutil.Ok(t, app.Commit())
cases := []struct {
intervals record.Intervals
intervals tombstones.Intervals
remaint []int64
}{
{
intervals: record.Intervals{{1, 3}, {4, 7}},
intervals: tombstones.Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Expand Down Expand Up @@ -836,11 +837,11 @@ func TestTombstoneClean(t *testing.T) {

testutil.Ok(t, app.Commit())
cases := []struct {
intervals record.Intervals
intervals tombstones.Intervals
remaint []int64
}{
{
intervals: record.Intervals{{1, 3}, {4, 7}},
intervals: tombstones.Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Expand Down Expand Up @@ -912,7 +913,7 @@ func TestTombstoneClean(t *testing.T) {
}

for _, b := range db.Blocks() {
testutil.Equals(t, record.NewMemTombstones(), b.tombstones)
testutil.Equals(t, tombstones.NewMemTombstones(), b.tombstones)
}
}
}
Expand All @@ -938,8 +939,8 @@ func TestTombstoneCleanFail(t *testing.T) {
block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
// Add some fake tombstones to trigger the compaction.
tomb := record.NewMemTombstones()
tomb.AddInterval(0, record.Interval{0, 1})
tomb := tombstones.NewMemTombstones()
tomb.AddInterval(0, tombstones.Interval{0, 1})
block.tombstones = tomb

db.blocks = append(db.blocks, block)
Expand Down Expand Up @@ -1092,7 +1093,7 @@ func dbDiskSize(dir string) int64 {
// Include only index,tombstone and chunks.
if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) ||
info.Name() == indexFilename ||
info.Name() == record.TombstoneFilename {
info.Name() == tombstones.TombstoneFilename {
statSize += info.Size()
}
return nil
Expand Down
23 changes: 12 additions & 11 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
"github.com/prometheus/tsdb/tombstones"
"github.com/prometheus/tsdb/wal"
)

Expand All @@ -43,7 +44,7 @@ var (

// emptyTombstoneReader is a no-op Tombstone Reader.
// This is used by head to satisfy the Tombstones() function call.
emptyTombstoneReader = record.NewMemTombstones()
emptyTombstoneReader = tombstones.NewMemTombstones()
)

// Head handles reads and writes of time series data within a time window.
Expand Down Expand Up @@ -335,8 +336,8 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
dec record.RecordDecoder
series []record.RefSeries
samples []record.RefSample
tstones []record.Stone
allStones = record.NewMemTombstones()
tstones []tombstones.Stone
allStones = tombstones.NewMemTombstones()
err error
)
defer allStones.Close()
Expand All @@ -360,7 +361,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
if !created {
// There's already a different ref for this series.
multiRefLock.Lock()
multiRef[s.Ref] = series.ref
multiRef[s.Ref] = series.Ref
multiRefLock.Unlock()
}

Expand Down Expand Up @@ -450,7 +451,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
}
wg.Wait()

if err := allStones.Iter(func(ref uint64, dranges record.Intervals) error {
if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error {
return h.chunkRewrite(ref, dranges)
}); err != nil {
return errors.Wrap(r.Err(), "deleting samples from tombstones")
Expand Down Expand Up @@ -657,7 +658,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) {
return h.head.chunksRange(h.mint, h.maxt), nil
}

func (h *rangeHead) Tombstones() (record.TombstoneReader, error) {
func (h *rangeHead) Tombstones() (tombstones.TombstoneReader, error) {
return emptyTombstoneReader, nil
}

Expand Down Expand Up @@ -913,7 +914,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
return errors.Wrap(err, "select series")
}

var stones []record.Stone
var stones []tombstones.Stone
dirty := false
for p.Next() {
series := h.series.getByID(p.At())
Expand All @@ -925,9 +926,9 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
// Delete only until the current values and not beyond.
t0, t1 = clampInterval(mint, maxt, t0, t1)
if h.wal != nil {
stones = append(stones, record.Stone{p.At(), record.Intervals{{t0, t1}}})
stones = append(stones, tombstones.Stone{p.At(), tombstones.Intervals{{t0, t1}}})
}
if err := h.chunkRewrite(p.At(), record.Intervals{{t0, t1}}); err != nil {
if err := h.chunkRewrite(p.At(), tombstones.Intervals{{t0, t1}}); err != nil {
return errors.Wrap(err, "delete samples")
}
dirty = true
Expand All @@ -954,7 +955,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
// chunkRewrite re-writes the chunks which overlaps with deleted ranges
// and removes the samples in the deleted ranges.
// Chunks is deleted if no samples are left at the end.
func (h *Head) chunkRewrite(ref uint64, dranges record.Intervals) (err error) {
func (h *Head) chunkRewrite(ref uint64, dranges tombstones.Intervals) (err error) {
if len(dranges) == 0 {
return nil
}
Expand Down Expand Up @@ -1044,7 +1045,7 @@ func (h *Head) gc() {
}

// Tombstones returns a new reader over the head's tombstones
func (h *Head) Tombstones() (record.TombstoneReader, error) {
func (h *Head) Tombstones() (tombstones.TombstoneReader, error) {
return emptyTombstoneReader, nil
}

Expand Down
Loading

0 comments on commit 0c895bd

Please sign in to comment.