diff --git a/index/index.go b/index/index.go
index 1a1e9bf3..ab9b26ee 100644
--- a/index/index.go
+++ b/index/index.go
@@ -44,6 +44,8 @@ const (
 	FormatV1 = 1
 	// FormatV2 represents 2 version of index.
 	FormatV2 = 2
+	// FormatV3 represents 3 version of index (using PrefixCompressedPostings for postings).
+	FormatV3 = 3
 
 	labelNameSeperator = "\xff"
@@ -121,7 +123,7 @@ type Writer struct {
 	// Reusable memory.
 	buf1    encoding.Encbuf
 	buf2    encoding.Encbuf
-	uint32s []uint32
+	uint64s []uint64
 
 	symbols       map[string]uint32 // symbol offsets
 	seriesOffsets map[uint64]uint64 // offsets of series
@@ -205,7 +207,7 @@ func NewWriter(fn string) (*Writer, error) {
 		// Reusable memory.
 		buf1:    encoding.Encbuf{B: make([]byte, 0, 1<<22)},
 		buf2:    encoding.Encbuf{B: make([]byte, 0, 1<<22)},
-		uint32s: make([]uint32, 0, 1<<15),
+		uint64s: make([]uint64, 0, 1<<15),
 
 		// Caches.
 		symbols:       make(map[string]uint32, 1<<13),
@@ -290,7 +292,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
 func (w *Writer) writeMeta() error {
 	w.buf1.Reset()
 	w.buf1.PutBE32(MagicIndex)
-	w.buf1.PutByte(FormatV2)
+	w.buf1.PutByte(FormatV3)
 
 	return w.write(w.buf1.Get())
 }
@@ -522,30 +524,25 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
 	// Order of the references in the postings list does not imply order
 	// of the series references within the persisted block they are mapped to.
 	// We have to sort the new references again.
-	refs := w.uint32s[:0]
+	refs := w.uint64s[:0]
 
 	for it.Next() {
 		offset, ok := w.seriesOffsets[it.At()]
 		if !ok {
 			return errors.Errorf("%p series for reference %d not found", w, it.At())
 		}
-		if offset > (1<<32)-1 {
-			return errors.Errorf("series offset %d exceeds 4 bytes", offset)
-		}
-		refs = append(refs, uint32(offset))
+		refs = append(refs, offset)
 	}
 	if err := it.Err(); err != nil {
 		return err
 	}
-	sort.Sort(uint32slice(refs))
+	sort.Sort(uint64slice(refs))
 
 	w.buf2.Reset()
 	w.buf2.PutBE32int(len(refs))
 
-	for _, r := range refs {
-		w.buf2.PutBE32(r)
-	}
-	w.uint32s = refs
+	writePrefixCompressedPostings(&w.buf2, refs)
+	w.uint64s = refs
 
 	w.buf1.Reset()
 	w.buf1.PutBE32int(w.buf2.Len())
@@ -556,11 +553,11 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
 	return errors.Wrap(err, "write postings")
 }
 
-type uint32slice []uint32
+type uint64slice []uint64
 
-func (s uint32slice) Len() int           { return len(s) }
-func (s uint32slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
-func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
+func (s uint64slice) Len() int           { return len(s) }
+func (s uint64slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+func (s uint64slice) Less(i, j int) bool { return s[i] < s[j] }
 
 type labelIndexHashEntry struct {
 	keys []string
@@ -678,7 +675,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
 	}
 	r.version = int(r.b.Range(4, 5)[0])
 
-	if r.version != FormatV1 && r.version != FormatV2 {
+	if r.version != FormatV1 && r.version != FormatV2 && r.version != FormatV3 {
 		return nil, errors.Errorf("unknown index file version %d", r.version)
 	}
 
@@ -782,14 +779,14 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
 		symbolSlice []string
 		symbols     = map[uint32]string{}
 	)
-	if version == FormatV2 {
+	if version == FormatV2 || version == FormatV3 {
 		symbolSlice = make([]string, 0, cnt)
 	}
 
 	for d.Err() == nil && d.Len() > 0 && cnt > 0 {
 		s := d.UvarintStr()
 
-		if version == FormatV2 {
+		if version == FormatV2 || version == FormatV3 {
 			symbolSlice = append(symbolSlice, s)
 		} else {
 			symbols[nextPos] = s
@@ -911,7 +908,7 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
 	offset := id
 	// In version 2 series IDs are no longer exact references but series are 16-byte padded
 	// and the ID is the multiple of 16 of the actual position.
-	if r.version == FormatV2 {
+	if r.version == FormatV2 || r.version == FormatV3 {
 		offset = id * 16
 	}
 	d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
@@ -935,7 +932,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) {
 	if d.Err() != nil {
 		return nil, errors.Wrap(d.Err(), "get postings entry")
 	}
-	_, p, err := r.dec.Postings(d.Get())
+	_, p, err := r.dec.Postings(d.Get(), r.version)
 	if err != nil {
 		return nil, errors.Wrap(err, "decode postings")
 	}
@@ -1059,11 +1056,18 @@ type Decoder struct {
 }
 
 // Postings returns a postings list for b and its number of elements.
-func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
+func (dec *Decoder) Postings(b []byte, version int) (int, Postings, error) {
 	d := encoding.Decbuf{B: b}
 	n := d.Be32int()
+	if n == 0 {
+		return n, EmptyPostings(), d.Err()
+	}
 	l := d.Get()
-	return n, newBigEndianPostings(l), d.Err()
+	if version == FormatV3 {
+		return n, newPrefixCompressedPostings(l), d.Err()
+	} else {
+		return n, newBigEndianPostings(l), d.Err()
+	}
 }
 
 // Series decodes a series entry from the given byte slice into lset and chks.
diff --git a/index/postings.go b/index/postings.go
index cef2d886..346b144f 100644
--- a/index/postings.go
+++ b/index/postings.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/prometheus/tsdb/encoding"
 	"github.com/prometheus/tsdb/labels"
 )
 
@@ -689,3 +690,171 @@ func (it *bigEndianPostings) Seek(x uint64) bool {
 func (it *bigEndianPostings) Err() error {
 	return nil
 }
+
+// prefixCompressedPostings is a postings list where the sorted series references
+// are grouped into blocks sharing the same high 48 bits (the key), and only the
+// low 16 bits of each reference are stored inside a block.
+// Byte stream layout (all integers big-endian):
+//	4 bytes  footer start offset, relative to the end of this 8-byte header
+//	4 bytes  number of blocks
+//	blocks   each an 8-byte key followed by 2-byte values
+//	footer   (number of blocks + 1) 4-byte block start offsets, also relative to
+//	         the end of the header; the last entry is the end of the last block
+type prefixCompressedPostings struct {
+	bs         []byte // The encoded blocks and footer, excluding the 8-byte header.
+	cur        uint64
+	inside     bool   // Whether the iterator is currently positioned inside a block.
+	idx        int    // The current offset inside the bs.
+	footerAddr int    // The starting offset of the footer inside bs.
+	key        uint64 // The shared high 48 bits of the current block.
+	numBlock   int
+	blockIdx   int // The current block idx.
+	nextBlock  int // The starting offset of the next block.
+}
+
+func newPrefixCompressedPostings(bstream []byte) *prefixCompressedPostings {
+	return &prefixCompressedPostings{
+		bs:         bstream[8:],
+		numBlock:   int(binary.BigEndian.Uint32(bstream[4:])),
+		footerAddr: int(binary.BigEndian.Uint32(bstream)),
+	}
+}
+
+func (it *prefixCompressedPostings) At() uint64 {
+	return it.cur
+}
+
+func (it *prefixCompressedPostings) Next() bool {
+	if it.inside { // Already entered the block.
+		if it.idx < it.nextBlock {
+			it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
+			it.idx += 2
+			return true
+		}
+		it.blockIdx += 1 // Go to the next block.
+	}
+	// Currently not entered any block.
+	if it.idx < it.footerAddr {
+		it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
+		it.idx += 8
+		it.inside = true
+		it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
+		it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
+		it.idx += 2
+		return true
+	} else {
+		return false
+	}
+}
+
+func (it *prefixCompressedPostings) seekInBlock(x uint64) bool {
+	curVal := x & 0xffff
+	num := (it.nextBlock - it.idx) >> 1
+	j := sort.Search(num, func(i int) bool {
+		return uint64(binary.BigEndian.Uint16(it.bs[it.idx+(i<<1):])) >= curVal
+	})
+	if j == num {
+		// Fast-path to the next block.
+		// The first element in the next block should be >= x.
+		it.idx = it.nextBlock
+		it.blockIdx += 1
+		if it.idx < it.footerAddr {
+			it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
+			it.idx += 8
+			it.inside = true
+			it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
+			it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
+			it.idx += 2
+			return true
+		} else {
+			return false
+		}
+	}
+	it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx+(j<<1):]))
+	it.idx += (j + 1) << 1
+	return true
+}
+
+func (it *prefixCompressedPostings) Seek(x uint64) bool {
+	if it.cur >= x {
+		return true
+	}
+	curKey := (x >> 16) << 16
+	if it.inside && it.key == curKey {
+		// Fast path for x in current block.
+		return it.seekInBlock(x)
+	} else {
+		i := sort.Search(it.numBlock-it.blockIdx, func(i int) bool {
+			off := int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+i)<<2):]))
+			k := binary.BigEndian.Uint64(it.bs[off:])
+			return k >= curKey
+		})
+		if i == it.numBlock-it.blockIdx {
+			return false
+		}
+		it.blockIdx += i
+		if i > 0 {
+			it.inside = false
+			it.idx = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx)<<2):]))
+		}
+	}
+	it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
+	it.idx += 8
+
+	it.inside = true
+
+	it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
+	if it.key > curKey {
+		// The found block's key is already greater than the prefix of x, so its
+		// first element is the first element >= x. Searching only the low 16 bits
+		// via seekInBlock would wrongly skip this block.
+		it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
+		it.idx += 2
+		return true
+	}
+	return it.seekInBlock(x)
+}
+
+func (it *prefixCompressedPostings) Err() error {
+	return nil
+}
+
+// The size of values inside the block is 2 bytes.
+func writePrefixCompressedPostingsBlock(e *encoding.Encbuf, vals []uint16, key uint64, c []byte) {
+	e.PutBE64(key)
+	for _, val := range vals {
+		binary.BigEndian.PutUint16(c[:], val)
+		e.PutByte(c[0])
+		e.PutByte(c[1])
+	}
+}
+
+func writePrefixCompressedPostings(e *encoding.Encbuf, arr []uint64) {
+	if len(arr) == 0 {
+		return
+	}
+	key := uint64(0)
+	mask := uint64((1 << uint(16)) - 1) // Mask for the elements in the block.
+	invertedMask := ^mask
+	var (
+		curKey       uint64
+		curVal       uint64
+		idx          int      // Index of current element in arr.
+		startingOffs []uint32 // The starting offsets of each block.
+		vals         []uint16 // The converted values in the current block.
+		startOff     = len(e.Get())
+		c            = make([]byte, 2)
+	)
+	e.PutBE32(0) // Footer starting offset.
+	e.PutBE32(0) // Number of blocks.
+	for idx < len(arr) {
+		curKey = arr[idx] & invertedMask // Key of block.
+		curVal = arr[idx] & mask         // Value inside block.
+		if curKey != key {
+			// Move to the next block.
+			if idx != 0 {
+				startingOffs = append(startingOffs, uint32(len(e.B)))
+				writePrefixCompressedPostingsBlock(e, vals, key, c)
+				vals = vals[:0]
+			}
+			key = curKey
+		}
+		vals = append(vals, uint16(curVal))
+		idx += 1
+	}
+	startingOffs = append(startingOffs, uint32(len(e.B)))
+	writePrefixCompressedPostingsBlock(e, vals, key, c)
+
+	// Storing the ending offset as well saves checking whether to read the next block
+	// address from the footer or to use len(it.bs) each time before entering the next block.
+	startingOffs = append(startingOffs, uint32(len(e.B)))
+
+	binary.BigEndian.PutUint32(e.B[startOff:], uint32(len(e.B)-8-startOff))   // Put footer starting offset.
+	binary.BigEndian.PutUint32(e.B[startOff+4:], uint32(len(startingOffs)-1)) // Put number of blocks.
+	for _, off := range startingOffs {
+		e.PutBE32(off - 8 - uint32(startOff))
+	}
+}
diff --git a/index/postings_test.go b/index/postings_test.go
index 1eed1dbf..e5a1dadb 100644
--- a/index/postings_test.go
+++ b/index/postings_test.go
@@ -20,6 +20,7 @@ import (
 	"sort"
 	"testing"
 
+	"github.com/prometheus/tsdb/encoding"
 	"github.com/prometheus/tsdb/testutil"
 )
 
