diff --git a/checkpoint.go b/checkpoint.go
index d8dee28a..3c6b3947 100644
--- a/checkpoint.go
+++ b/checkpoint.go
@@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
 	if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
 		return nil, errors.Wrap(err, "create checkpoint dir")
 	}
-	cp, err := wal.New(nil, nil, cpdirtmp)
+	cp, err := wal.New(nil, nil, cpdirtmp, w.Compress())
 	if err != nil {
 		return nil, errors.Wrap(err, "open checkpoint")
 	}
diff --git a/checkpoint_test.go b/checkpoint_test.go
index fe8ee4e9..3bd5474b 100644
--- a/checkpoint_test.go
+++ b/checkpoint_test.go
@@ -99,7 +99,7 @@ func TestCheckpoint(t *testing.T) {
 	testutil.Ok(t, seg.Close())
 
 	// Manually create checkpoint for 99 and earlier.
-	w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
+	w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), false)
 	testutil.Ok(t, err)
 
 	// Add some data we expect to be around later.
@@ -111,7 +111,7 @@ func TestCheckpoint(t *testing.T) {
 	testutil.Ok(t, w.Close())
 
 	// Start a WAL and write records to it as usual.
-	w, err = wal.NewSize(nil, nil, dir, 64*1024)
+	w, err = wal.NewSize(nil, nil, dir, 64*1024, false)
 	testutil.Ok(t, err)
 
 	var last int64
@@ -197,7 +197,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	w, err := wal.NewSize(nil, nil, dir, 64*1024)
+	w, err := wal.NewSize(nil, nil, dir, 64*1024, false)
 	testutil.Ok(t, err)
 	testutil.Ok(t, w.Log([]byte{99}))
 	w.Close()
diff --git a/db.go b/db.go
index 52b21c2f..91452878 100644
--- a/db.go
+++ b/db.go
@@ -80,6 +80,9 @@ type Options struct {
 	// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
 	// This in-turn enables vertical compaction and vertical query merge.
 	AllowOverlappingBlocks bool
+
+	// WALCompression will turn on Snappy compression for records on the WAL.
+	WALCompression bool
 }
 
 // Appender allows appending a batch of data.
@@ -300,7 +303,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		if opts.WALSegmentSize > 0 {
 			segmentSize = opts.WALSegmentSize
 		}
-		wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
+		wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
 		if err != nil {
 			return nil, err
 		}
diff --git a/db_test.go b/db_test.go
index 6014a1d0..20dcf630 100644
--- a/db_test.go
+++ b/db_test.go
@@ -1404,7 +1404,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
 		}()
 		testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
 
-		w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+		w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
 		testutil.Ok(t, err)
 
 		var enc RecordEncoder
@@ -1454,7 +1454,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
 		createBlock(t, dir, genSeries(1, 1, 1000, 6000))
 		testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
 
-		w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+		w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
 		testutil.Ok(t, err)
 
 		var enc RecordEncoder
diff --git a/go.mod b/go.mod
index 02f3cf9e..74977b25 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/go-stack/stack v1.8.0 // indirect
 	github.com/gogo/protobuf v1.1.1 // indirect
 	github.com/golang/protobuf v1.2.0 // indirect
+	github.com/golang/snappy v0.0.1
 	github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/oklog/ulid v1.3.1
diff --git a/go.sum b/go.sum
index 266fbe96..81e88150 100644
--- a/go.sum
+++ b/go.sum
@@ -22,6 +22,8 @@ github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
diff --git a/head_test.go b/head_test.go
index 79092dd5..c0c86363 100644
--- a/head_test.go
+++ b/head_test.go
@@ -124,7 +124,7 @@ func TestHead_ReadWAL(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
 
-	w, err := wal.New(nil, nil, dir)
+	w, err := wal.New(nil, nil, dir, false)
 	testutil.Ok(t, err)
 	defer w.Close()
 	populateTestWAL(t, w, entries)
@@ -290,7 +290,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
 
-	w, err := wal.New(nil, nil, dir)
+	w, err := wal.New(nil, nil, dir, false)
 	testutil.Ok(t, err)
 	defer w.Close()
 	populateTestWAL(t, w, entries)
@@ -348,7 +348,7 @@ Outer:
 			testutil.Ok(t, os.RemoveAll(dir))
 		}()
 
-		w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+		w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
 		testutil.Ok(t, err)
 		defer w.Close()
 
@@ -370,7 +370,7 @@ Outer:
 		}
 
 		// Compare the samples for both heads - before and after the reload.
-		reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload.
+		reloadedW, err := wal.New(nil, nil, w.Dir(), false) // Use a new wal to ensure deleted samples are gone even after a reload.
 		testutil.Ok(t, err)
 		defer reloadedW.Close()
 		reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
@@ -511,7 +511,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	wlog, err := wal.NewSize(nil, nil, dir, 32768)
+	wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
 	testutil.Ok(t, err)
 
 	// Enough samples to cause a checkpoint.
@@ -977,7 +977,7 @@ func TestHead_LogRollback(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
 
-	w, err := wal.New(nil, nil, dir)
+	w, err := wal.New(nil, nil, dir, false)
 	testutil.Ok(t, err)
 	defer w.Close()
 	h, err := NewHead(nil, nil, w, 1000)
@@ -1046,7 +1046,7 @@ func TestWalRepair(t *testing.T) {
 				testutil.Ok(t, os.RemoveAll(dir))
 			}()
 
-			w, err := wal.New(nil, nil, dir)
+			w, err := wal.New(nil, nil, dir, false)
 			testutil.Ok(t, err)
 			defer w.Close()
 
diff --git a/wal.go b/wal.go
index 86b3bf79..41835232 100644
--- a/wal.go
+++ b/wal.go
@@ -1245,7 +1245,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
 	if err := os.RemoveAll(tmpdir); err != nil {
 		return errors.Wrap(err, "cleanup replacement dir")
 	}
-	repl, err := wal.New(logger, nil, tmpdir)
+	repl, err := wal.New(logger, nil, tmpdir, false)
 	if err != nil {
 		return errors.Wrap(err, "open new WAL")
 	}
diff --git a/wal/live_reader.go b/wal/live_reader.go
index 8394bfd0..e91a20d6 100644
--- a/wal/live_reader.go
+++ b/wal/live_reader.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/golang/snappy"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -166,10 +167,11 @@ func (r *LiveReader) buildRecord() (bool, error) {
 			return false, nil
 		}
 
-		rt := recType(r.hdr[0])
+		rt := recTypeFromHeader(r.hdr[0])
 		if rt == recFirst || rt == recFull {
 			r.rec = r.rec[:0]
 		}
+		compressed := r.hdr[0]&8 == 8
 		r.rec = append(r.rec, temp...)
 
 		if err := validateRecord(rt, r.index); err != nil {
@@ -178,6 +180,9 @@ func (r *LiveReader) buildRecord() (bool, error) {
 		}
 		if rt == recLast || rt == recFull {
 			r.index = 0
+			if compressed && len(r.rec) > 0 {
+				r.rec, _ = snappy.Decode(nil, r.rec)
+			}
 			return true, nil
 		}
 		// Only increment i for non-zero records since we use it
diff --git a/wal/reader.go b/wal/reader.go
index 297463b0..efb0343f 100644
--- a/wal/reader.go
+++ b/wal/reader.go
@@ -19,6 +19,7 @@ import (
 	"hash/crc32"
 	"io"
 
+	"github.com/golang/snappy"
 	"github.com/pkg/errors"
 )
 
@@ -45,7 +46,7 @@ func (r *Reader) Next() bool {
 		// The last WAL segment record shouldn't be torn(should be full or last).
 		// The last record would be torn after a crash just before
 		// the last record part could be persisted to disk.
-		if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
+		if r.curRecTyp == recFirst || r.curRecTyp == recMiddle {
 			r.err = errors.New("last record is torn")
 		}
 		return false
@@ -68,7 +69,8 @@ func (r *Reader) next() (err error) {
 		return errors.Wrap(err, "read first header byte")
 	}
 	r.total++
-	r.curRecTyp = recType(hdr[0])
+	r.curRecTyp = recTypeFromHeader(hdr[0])
+	compressed := hdr[0]&8 == 8
 
 	// Gobble up zero bytes.
 	if r.curRecTyp == recPageTerm {
@@ -129,6 +131,9 @@ func (r *Reader) next() (err error) {
 			return err
 		}
 		if r.curRecTyp == recLast || r.curRecTyp == recFull {
+			if compressed && len(r.rec) > 0 {
+				r.rec, _ = snappy.Decode(nil, r.rec)
+			}
 			return nil
 		}
 
diff --git a/wal/reader_test.go b/wal/reader_test.go
index 1178aa5e..2f2b9f80 100644
--- a/wal/reader_test.go
+++ b/wal/reader_test.go
@@ -310,36 +310,38 @@ func allSegments(dir string) (io.ReadCloser, error) {
 
 func TestReaderFuzz(t *testing.T) {
 	for name, fn := range readerConstructors {
-		t.Run(name, func(t *testing.T) {
-			dir, err := ioutil.TempDir("", "wal_fuzz_live")
-			testutil.Ok(t, err)
-			defer func() {
-				testutil.Ok(t, os.RemoveAll(dir))
-			}()
-
-			w, err := NewSize(nil, nil, dir, 128*pageSize)
-			testutil.Ok(t, err)
-
-			// Buffering required as we're not reading concurrently.
-			input := make(chan []byte, fuzzLen)
-			err = generateRandomEntries(w, input)
-			testutil.Ok(t, err)
-			close(input)
-
-			err = w.Close()
-			testutil.Ok(t, err)
-
-			sr, err := allSegments(w.Dir())
-			testutil.Ok(t, err)
-			defer sr.Close()
-
-			reader := fn(sr)
-			for expected := range input {
-				testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err())
-				testutil.Equals(t, expected, reader.Record(), "read wrong record")
-			}
-			testutil.Assert(t, !reader.Next(), "unexpected record")
-		})
+		for _, compress := range []bool{false, true} {
+			t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
+				dir, err := ioutil.TempDir("", "wal_fuzz_live")
+				testutil.Ok(t, err)
+				defer func() {
+					testutil.Ok(t, os.RemoveAll(dir))
+				}()
+
+				w, err := NewSize(nil, nil, dir, 128*pageSize, compress)
+				testutil.Ok(t, err)
+
+				// Buffering required as we're not reading concurrently.
+				input := make(chan []byte, fuzzLen)
+				err = generateRandomEntries(w, input)
+				testutil.Ok(t, err)
+				close(input)
+
+				err = w.Close()
+				testutil.Ok(t, err)
+
+				sr, err := allSegments(w.Dir())
+				testutil.Ok(t, err)
+				defer sr.Close()
+
+				reader := fn(sr)
+				for expected := range input {
+					testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err())
+					testutil.Equals(t, expected, reader.Record(), "read wrong record")
+				}
+				testutil.Assert(t, !reader.Next(), "unexpected record")
+			})
+		}
 	}
 }
 
@@ -351,7 +353,7 @@ func TestReaderFuzz_Live(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
 
-	w, err := NewSize(nil, nil, dir, 128*pageSize)
+	w, err := NewSize(nil, nil, dir, 128*pageSize, false)
 	testutil.Ok(t, err)
 	defer w.Close()
 
@@ -434,7 +436,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
 
-	w, err := NewSize(nil, nil, dir, pageSize)
+	w, err := NewSize(nil, nil, dir, pageSize, false)
 	testutil.Ok(t, err)
 
 	rec := make([]byte, pageSize-recordHeaderSize)
@@ -478,7 +480,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
 
-	w, err := NewSize(nil, nil, dir, pageSize*2)
+	w, err := NewSize(nil, nil, dir, pageSize*2, false)
 	testutil.Ok(t, err)
 
 	rec := make([]byte, pageSize-recordHeaderSize)
@@ -525,7 +527,7 @@ func TestReaderData(t *testing.T) {
 
 	for name, fn := range readerConstructors {
 		t.Run(name, func(t *testing.T) {
-			w, err := New(nil, nil, dir)
+			w, err := New(nil, nil, dir, false)
 			testutil.Ok(t, err)
 
 			sr, err := allSegments(dir)
diff --git a/wal/wal.go b/wal/wal.go
index 46504f0d..bc3e19fd 100644
--- a/wal/wal.go
+++ b/wal/wal.go
@@ -29,6 +29,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/golang/snappy"
 	"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" @@ -165,6 +166,8 @@ type WAL struct { stopc chan chan struct{} actorc chan func() closed bool // To allow calling Close() more than once without blocking. + compress bool + snappyBuf []byte fsyncDuration prometheus.Summary pageFlushes prometheus.Counter @@ -175,13 +178,13 @@ type WAL struct { } // New returns a new WAL over the given directory. -func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { - return NewSize(logger, reg, dir, DefaultSegmentSize) +func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) { + return NewSize(logger, reg, dir, DefaultSegmentSize, compress) } // NewSize returns a new WAL over the given directory. // New segments are created with the specified size. -func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) { +func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) { if segmentSize%pageSize != 0 { return nil, errors.New("invalid segment size") } @@ -198,6 +201,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi page: &page{}, actorc: make(chan func(), 100), stopc: make(chan chan struct{}), + compress: compress, } w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "prometheus_tsdb_wal_fsync_duration_seconds", @@ -256,6 +260,11 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi return w, nil } +// Compress returns if compression is enabled on this WAL. +func (w *WAL) Compress() bool { + return w.compress +} + // Dir returns the directory of the WAL. func (w *WAL) Dir() string { return w.dir @@ -465,6 +474,10 @@ const ( recLast recType = 4 // Final fragment of a record. ) +func recTypeFromHeader(header byte) recType { + return recType(header & 7) +} + func (t recType) String() string { switch t { case recPageTerm: @@ -525,6 +538,17 @@ func (w *WAL) log(rec []byte, final bool) error { } } + compressed := false + if w.compress && len(rec) > 0 { + // Allow Snappy to use the full capacity of the buffer. + w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)] + w.snappyBuf = snappy.Encode(w.snappyBuf, rec) + if len(w.snappyBuf) < len(rec) { + rec = w.snappyBuf + compressed = true + } + } + // Populate as many pages as necessary to fit the record. // Be careful to always do one pass to ensure we write zero-length records. for i := 0; i == 0 || len(rec) > 0; i++ { @@ -548,6 +572,9 @@ func (w *WAL) log(rec []byte, final bool) error { default: typ = recMiddle } + if compressed { + typ |= 8 + } buf[0] = byte(typ) crc := crc32.Checksum(part, castagnoliTable) diff --git a/wal/wal_test.go b/wal/wal_test.go index 5363ebf4..9908892b 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -118,7 +118,7 @@ func TestWAL_Repair(t *testing.T) { // then corrupt a given record in a given segment. // As a result we want a repaired WAL with given intact records. 
 			segSize := 3 * pageSize
-			w, err := NewSize(nil, nil, dir, segSize)
+			w, err := NewSize(nil, nil, dir, segSize, false)
 			testutil.Ok(t, err)
 
 			var records [][]byte
@@ -139,7 +139,7 @@ func TestWAL_Repair(t *testing.T) {
 
 			testutil.Ok(t, f.Close())
 
-			w, err = NewSize(nil, nil, dir, segSize)
+			w, err = NewSize(nil, nil, dir, segSize, false)
 			testutil.Ok(t, err)
 			defer w.Close()
 
@@ -199,7 +199,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
 	// Produce a WAL with a two segments of 3 pages with 3 records each,
 	// so when we truncate the file we're guaranteed to split a record.
 	{
-		w, err := NewSize(logger, nil, dir, segmentSize)
+		w, err := NewSize(logger, nil, dir, segmentSize, false)
 		testutil.Ok(t, err)
 
 		for i := 0; i < 18; i++ {
@@ -270,7 +270,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
 		err = sr.Close()
 		testutil.Ok(t, err)
 
-		w, err := NewSize(logger, nil, dir, segmentSize)
+		w, err := NewSize(logger, nil, dir, segmentSize, false)
 		testutil.Ok(t, err)
 
 		err = w.Repair(corruptionErr)
@@ -313,7 +313,7 @@ func TestClose(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	w, err := NewSize(nil, nil, dir, pageSize)
+	w, err := NewSize(nil, nil, dir, pageSize, false)
 	testutil.Ok(t, err)
 	testutil.Ok(t, w.Close())
 	testutil.NotOk(t, w.Close())
@@ -330,7 +330,7 @@ func TestSegmentMetric(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	w, err := NewSize(nil, nil, dir, segmentSize)
+	w, err := NewSize(nil, nil, dir, segmentSize, false)
 	testutil.Ok(t, err)
 
 	initialSegment := client_testutil.ToFloat64(w.currentSegment)
@@ -349,55 +349,63 @@ func TestSegmentMetric(t *testing.T) {
 }
 
 func BenchmarkWAL_LogBatched(b *testing.B) {
-	dir, err := ioutil.TempDir("", "bench_logbatch")
-	testutil.Ok(b, err)
-	defer func() {
-		testutil.Ok(b, os.RemoveAll(dir))
-	}()
+	for _, compress := range []bool{true, false} {
+		b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
+			dir, err := ioutil.TempDir("", "bench_logbatch")
+			testutil.Ok(b, err)
+			defer func() {
+				testutil.Ok(b, os.RemoveAll(dir))
+			}()
 
-	w, err := New(nil, nil, "testdir")
-	testutil.Ok(b, err)
-	defer w.Close()
+			w, err := New(nil, nil, "testdir", compress)
+			testutil.Ok(b, err)
+			defer w.Close()
 
-	var buf [2048]byte
-	var recs [][]byte
-	b.SetBytes(2048)
+			var buf [2048]byte
+			var recs [][]byte
+			b.SetBytes(2048)
 
-	for i := 0; i < b.N; i++ {
-		recs = append(recs, buf[:])
-		if len(recs) < 1000 {
-			continue
-		}
-		err := w.Log(recs...)
-		testutil.Ok(b, err)
-		recs = recs[:0]
+			for i := 0; i < b.N; i++ {
+				recs = append(recs, buf[:])
+				if len(recs) < 1000 {
+					continue
+				}
+				err := w.Log(recs...)
+				testutil.Ok(b, err)
+				recs = recs[:0]
+			}
+			// Stop timer to not count fsync time on close.
+			// If it's counted batched vs. single benchmarks are very similar but
+			// do not show burst throughput well.
+			b.StopTimer()
+		})
 	}
-	// Stop timer to not count fsync time on close.
-	// If it's counted batched vs. single benchmarks are very similar but
-	// do not show burst throughput well.
-	b.StopTimer()
 }
 
 func BenchmarkWAL_Log(b *testing.B) {
-	dir, err := ioutil.TempDir("", "bench_logsingle")
-	testutil.Ok(b, err)
-	defer func() {
-		testutil.Ok(b, os.RemoveAll(dir))
-	}()
+	for _, compress := range []bool{true, false} {
+		b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
+			dir, err := ioutil.TempDir("", "bench_logsingle")
+			testutil.Ok(b, err)
+			defer func() {
+				testutil.Ok(b, os.RemoveAll(dir))
+			}()
 
-	w, err := New(nil, nil, "testdir")
-	testutil.Ok(b, err)
-	defer w.Close()
+			w, err := New(nil, nil, "testdir", compress)
+			testutil.Ok(b, err)
+			defer w.Close()
 
-	var buf [2048]byte
-	b.SetBytes(2048)
+			var buf [2048]byte
+			b.SetBytes(2048)
 
-	for i := 0; i < b.N; i++ {
-		err := w.Log(buf[:])
-		testutil.Ok(b, err)
+			for i := 0; i < b.N; i++ {
+				err := w.Log(buf[:])
+				testutil.Ok(b, err)
+			}
+			// Stop timer to not count fsync time on close.
+			// If it's counted batched vs. single benchmarks are very similar but
+			// do not show burst throughput well.
+			b.StopTimer()
+		})
 	}
-	// Stop timer to not count fsync time on close.
-	// If it's counted batched vs. single benchmarks are very similar but
-	// do not show burst throughput well.
-	b.StopTimer()
 }
diff --git a/wal_test.go b/wal_test.go
index 7f07a63b..0fed5b41 100644
--- a/wal_test.go
+++ b/wal_test.go
@@ -459,7 +459,7 @@ func TestMigrateWAL_Empty(t *testing.T) {
 	wdir := path.Join(dir, "wal")
 
 	// Initialize empty WAL.
-	w, err := wal.New(nil, nil, wdir)
+	w, err := wal.New(nil, nil, wdir, false)
 	testutil.Ok(t, err)
 	testutil.Ok(t, w.Close())
 
@@ -506,7 +506,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) {
 	// Perform migration.
 	testutil.Ok(t, MigrateWAL(nil, wdir))
 
-	w, err := wal.New(nil, nil, wdir)
+	w, err := wal.New(nil, nil, wdir, false)
 	testutil.Ok(t, err)
 
 	// We can properly write some new data after migration.
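
The wire-level convention the patch introduces is small: the record type stays in the low three bits of the header byte, and bit 3 (0x08) is set only when the Snappy-encoded payload came out shorter than the raw record, which is why recTypeFromHeader masks with & 7 and both readers test hdr[0]&8 before calling snappy.Decode. The following standalone Go sketch replays that convention outside the WAL machinery; the snappyMask and isCompressed names are illustrative helpers, not identifiers from the patch.

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

// Record types, mirroring the constants in wal/wal.go.
type recType uint8

const (
	recPageTerm recType = 0 // Rest of page is empty.
	recFull     recType = 1 // Full record.
	recFirst    recType = 2 // First fragment of a record.
	recMiddle   recType = 3 // Middle fragments of a record.
	recLast     recType = 4 // Final fragment of a record.

	// snappyMask is the bit the patch ORs into the type byte when the
	// payload is Snappy-compressed (illustrative name).
	snappyMask byte = 1 << 3
)

// recTypeFromHeader mirrors the helper added in the diff: the record type
// lives in the low three bits, the compression flag in bit 3.
func recTypeFromHeader(header byte) recType {
	return recType(header & 7)
}

func isCompressed(header byte) bool {
	return header&snappyMask == snappyMask
}

func main() {
	payload := []byte("some repetitive payload payload payload payload")

	// Writer side: compress, and keep the compressed form only if it is
	// actually smaller, as WAL.log does in the patch.
	enc := snappy.Encode(nil, payload)
	header := byte(recFull)
	rec := payload
	if len(enc) < len(payload) {
		header |= snappyMask
		rec = enc
	}

	// Reader side: mask out the flag to get the type, then decode if needed.
	typ := recTypeFromHeader(header)
	if isCompressed(header) {
		rec, _ = snappy.Decode(nil, rec)
	}
	fmt.Println(typ == recFull, string(rec) == string(payload))
}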
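From a caller's point of view the change is just the extra trailing bool on wal.New and wal.NewSize (surfaced as Options.WALCompression in db.go); readers need no flag because each record header says whether its payload is compressed. A rough usage sketch, assuming the post-patch signatures shown above and the package's existing NewSegmentsReader and NewReader helpers:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"

	"github.com/prometheus/tsdb/wal"
)

func main() {
	dir, err := ioutil.TempDir("", "wal_compress_demo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Open a WAL with Snappy compression enabled via the new trailing
	// bool argument (signature as changed by this diff).
	w, err := wal.New(nil, nil, dir, true)
	if err != nil {
		log.Fatal(err)
	}
	if err := w.Log([]byte("record one"), []byte("record two")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Readers take no compression flag: the per-record header bit tells
	// them whether to snappy.Decode the payload.
	sr, err := wal.NewSegmentsReader(dir)
	if err != nil {
		log.Fatal(err)
	}
	defer sr.Close()

	r := wal.NewReader(sr)
	for r.Next() {
		fmt.Printf("%s\n", r.Record())
	}
	if err := r.Err(); err != nil {
		log.Fatal(err)
	}
}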