Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Optionally compress the WAL using Snappy
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Marchbanks <[email protected]>
  • Loading branch information
csmarchbanks committed May 18, 2019
1 parent d48a5e2 commit 238f946
Show file tree
Hide file tree
Showing 14 changed files with 156 additions and 103 deletions.
2 changes: 1 addition & 1 deletion checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
return nil, errors.Wrap(err, "create checkpoint dir")
}
cp, err := wal.New(nil, nil, cpdirtmp)
cp, err := wal.New(nil, nil, cpdirtmp, w.Compress())
if err != nil {
return nil, errors.Wrap(err, "open checkpoint")
}
Expand Down
6 changes: 3 additions & 3 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestCheckpoint(t *testing.T) {
testutil.Ok(t, seg.Close())

// Manually create checkpoint for 99 and earlier.
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), false)
testutil.Ok(t, err)

// Add some data we expect to be around later.
Expand All @@ -111,7 +111,7 @@ func TestCheckpoint(t *testing.T) {
testutil.Ok(t, w.Close())

// Start a WAL and write records to it as usual.
w, err = wal.NewSize(nil, nil, dir, 64*1024)
w, err = wal.NewSize(nil, nil, dir, 64*1024, false)
testutil.Ok(t, err)

var last int64
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.NewSize(nil, nil, dir, 64*1024)
w, err := wal.NewSize(nil, nil, dir, 64*1024, false)
testutil.Ok(t, err)
testutil.Ok(t, w.Log([]byte{99}))
w.Close()
Expand Down
5 changes: 4 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ type Options struct {
// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
// This in-turn enables vertical compaction and vertical query merge.
AllowOverlappingBlocks bool

// WALCompression will turn on Snappy compression for records on the WAL.
WALCompression bool
}

// Appender allows appending a batch of data. It must be completed with a
Expand Down Expand Up @@ -300,7 +303,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if opts.WALSegmentSize > 0 {
segmentSize = opts.WALSegmentSize
}
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
}()

testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
testutil.Ok(t, err)

var enc RecordEncoder
Expand Down Expand Up @@ -1454,7 +1454,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
createBlock(t, dir, genSeries(1, 1, 1000, 6000))

testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
testutil.Ok(t, err)

var enc RecordEncoder
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/go-stack/stack v1.8.0 // indirect
github.com/gogo/protobuf v1.1.1 // indirect
github.com/golang/protobuf v1.2.0 // indirect
github.com/golang/snappy v0.0.1
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/oklog/ulid v1.3.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
Expand Down
14 changes: 7 additions & 7 deletions head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := wal.New(nil, nil, dir)
w, err := wal.New(nil, nil, dir, false)
testutil.Ok(t, err)
defer w.Close()
populateTestWAL(t, w, entries)
Expand Down Expand Up @@ -290,7 +290,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := wal.New(nil, nil, dir)
w, err := wal.New(nil, nil, dir, false)
testutil.Ok(t, err)
defer w.Close()
populateTestWAL(t, w, entries)
Expand Down Expand Up @@ -348,7 +348,7 @@ Outer:
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := wal.New(nil, nil, path.Join(dir, "wal"))
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
testutil.Ok(t, err)
defer w.Close()

Expand All @@ -370,7 +370,7 @@ Outer:
}

// Compare the samples for both heads - before and after the reload.
reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload.
reloadedW, err := wal.New(nil, nil, w.Dir(), false) // Use a new wal to ensure deleted samples are gone even after a reload.
testutil.Ok(t, err)
defer reloadedW.Close()
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
wlog, err := wal.NewSize(nil, nil, dir, 32768)
wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
testutil.Ok(t, err)