@@ -718,6 +719,240 @@ func TestBigEndian(t *testing.T) {
 	})
 }
 
+func TestPrefixCompressedPostings(t *testing.T) {
+	num := 1000
+	// mock a list as postings
+	ls := make([]uint64, num)
+	ls[0] = 2
+	for i := 1; i < num; i++ {
+		ls[i] = ls[i-1] + uint64(rand.Int31n(25)) + 2
+	}
+
+	buf := encoding.Encbuf{}
+	writePrefixCompressedPostings(&buf, ls)
+
+	t.Run("Iteration", func(t *testing.T) {
+		rbp := newPrefixCompressedPostings(buf.Get())
+		for i := 0; i < num; i++ {
+			testutil.Assert(t, rbp.Next() == true, "")
+			testutil.Equals(t, ls[i], rbp.At())
+		}
+
+		testutil.Assert(t, rbp.Next() == false, "")
+		testutil.Assert(t, rbp.Err() == nil, "")
+	})
+
+	t.Run("Seek", func(t *testing.T) {
+		table := []struct {
+			seek  uint64
+			val   uint64
+			found bool
+		}{
+			{
+				ls[0] - 1, ls[0], true,
+			},
+			{
+				ls[4], ls[4], true,
+			},
+			{
+				ls[500] - 1, ls[500], true,
+			},
+			{
+				ls[600] + 1, ls[601], true,
+			},
+			{
+				ls[600] + 1, ls[601], true,
+			},
+			{
+				ls[600] + 1, ls[601], true,
+			},
+			{
+				ls[0], ls[601], true,
+			},
+			{
+				ls[600], ls[601], true,
+			},
+			{
+				ls[999], ls[999], true,
+			},
+			{
+				ls[999] + 10, ls[999], false,
+			},
+		}
+
+		rbp := newPrefixCompressedPostings(buf.Get())
+
+		for _, v := range table {
+			testutil.Equals(t, v.found, rbp.Seek(uint64(v.seek)))
+			testutil.Equals(t, uint64(v.val), rbp.At())
+			testutil.Assert(t, rbp.Err() == nil, "")
+		}
+	})
+}
+
+func BenchmarkPostings(b *testing.B) {
+	num := 100000
+	// mock a list as postings
+	ls := make([]uint32, num)
+	ls[0] = 2
+	for i := 1; i < num; i++ {
+		ls[i] = ls[i-1] + uint32(rand.Int31n(25)) + 2
+	}
+
+	// bigEndianPostings.
+	bufBE := make([]byte, num*4)
+	for i := 0; i < num; i++ {
+		b := bufBE[i*4 : i*4+4]
+		binary.BigEndian.PutUint32(b, ls[i])
+	}
+
+	// prefixCompressedPostings.
+	bufPCP := encoding.Encbuf{}
+	temp := make([]uint64, 0, len(ls))
+	for _, x := range ls {
+		temp = append(temp, uint64(x))
+	}
+	writePrefixCompressedPostings(&bufPCP, temp)
+
+	table := []struct {
+		seek  uint32
+		val   uint32
+		found bool
+	}{
+		{
+			ls[0] - 1, ls[0], true,
+		},
+		{
+			ls[1000], ls[1000], true,
+		},
+		{
+			ls[1001], ls[1001], true,
+		},
+		{
+			ls[2000] + 1, ls[2001], true,
+		},
+		{
+			ls[3000], ls[3000], true,
+		},
+		{
+			ls[3001], ls[3001], true,
+		},
+		{
+			ls[4000] + 1, ls[4001], true,
+		},
+		{
+			ls[5000], ls[5000], true,
+		},
+		{
+			ls[5001], ls[5001], true,
+		},
+		{
+			ls[6000] + 1, ls[6001], true,
+		},
+		{
+			ls[10000], ls[10000], true,
+		},
+		{
+			ls[10001], ls[10001], true,
+		},
+		{
+			ls[20000] + 1, ls[20001], true,
+		},
+		{
+			ls[30000], ls[30000], true,
+		},
+		{
+			ls[30001], ls[30001], true,
+		},
+		{
+			ls[40000] + 1, ls[40001], true,
+		},
+		{
+			ls[50000], ls[50000], true,
+		},
+		{
+			ls[50001], ls[50001], true,
+		},
+		{
+			ls[60000] + 1, ls[60001], true,
+		},
+		{
+			ls[70000], ls[70000], true,
+		},
+		{
+			ls[70001], ls[70001], true,
+		},
+		{
+			ls[80000] + 1, ls[80001], true,
+		},
+		{
+			ls[99999], ls[99999], true,
+		},
+		{
+			ls[99999] + 10, ls[99999], false,
+		},
+	}
+
+	b.Run("bigEndianIteration", func(bench *testing.B) {
+		bench.ResetTimer()
+		bench.ReportAllocs()
+		for j := 0; j < bench.N; j++ {
+			bep := newBigEndianPostings(bufBE)
+
+			for i := 0; i < num; i++ {
+				testutil.Assert(bench, bep.Next() == true, "")
+				testutil.Equals(bench, uint64(ls[i]), bep.At())
+			}
+			testutil.Assert(bench, bep.Next() == false, "")
+			testutil.Assert(bench, bep.Err() == nil, "")
+		}
+	})
+	b.Run("prefixCompressedPostingsIteration", func(bench *testing.B) {
+		bench.ResetTimer()
+		bench.ReportAllocs()
+		for j := 0; j < bench.N; j++ {
+			rbm := newPrefixCompressedPostings(bufPCP.Get())
+
+			for i := 0; i < num; i++ {
+				testutil.Assert(bench, rbm.Next() == true, "")
+				testutil.Equals(bench, uint64(ls[i]), rbm.At())
+			}
+			testutil.Assert(bench, rbm.Next() == false, "")
+			testutil.Assert(bench, rbm.Err() == nil, "")
+		}
+	})
+
+	b.Run("bigEndianSeek", func(bench *testing.B) {
+		bench.ResetTimer()
+		bench.ReportAllocs()
+		for j := 0; j < bench.N; j++ {
+			bep := newBigEndianPostings(bufBE)
+
+			for _, v := range table {
+				testutil.Equals(bench, v.found, bep.Seek(uint64(v.seek)))
+				testutil.Equals(bench, uint64(v.val), bep.At())
+				testutil.Assert(bench, bep.Err() == nil, "")
+			}
+		}
+	})
+	b.Run("prefixCompressedPostingsSeek", func(bench *testing.B) {
+		bench.ResetTimer()
+		bench.ReportAllocs()
+		for j := 0; j < bench.N; j++ {
+			rbm := newPrefixCompressedPostings(bufPCP.Get())
+
+			for _, v := range table {
+				testutil.Equals(bench, v.found, rbm.Seek(uint64(v.seek)))
+				testutil.Equals(bench, uint64(v.val), rbm.At())
+				testutil.Assert(bench, rbm.Err() == nil, "")
+			}
+		}
+	})
+}
+
 func TestIntersectWithMerge(t *testing.T) {
 	// One of the reproducible cases for:
 	// https://github.com/prometheus/prometheus/issues/2616