diff --git a/wal.go b/wal.go index ab5eac8..5d70072 100644 --- a/wal.go +++ b/wal.go @@ -79,17 +79,20 @@ type Options struct { // Perms represents the datafiles modes and permission bits DirPerms os.FileMode FilePerms os.FileMode + // RecoverCorruptedTail will attempt to recover a corrupted tail in the last segment automatically. + RecoverCorruptedTail bool } // DefaultOptions for Open(). var DefaultOptions = &Options{ - NoSync: false, // Fsync after every write - SegmentSize: 20971520, // 20 MB log segment files. - LogFormat: Binary, // Binary format is small and fast. - SegmentCacheSize: 2, // Number of cached in-memory segments - NoCopy: false, // Make a new copy of data for every Read call. - DirPerms: 0750, // Permissions for the created directories - FilePerms: 0640, // Permissions for the created data files + NoSync: false, // Fsync after every write + SegmentSize: 20971520, // 20 MB log segment files. + LogFormat: Binary, // Binary format is small and fast. + SegmentCacheSize: 2, // Number of cached in-memory segments + NoCopy: false, // Make a new copy of data for every Read call. + DirPerms: 0750, // Permissions for the created directories + FilePerms: 0640, // Permissions for the created data files + RecoverCorruptedTail: false, // Don't recover corrupted tail. } // Log represents a write ahead log @@ -262,15 +265,15 @@ func (l *Log) load() error { l.firstIndex = l.segments[0].index // Open the last segment for appending lseg := l.segments[len(l.segments)-1] - l.sfile, err = os.OpenFile(lseg.path, os.O_WRONLY, l.opts.FilePerms) - if err != nil { + // Load the last segment entries + if err := l.loadSegmentEntries(lseg, l.opts.RecoverCorruptedTail); err != nil { return err } - if _, err := l.sfile.Seek(0, 2); err != nil { + l.sfile, err = os.OpenFile(lseg.path, os.O_WRONLY, l.opts.FilePerms) + if err != nil { return err } - // Load the last segment entries - if err := l.loadSegmentEntries(lseg); err != nil { + if _, err := l.sfile.Seek(int64(len(lseg.ebuf)), 0); err != nil { return err } l.lastIndex = lseg.index + uint64(len(lseg.epos)) - 1 @@ -529,7 +532,7 @@ func (l *Log) findSegment(index uint64) int { return i - 1 } -func (l *Log) loadSegmentEntries(s *segment) error { +func (l *Log) loadSegmentEntries(s *segment, ignoreCorruptedTail bool) error { data, err := ioutil.ReadFile(s.path) if err != nil { return err @@ -544,6 +547,9 @@ func (l *Log) loadSegmentEntries(s *segment) error { } else { n, err = loadNextBinaryEntry(data) } + if err == ErrCorrupt && ignoreCorruptedTail { + break + } if err != nil { return err } @@ -551,7 +557,7 @@ func (l *Log) loadSegmentEntries(s *segment) error { epos = append(epos, bpos{pos, pos + n}) pos += n } - s.ebuf = ebuf + s.ebuf = ebuf[:pos] s.epos = epos return nil } @@ -607,7 +613,7 @@ func (l *Log) loadSegment(index uint64) (*segment, error) { s := l.segments[idx] if len(s.epos) == 0 { // load the entries from cache - if err := l.loadSegmentEntries(s); err != nil { + if err := l.loadSegmentEntries(s, false); err != nil { return nil, err } } @@ -791,7 +797,7 @@ func (l *Log) truncateFront(index uint64) (err error) { return err } // Load the last segment entries - if err = l.loadSegmentEntries(s); err != nil { + if err = l.loadSegmentEntries(s, false); err != nil { return err } } @@ -897,7 +903,7 @@ func (l *Log) truncateBack(index uint64) (err error) { l.segments = append([]*segment{}, l.segments[:segIdx+1]...) l.lastIndex = index l.clearCache() - if err = l.loadSegmentEntries(s); err != nil { + if err = l.loadSegmentEntries(s, false); err != nil { return err } return nil diff --git a/wal_test.go b/wal_test.go index b23a002..110ad59 100644 --- a/wal_test.go +++ b/wal_test.go @@ -519,6 +519,8 @@ func TestOutliers(t *testing.T) { t.Run("fail-corrupted-tail-json", func(t *testing.T) { defer os.RemoveAll("testlog/corrupt-tail") opts := makeOpts(512, true, JSON) + optsRecoverTail := *opts + optsRecoverTail.RecoverCorruptedTail = true os.MkdirAll("testlog/corrupt-tail", 0777) ioutil.WriteFile( "testlog/corrupt-tail/00000000000000000001", @@ -527,6 +529,10 @@ func TestOutliers(t *testing.T) { l.Close() t.Fatalf("expected %v, got %v", ErrCorrupt, err) } + if l, err := Open("testlog/corrupt-tail", &optsRecoverTail); err != nil { + l.Close() + t.Fatalf("expected %v, got %v", nil, err) + } ioutil.WriteFile( "testlog/corrupt-tail/00000000000000000001", []byte(`{}`+"\n"), 0666) @@ -534,6 +540,10 @@ func TestOutliers(t *testing.T) { l.Close() t.Fatalf("expected %v, got %v", ErrCorrupt, err) } + if l, err := Open("testlog/corrupt-tail", &optsRecoverTail); err != nil { + l.Close() + t.Fatalf("expected %v, got %v", nil, err) + } ioutil.WriteFile( "testlog/corrupt-tail/00000000000000000001", []byte(`{"index":"1"}`+"\n"), 0666) @@ -541,6 +551,10 @@ func TestOutliers(t *testing.T) { l.Close() t.Fatalf("expected %v, got %v", ErrCorrupt, err) } + if l, err := Open("testlog/corrupt-tail", &optsRecoverTail); err != nil { + l.Close() + t.Fatalf("expected %v, got %v", nil, err) + } ioutil.WriteFile( "testlog/corrupt-tail/00000000000000000001", []byte(`{"index":"1","data":"?"}`), 0666) @@ -548,6 +562,10 @@ func TestOutliers(t *testing.T) { l.Close() t.Fatalf("expected %v, got %v", ErrCorrupt, err) } + if l, err := Open("testlog/corrupt-tail", &optsRecoverTail); err != nil { + l.Close() + t.Fatalf("expected %v, got %v", nil, err) + } }) t.Run("start-marker-file", func(t *testing.T) {