// Enough samples to cause a checkpoint.
Expand Down Expand Up @@ -977,7 +977,7 @@ func TestHead_LogRollback(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := wal.New(nil, nil, dir)
w, err := wal.New(nil, nil, dir, false)
testutil.Ok(t, err)
defer w.Close()
h, err := NewHead(nil, nil, w, 1000)
Expand Down Expand Up @@ -1046,7 +1046,7 @@ func TestWalRepair(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := wal.New(nil, nil, dir)
w, err := wal.New(nil, nil, dir, false)
testutil.Ok(t, err)
defer w.Close()

Expand Down
2 changes: 1 addition & 1 deletion wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,7 +1245,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
if err := os.RemoveAll(tmpdir); err != nil {
return errors.Wrap(err, "cleanup replacement dir")
}
repl, err := wal.New(logger, nil, tmpdir)
repl, err := wal.New(logger, nil, tmpdir, false)
if err != nil {
return errors.Wrap(err, "open new WAL")
}
Expand Down
7 changes: 6 additions & 1 deletion wal/live_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -166,10 +167,11 @@ func (r *LiveReader) buildRecord() (bool, error) {
return false, nil
}

rt := recType(r.hdr[0])
rt := recTypeFromHeader(r.hdr[0])
if rt == recFirst || rt == recFull {
r.rec = r.rec[:0]
}
compressed := r.hdr[0]&8 == 8
r.rec = append(r.rec, temp...)

if err := validateRecord(rt, r.index); err != nil {
Expand All @@ -178,6 +180,9 @@ func (r *LiveReader) buildRecord() (bool, error) {
}
if rt == recLast || rt == recFull {
r.index = 0
if compressed && len(r.rec) > 0 {
r.rec, _ = snappy.Decode(nil, r.rec)
}
return true, nil
}
// Only increment i for non-zero records since we use it
Expand Down
9 changes: 7 additions & 2 deletions wal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"hash/crc32"
"io"

"github.com/golang/snappy"
"github.com/pkg/errors"
)

Expand All @@ -45,7 +46,7 @@ func (r *Reader) Next() bool {
// The last WAL segment record shouldn't be torn(should be full or last).
// The last record would be torn after a crash just before
// the last record part could be persisted to disk.
if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
if r.curRecTyp == recFirst || r.curRecTyp == recMiddle {
r.err = errors.New("last record is torn")
}
return false
Expand All @@ -68,7 +69,8 @@ func (r *Reader) next() (err error) {
return errors.Wrap(err, "read first header byte")
}
r.total++
r.curRecTyp = recType(hdr[0])
r.curRecTyp = recTypeFromHeader(hdr[0])
compressed := hdr[0]&8 == 8

// Gobble up zero bytes.
if r.curRecTyp == recPageTerm {
Expand Down Expand Up @@ -129,6 +131,9 @@ func (r *Reader) next() (err error) {
return err
}
if r.curRecTyp == recLast || r.curRecTyp == recFull {
if compressed && len(r.rec) > 0 {
r.rec, _ = snappy.Decode(nil, r.rec)
}
return nil
}

Expand Down
70 changes: 36 additions & 34 deletions wal/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,36 +310,38 @@ func allSegments(dir string) (io.ReadCloser, error) {

func TestReaderFuzz(t *testing.T) {
for name, fn := range readerConstructors {
t.Run(name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_fuzz_live")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := NewSize(nil, nil, dir, 128*pageSize)
testutil.Ok(t, err)

// Buffering required as we're not reading concurrently.
input := make(chan []byte, fuzzLen)
err = generateRandomEntries(w, input)
testutil.Ok(t, err)
close(input)

err = w.Close()
testutil.Ok(t, err)

sr, err := allSegments(w.Dir())
testutil.Ok(t, err)
defer sr.Close()

reader := fn(sr)
for expected := range input {
testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err())
testutil.Equals(t, expected, reader.Record(), "read wrong record")
}
testutil.Assert(t, !reader.Next(), "unexpected record")
})
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_fuzz_live")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
testutil.Ok(t, err)

// Buffering required as we're not reading concurrently.
input := make(chan []byte, fuzzLen)
err = generateRandomEntries(w, input)
testutil.Ok(t, err)
close(input)

err = w.Close()
testutil.Ok(t, err)

sr, err := allSegments(w.Dir())
testutil.Ok(t, err)
defer sr.Close()

reader := fn(sr)
for expected := range input {
testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err())
testutil.Equals(t, expected, reader.Record(), "read wrong record")
}
testutil.Assert(t, !reader.Next(), "unexpected record")
})
}
}
}

Expand All @@ -351,7 +353,7 @@ func TestReaderFuzz_Live(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := NewSize(nil, nil, dir, 128*pageSize)
w, err := NewSize(nil, nil, dir, 128*pageSize, false)
testutil.Ok(t, err)
defer w.Close()

Expand Down Expand Up @@ -434,7 +436,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := NewSize(nil, nil, dir, pageSize)
w, err := NewSize(nil, nil, dir, pageSize, false)
testutil.Ok(t, err)

rec := make([]byte, pageSize-recordHeaderSize)
Expand Down Expand Up @@ -478,7 +480,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

w, err := NewSize(nil, nil, dir, pageSize*2)
w, err := NewSize(nil, nil, dir, pageSize*2, false)
testutil.Ok(t, err)

rec := make([]byte, pageSize-recordHeaderSize)
Expand Down Expand Up @@ -525,7 +527,7 @@ func TestReaderData(t *testing.T) {

for name, fn := range readerConstructors {
t.Run(name, func(t *testing.T) {
w, err := New(nil, nil, dir)
w, err := New(nil, nil, dir, false)
testutil.Ok(t, err)

sr, err := allSegments(dir)
Expand Down
Loading

0 comments on commit 238f946

Please sign in to comment.