Skip to content

Commit

Permalink
feat: added interval based wal synchronization (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
hitesh22rana authored Jan 26, 2025
1 parent f71f2f0 commit 9f1a618
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
10 changes: 9 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package wal

import "os"
import (
"os"
"time"
)

// Options represents the configuration options for a Write-Ahead Log (WAL).
type Options struct {
Expand Down Expand Up @@ -29,6 +32,10 @@ type Options struct {

// BytesPerSync specifies the number of bytes to write before calling fsync.
BytesPerSync uint32

// SyncInterval is the time duration in which explicit synchronization is performed.
// If SyncInterval is zero, no periodic synchronization is performed.
SyncInterval time.Duration
}

const (
Expand All @@ -44,4 +51,5 @@ var DefaultOptions = Options{
SegmentFileExt: ".SEG",
Sync: false,
BytesPerSync: 0,
SyncInterval: 0,
}
27 changes: 27 additions & 0 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"strings"
"sync"
"time"
)

const (
Expand Down Expand Up @@ -40,6 +41,8 @@ type WAL struct {
pendingWrites [][]byte
pendingSize int64
pendingWritesLock sync.Mutex
closeC chan struct{}
syncTicker *time.Ticker
}

// Reader represents a reader for the WAL.
Expand All @@ -64,6 +67,7 @@ func Open(options Options) (*WAL, error) {
options: options,
olderSegments: make(map[SegmentID]*segment),
pendingWrites: make([][]byte, 0),
closeC: make(chan struct{}),
}

// create the directory if not exists.
Expand Down Expand Up @@ -117,6 +121,22 @@ func Open(options Options) (*WAL, error) {
}
}

// only start the sync operation if the SyncInterval is greater than 0.
if wal.options.SyncInterval > 0 {
wal.syncTicker = time.NewTicker(wal.options.SyncInterval)
go func() {
for {
select {
case <-wal.syncTicker.C:
_ = wal.Sync()
case <-wal.closeC:
wal.syncTicker.Stop()
return
}
}
}()
}

return wal, nil
}

Expand Down Expand Up @@ -428,6 +448,13 @@ func (wal *WAL) Close() error {
wal.mu.Lock()
defer wal.mu.Unlock()

select {
case <-wal.closeC:
// channel is already closed
default:
close(wal.closeC)
}

// close all segment files.
for _, segment := range wal.olderSegments {
if err := segment.Close(); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion wal_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package wal

import (
"github.com/stretchr/testify/assert"
"io"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func destroyWAL(wal *WAL) {
Expand Down

0 comments on commit 9f1a618

Please sign in to comment.