diff --git a/codec.go b/codec.go new file mode 100644 index 0000000..5824b49 --- /dev/null +++ b/codec.go @@ -0,0 +1,18 @@ +package dque + +// Codec handles serialization and deserialization of queue items to and from bytes. +// The bytes produced by Encode must be fully self-contained: Decode receives only +// those bytes plus a builder func and must reconstruct the original value exactly. +// +// Encode uses an append-style signature so callers can provide a pre-allocated +// buffer via dst (e.g. from a sync.Pool) and avoid a heap allocation per call. +// Passing nil or an empty dst is always safe; the codec allocates internally. +type Codec interface { + // Encode appends the encoded form of v to dst and returns the extended slice. + // On error it returns dst unchanged. + Encode(dst []byte, v interface{}) ([]byte, error) + + // Decode decodes data into a new object constructed by builder. + // builder must return a non-nil pointer of the correct concrete type. + Decode(data []byte, builder func() interface{}) (interface{}, error) +} diff --git a/codec_test.go b/codec_test.go new file mode 100644 index 0000000..a1bcd65 --- /dev/null +++ b/codec_test.go @@ -0,0 +1,196 @@ +// codec_test.go — white-box unit tests for Codec implementations in the dque package. +package dque + +import ( + "strings" + "testing" +) + +// testPayload is the struct used in codec roundtrip tests. +type testPayload struct { + Name string + Value int + Tags []string + Counts map[string]int +} + +func testPayloadBuilder() interface{} { + return &testPayload{} +} + +// TestGobCodec_EncodeDecodeRoundtrip verifies that GobCodec can encode and decode +// the same value without data loss, including nested maps and slices. 
+func TestGobCodec_EncodeDecodeRoundtrip(t *testing.T) { + codec := GobCodec{} + + cases := []struct { + name string + val *testPayload + }{ + { + name: "basic fields", + val: &testPayload{Name: "hello", Value: 42}, + }, + { + name: "with slice", + val: &testPayload{Name: "tagged", Tags: []string{"a", "b", "c"}}, + }, + { + name: "with map", + val: &testPayload{Name: "counted", Counts: map[string]int{"x": 1, "y": 2}}, + }, + { + name: "zero value", + val: &testPayload{}, + }, + { + name: "large string", + val: &testPayload{Name: strings.Repeat("x", 4096)}, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + // Encode: provide a 4-byte placeholder just like segment.add() does. + dst := []byte{0, 0, 0, 0} + encoded, err := codec.Encode(dst, tc.val) + if err != nil { + t.Fatalf("Encode failed: %v", err) + } + if len(encoded) <= 4 { + t.Fatalf("Encode produced no payload bytes (len=%d)", len(encoded)) + } + + // The payload starts at offset 4 (after the placeholder). 
+ payload := encoded[4:] + + decoded, err := codec.Decode(payload, testPayloadBuilder) + if err != nil { + t.Fatalf("Decode failed: %v", err) + } + got, ok := decoded.(*testPayload) + if !ok { + t.Fatalf("decoded type is %T, want *testPayload", decoded) + } + + if got.Name != tc.val.Name { + t.Errorf("Name: got %q want %q", got.Name, tc.val.Name) + } + if got.Value != tc.val.Value { + t.Errorf("Value: got %d want %d", got.Value, tc.val.Value) + } + if len(got.Tags) != len(tc.val.Tags) { + t.Errorf("Tags len: got %d want %d", len(got.Tags), len(tc.val.Tags)) + } + for i := range tc.val.Tags { + if got.Tags[i] != tc.val.Tags[i] { + t.Errorf("Tags[%d]: got %q want %q", i, got.Tags[i], tc.val.Tags[i]) + } + } + for k, v := range tc.val.Counts { + if got.Counts[k] != v { + t.Errorf("Counts[%q]: got %d want %d", k, got.Counts[k], v) + } + } + }) + } +} + +// TestGobCodec_EncodeAppendsToExistingDst verifies the append-style contract: +// bytes before the encode point must be preserved. +func TestGobCodec_EncodeAppendsToExistingDst(t *testing.T) { + codec := GobCodec{} + prefix := []byte{0, 0, 0, 0} // 4-byte placeholder, as used by segment.add + + val := &testPayload{Name: "append-test", Value: 7} + result, err := codec.Encode(prefix, val) + if err != nil { + t.Fatalf("Encode failed: %v", err) + } + + // First 4 bytes must still be the placeholder. + for i := 0; i < 4; i++ { + if result[i] != 0 { + t.Errorf("byte[%d] was overwritten: got %d want 0", i, result[i]) + } + } + // Payload must follow. + if len(result) <= 4 { + t.Fatal("no payload bytes after prefix") + } +} + +// TestGobCodec_DecodeReturnsErrorOnGarbage ensures corrupt data causes a decode error. 
+func TestGobCodec_DecodeReturnsErrorOnGarbage(t *testing.T) {
+	codec := GobCodec{}
+	_, err := codec.Decode([]byte{0xDE, 0xAD, 0xBE, 0xEF}, testPayloadBuilder)
+	if err == nil {
+		t.Fatal("expected decode error on garbage data, got nil")
+	}
+}
+
+// TestGobCodec_EncodeReturnsDstOnError verifies that on an encode error,
+// dst is returned unchanged (no partial data appended).
+func TestGobCodec_EncodeReturnsDstOnError(t *testing.T) {
+	codec := GobCodec{}
+	// gob can't encode a channel type.
+	ch := make(chan int)
+	dst := []byte{1, 2, 3}
+	result, err := codec.Encode(dst, ch)
+	if err == nil {
+		t.Fatal("expected encode error for channel type, got nil")
+	}
+	// result must be byte-identical to dst, not merely the same length
+	if string(result) != string(dst) {
+		t.Errorf("result changed on error: got %v want %v", result, dst)
+	}
+}
+
+// BenchmarkGobCodec_Encode measures the per-encode allocation and throughput.
+func BenchmarkGobCodec_Encode(b *testing.B) {
+	codec := GobCodec{}
+	val := &testPayload{
+		Name:   "benchmark-item",
+		Value:  42,
+		Tags:   []string{"tag1", "tag2"},
+		Counts: map[string]int{"a": 1, "b": 2},
+	}
+	buf := make([]byte, 0, 1024)
+	b.ResetTimer()
+	b.ReportAllocs()
+	for n := 0; n < b.N; n++ {
+		buf = buf[:0]
+		buf = append(buf, 0, 0, 0, 0)
+		result, err := codec.Encode(buf, val)
+		if err != nil {
+			b.Fatal(err)
+		}
+		_ = result
+	}
+}
+
+// BenchmarkGobCodec_Decode measures the per-decode allocation and throughput.
+func BenchmarkGobCodec_Decode(b *testing.B) { + codec := GobCodec{} + val := &testPayload{ + Name: "benchmark-item", + Value: 42, + Tags: []string{"tag1", "tag2"}, + Counts: map[string]int{"a": 1, "b": 2}, + } + encoded, err := codec.Encode([]byte{0, 0, 0, 0}, val) + if err != nil { + b.Fatal(err) + } + payload := encoded[4:] + + b.ResetTimer() + b.ReportAllocs() + for n := 0; n < b.N; n++ { + _, err := codec.Decode(payload, testPayloadBuilder) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/go.mod b/go.mod index f0ebbac..d0c6bc6 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,10 @@ module github.com/joncrlsn/dque -require github.com/pkg/errors v0.9.1 +go 1.21 require ( - github.com/gofrs/flock v0.7.1 - github.com/kr/pretty v0.2.0 // indirect - gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + github.com/gofrs/flock v0.7.1 + github.com/kr/pretty v0.2.1 // indirect + github.com/pkg/errors v0.9.1 + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) - -go 1.13 diff --git a/go.sum b/go.sum index 0940827..446ff64 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,7 @@ github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc= github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -9,3 +10,4 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 
h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/gob_codec.go b/gob_codec.go new file mode 100644 index 0000000..efce489 --- /dev/null +++ b/gob_codec.go @@ -0,0 +1,29 @@ +package dque + +import ( + "bytes" + "encoding/gob" +) + +// GobCodec is the default Codec using encoding/gob. +// It is wire-compatible with the original joncrlsn/dque segment file format, +// so existing queue data on disk can be read without migration. +type GobCodec struct{} + +// Encode gob-encodes v and appends the result to dst. +func (GobCodec) Encode(dst []byte, v interface{}) ([]byte, error) { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(v); err != nil { + return dst, err + } + return append(dst, buf.Bytes()...), nil +} + +// Decode gob-decodes data into a new object returned by builder. 
+func (GobCodec) Decode(data []byte, builder func() interface{}) (interface{}, error) { + obj := builder() + if err := gob.NewDecoder(bytes.NewReader(data)).Decode(obj); err != nil { + return nil, err + } + return obj, nil +} diff --git a/queue.go b/queue.go index fce8d29..3763fd2 100644 --- a/queue.go +++ b/queue.go @@ -1,6 +1,4 @@ -// // Package dque is a fast embedded durable queue for Go -// package dque // @@ -10,17 +8,15 @@ package dque // import ( + "math" + "os" + "path" + "regexp" "strconv" "sync" "github.com/gofrs/flock" "github.com/pkg/errors" - - "io/ioutil" - "math" - "os" - "path" - "regexp" ) const lockFile = "lock.lock" @@ -56,7 +52,8 @@ type DQue struct { fileLock *flock.Flock firstSegment *qSegment lastSegment *qSegment - builder func() interface{} // builds a structure to load via gob + builder func() interface{} + codec Codec mutex sync.Mutex @@ -65,10 +62,14 @@ type DQue struct { turbo bool } -// New creates a new durable queue +// New creates a new durable queue using the default GobCodec. func New(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error) { + return NewWithCodec(name, dirPath, itemsPerSegment, builder, GobCodec{}) +} + +// NewWithCodec creates a new durable queue using the provided Codec. 
+func NewWithCodec(name string, dirPath string, itemsPerSegment int, builder func() interface{}, codec Codec) (*DQue, error) { - // Validation if len(name) == 0 { return nil, errors.New("the queue name requires a value") } @@ -91,6 +92,7 @@ func New(name string, dirPath string, itemsPerSegment int, builder func() interf q.fullPath = fullPath q.config.ItemsPerSegment = itemsPerSegment q.builder = builder + q.codec = codec q.emptyCond = sync.NewCond(&q.mutex) if err := q.lock(); err != nil { @@ -98,8 +100,7 @@ func New(name string, dirPath string, itemsPerSegment int, builder func() interf } if err := q.load(); err != nil { - er := q.fileLock.Unlock() - if er != nil { + if er := q.fileLock.Unlock(); er != nil { return nil, er } return nil, err @@ -108,10 +109,14 @@ func New(name string, dirPath string, itemsPerSegment int, builder func() interf return &q, nil } -// Open opens an existing durable queue. +// Open opens an existing durable queue using the default GobCodec. func Open(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error) { + return OpenWithCodec(name, dirPath, itemsPerSegment, builder, GobCodec{}) +} + +// OpenWithCodec opens an existing durable queue using the provided Codec. 
+func OpenWithCodec(name string, dirPath string, itemsPerSegment int, builder func() interface{}, codec Codec) (*DQue, error) { - // Validation if len(name) == 0 { return nil, errors.New("the queue name requires a value") } @@ -130,6 +135,7 @@ func Open(name string, dirPath string, itemsPerSegment int, builder func() inter q.fullPath = fullPath q.config.ItemsPerSegment = itemsPerSegment q.builder = builder + q.codec = codec q.emptyCond = sync.NewCond(&q.mutex) if err := q.lock(); err != nil { @@ -137,8 +143,7 @@ func Open(name string, dirPath string, itemsPerSegment int, builder func() inter } if err := q.load(); err != nil { - er := q.fileLock.Unlock() - if er != nil { + if er := q.fileLock.Unlock(); er != nil { return nil, er } return nil, err @@ -147,10 +152,14 @@ func Open(name string, dirPath string, itemsPerSegment int, builder func() inter return &q, nil } -// NewOrOpen either creates a new queue or opens an existing durable queue. +// NewOrOpen either creates a new queue or opens an existing durable queue using the default GobCodec. func NewOrOpen(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error) { + return NewOrOpenWithCodec(name, dirPath, itemsPerSegment, builder, GobCodec{}) +} + +// NewOrOpenWithCodec either creates a new queue or opens an existing durable queue using the provided Codec. 
+func NewOrOpenWithCodec(name string, dirPath string, itemsPerSegment int, builder func() interface{}, codec Codec) (*DQue, error) { - // Validation if len(name) == 0 { return nil, errors.New("the queue name requires a value") } @@ -162,10 +171,10 @@ func NewOrOpen(name string, dirPath string, itemsPerSegment int, builder func() } fullPath := path.Join(dirPath, name) if dirExists(fullPath) { - return Open(name, dirPath, itemsPerSegment, builder) + return OpenWithCodec(name, dirPath, itemsPerSegment, builder, codec) } - return New(name, dirPath, itemsPerSegment, builder) + return NewWithCodec(name, dirPath, itemsPerSegment, builder, codec) } // Close releases the lock on the queue rendering it unusable for further usage by this instance. @@ -175,6 +184,7 @@ func (q *DQue) Close() error { q.mutex.Lock() defer q.mutex.Unlock() + // Finally mark this instance as closed to prevent any further access if q.fileLock == nil { return ErrQueueClosed } @@ -184,7 +194,6 @@ func (q *DQue) Close() error { return err } - // Finally mark this instance as closed to prevent any further access q.fileLock = nil // Wake-up any waiting goroutines for blocking queue access - they should get a ErrQueueClosed @@ -219,9 +228,8 @@ func (q *DQue) Enqueue(obj interface{}) error { // If this segment is full then create a new one if q.lastSegment.sizeOnDisk() >= q.config.ItemsPerSegment { - // We have filled our last segment to capacity, so create a new one - seg, err := newQueueSegment(q.fullPath, q.lastSegment.number+1, q.turbo, q.builder) + seg, err := newQueueSegment(q.fullPath, q.lastSegment.number+1, q.turbo, q.builder, q.codec) if err != nil { return errors.Wrapf(err, "error creating new queue segment: %d.", q.lastSegment.number+1) } @@ -229,15 +237,13 @@ func (q *DQue) Enqueue(obj interface{}) error { // If the last segment is not the first segment // then we need to close the file. 
if q.firstSegment != q.lastSegment { - var err = q.lastSegment.close() - if err != nil { + if err := q.lastSegment.close(); err != nil { return errors.Wrapf(err, "error closing previous segment file #%d.", q.lastSegment.number) } } // Replace the last segment with the new one q.lastSegment = seg - } // Add the object to the last segment @@ -254,7 +260,6 @@ func (q *DQue) Enqueue(obj interface{}) error { // Dequeue removes and returns the first item in the queue. // When the queue is empty, nil and dque.ErrEmpty are returned. func (q *DQue) Dequeue() (interface{}, error) { - // This is heavy-handed but its safe q.mutex.Lock() defer q.mutex.Unlock() @@ -266,7 +271,6 @@ func (q *DQue) dequeueLocked() (interface{}, error) { return nil, ErrQueueClosed } - // Remove the first object from the first segment obj, err := q.firstSegment.remove() if err == errEmptySegment { return nil, ErrEmpty @@ -275,22 +279,16 @@ func (q *DQue) dequeueLocked() (interface{}, error) { return nil, errors.Wrap(err, "error removing item from the first segment") } - // If this segment is empty and we've reached the max for this segment - // then delete the file and open the next one. if q.firstSegment.size() == 0 && q.firstSegment.sizeOnDisk() >= q.config.ItemsPerSegment { - // Delete the segment file if err := q.firstSegment.delete(); err != nil { return obj, errors.Wrap(err, "error deleting queue segment "+q.firstSegment.filePath()+". Queue is in an inconsistent state") } - // We have only one segment and it's now empty so destroy it and - // create a new one. if q.firstSegment.number == q.lastSegment.number { - // Create the next segment - seg, err := newQueueSegment(q.fullPath, q.firstSegment.number+1, q.turbo, q.builder) + seg, err := newQueueSegment(q.fullPath, q.firstSegment.number+1, q.turbo, q.builder, q.codec) if err != nil { return obj, errors.Wrap(err, "error creating new segment. 
Queue is in an inconsistent state") } @@ -300,12 +298,10 @@ func (q *DQue) dequeueLocked() (interface{}, error) { } else { if q.firstSegment.number+1 == q.lastSegment.number { - // We have 2 segments, moving down to 1 shared segment q.firstSegment = q.lastSegment } else { - // Open the next segment - seg, err := openQueueSegment(q.fullPath, q.firstSegment.number+1, q.turbo, q.builder) + seg, err := openQueueSegment(q.fullPath, q.firstSegment.number+1, q.turbo, q.builder, q.codec) if err != nil { return obj, errors.Wrap(err, "error creating new segment. Queue is in an inconsistent state") } @@ -322,7 +318,6 @@ func (q *DQue) dequeueLocked() (interface{}, error) { // When the queue is empty, nil and dque.ErrEmpty are returned. // Do not use this method with multiple dequeueing threads or you may regret it. func (q *DQue) Peek() (interface{}, error) { - // This is heavy-handed but it is safe q.mutex.Lock() defer q.mutex.Unlock() @@ -334,13 +329,11 @@ func (q *DQue) peekLocked() (interface{}, error) { return nil, ErrQueueClosed } - // Return the first object from the first segment obj, err := q.firstSegment.peek() if err == errEmptySegment { return nil, ErrEmpty } if err != nil { - // In reality this will (i.e. should not) never happen return nil, errors.Wrap(err, "error getting item from the first segment") } @@ -355,8 +348,6 @@ func (q *DQue) DequeueBlock() (interface{}, error) { obj, err := q.dequeueLocked() if err == ErrEmpty { q.emptyCond.Wait() - // Wait() atomically unlocks mutexEmptyCond and suspends execution of the calling goroutine. - // Receiving the signal does not guarantee an item is available, let's loop and check again. continue } else if err != nil { return nil, err @@ -373,8 +364,6 @@ func (q *DQue) PeekBlock() (interface{}, error) { obj, err := q.peekLocked() if err == ErrEmpty { q.emptyCond.Wait() - // Wait() atomically unlocks mutexEmptyCond and suspends execution of the calling goroutine. 
- // Receiving the signal does not guarantee an item is available, let's loop and check again. continue } else if err != nil { return nil, err @@ -391,7 +380,6 @@ func (q *DQue) Size() int { return 0 } - // This is heavy-handed but it is safe q.mutex.Lock() defer q.mutex.Unlock() @@ -400,11 +388,6 @@ func (q *DQue) Size() int { // SizeUnsafe returns the approximate number of items in the queue. Use Size() if // having the exact size is important to your use-case. -// -// The return value could be wildly inaccurate if the itemsPerSegment value has -// changed since the queue was last empty. -// Also, because this method is not synchronized, the size may change after -// entering this method. func (q *DQue) SizeUnsafe() int { if q.fileLock == nil { return 0 @@ -416,7 +399,7 @@ func (q *DQue) SizeUnsafe() int { return q.firstSegment.size() + (numSegmentsBetween * q.config.ItemsPerSegment) + q.lastSegment.size() } -// SegmentNumbers returns the number of both the first last segmment. +// SegmentNumbers returns the number of both the first and last segment. // There is likely no use for this information other than testing. func (q *DQue) SegmentNumbers() (int, int) { if q.fileLock == nil { @@ -425,18 +408,14 @@ func (q *DQue) SegmentNumbers() (int, int) { return q.firstSegment.number, q.lastSegment.number } -// Turbo returns true if the turbo flag is on. Having turbo on speeds things -// up significantly. +// Turbo returns true if the turbo flag is on. func (q *DQue) Turbo() bool { return q.turbo } // TurboOn allows the filesystem to decide when to sync file changes to disk. -// Throughput is greatly increased by turning turbo on, however there is some -// risk of losing data if a power-loss occurs. -// If turbo is already on an error is returned +// If turbo is already on an error is returned. 
func (q *DQue) TurboOn() error { - // This is heavy-handed but it is safe q.mutex.Lock() defer q.mutex.Unlock() @@ -453,11 +432,9 @@ func (q *DQue) TurboOn() error { return nil } -// TurboOff re-enables the "safety" mode that syncs every file change to disk as -// they happen. -// If turbo is already off an error is returned +// TurboOff re-enables the "safety" mode that syncs every file change to disk. +// If turbo is already off an error is returned. func (q *DQue) TurboOff() error { - // This is heavy-handed but it is safe q.mutex.Lock() defer q.mutex.Unlock() @@ -479,9 +456,8 @@ func (q *DQue) TurboOff() error { } // TurboSync allows you to fsync changes to disk, but only if turbo is on. -// If turbo is off an error is returned +// If turbo is off an error is returned. func (q *DQue) TurboSync() error { - // This is heavy-handed but it is safe q.mutex.Lock() defer q.mutex.Unlock() @@ -503,18 +479,15 @@ func (q *DQue) TurboSync() error { // load populates the queue from disk func (q *DQue) load() error { - // Find all queue files - files, err := ioutil.ReadDir(q.fullPath) + files, err := os.ReadDir(q.fullPath) if err != nil { return errors.Wrap(err, "unable to read files in "+q.fullPath) } - // Find the smallest and the largest file numbers minNum := math.MaxInt32 maxNum := 0 for _, f := range files { if !f.IsDir() && filePattern.MatchString(f.Name()) { - // Extract number out of the filename fileNumStr := filePattern.FindStringSubmatch(f.Name())[1] fileNum, _ := strconv.Atoi(fileNumStr) if fileNum > maxNum { @@ -526,33 +499,18 @@ func (q *DQue) load() error { } } - // If files were found, set q.firstSegment and q.lastSegment if maxNum > 0 { - // We found files - for { - seg, err := openQueueSegment(q.fullPath, minNum, q.turbo, q.builder) - if err != nil { - return errors.Wrap(err, "unable to create queue segment in "+q.fullPath) - } - // Make sure the first segment is not empty or it's not complete (i.e. 
is current) - if seg.size() > 0 || seg.sizeOnDisk() < q.config.ItemsPerSegment { - q.firstSegment = seg - break - } - // Delete the segment as it's empty and complete - seg.delete() - // Try the next one - minNum++ + seg, err := openQueueSegment(q.fullPath, minNum, q.turbo, q.builder, q.codec) + if err != nil { + return errors.Wrap(err, "unable to create queue segment in "+q.fullPath) } + q.firstSegment = seg if minNum == maxNum { - // We have only one segment so the - // first and last are the same instance (in this case) q.lastSegment = q.firstSegment } else { - // We have multiple segments - seg, err := openQueueSegment(q.fullPath, maxNum, q.turbo, q.builder) + seg, err = openQueueSegment(q.fullPath, maxNum, q.turbo, q.builder, q.codec) if err != nil { return errors.Wrap(err, "unable to create segment for "+q.fullPath) } @@ -560,13 +518,11 @@ func (q *DQue) load() error { } } else { - // We found no files so build a new queue starting with segment 1 - seg, err := newQueueSegment(q.fullPath, 1, q.turbo, q.builder) + seg, err := newQueueSegment(q.fullPath, 1, q.turbo, q.builder, q.codec) if err != nil { return errors.Wrap(err, "unable to create queue segment in "+q.fullPath) } - // The first and last are the same instance (in this case) q.firstSegment = seg q.lastSegment = seg } diff --git a/queue_test.go b/queue_test.go index 7742d22..fd92b58 100644 --- a/queue_test.go +++ b/queue_test.go @@ -38,15 +38,13 @@ func testQueue_AddRemoveLoop(t *testing.T, turbo bool) { t.Fatal("Error removing queue directory", err) } - // Create a new queue with segment size of 3 - var err error q := newQ(t, qName, turbo) for i := 0; i < 4; i++ { if err := q.Enqueue(&item2{i}); err != nil { t.Fatal("Error enqueueing", err) } - _, err = q.Dequeue() + _, err := q.Dequeue() if err != nil { t.Fatal("Error dequeueing", err) } @@ -55,23 +53,14 @@ func testQueue_AddRemoveLoop(t *testing.T, turbo bool) { assert(t, 0 == q.Size(), "Size is not 0") firstSegNum, lastSegNum := q.SegmentNumbers() - 
- // Assert that we have just one segment assert(t, firstSegNum == lastSegNum, "The first segment must match the last") - - // Assert that the first segment is #2 assert(t, 2 == firstSegNum, "The first segment is not 2") - // Now reopen the queue and check our assertions again. q.Close() q = openQ(t, qName, turbo) firstSegNum, lastSegNum = q.SegmentNumbers() - - // Assert that we have just one segment assert(t, firstSegNum == lastSegNum, "After opening, the first segment must match the second") - - // Assert that the first segment is #2 assert(t, 2 == firstSegNum, "After opening, the first segment is not 2") if err := os.RemoveAll(qName); err != nil { @@ -79,32 +68,28 @@ func testQueue_AddRemoveLoop(t *testing.T, turbo bool) { } } -// Adds 2 and removes 1 in a loop to ensure that when we've filled -// up the first segment that we delete it and move on to the next segment +// Adds 2 and removes 1 in a loop func TestQueue_Add2Remove1(t *testing.T) { testQueue_Add2Remove1(t, true /* true=turbo */) testQueue_Add2Remove1(t, false /* true=turbo */) } + func testQueue_Add2Remove1(t *testing.T, turbo bool) { qName := "test1" if err := os.RemoveAll(qName); err != nil { t.Fatal("Error removing queue directory", err) } - // Create a new queue with segment size of 3 - var err error q := newQ(t, qName, turbo) - // Add 2 and remove one each loop for i := 0; i < 4; i = i + 2 { - var item interface{} if err := q.Enqueue(&item2{i}); err != nil { t.Fatal("Error enqueueing", err) } if err := q.Enqueue(&item2{i + 1}); err != nil { t.Fatal("Error enqueueing", err) } - item, err = q.Dequeue() + item, err := q.Dequeue() if err != nil { t.Fatal("Error dequeueing", err) } @@ -112,33 +97,22 @@ func testQueue_Add2Remove1(t *testing.T, turbo bool) { } firstSegNum, lastSegNum := q.SegmentNumbers() - - // Assert that we have more than one segment assert(t, firstSegNum < lastSegNum, "The first segment cannot match the second") - - // Assert that the first segment is #2 assert(t, 2 == lastSegNum, 
"The last segment must be 2") - // Now reopen the queue and check our assertions again. q.Close() q = openQ(t, qName, turbo) firstSegNum, lastSegNum = q.SegmentNumbers() - - // Assert that we have more than one segment assert(t, firstSegNum < lastSegNum, "After opening, the first segment can not match the second") - - // Assert that the first segment is #2 assert(t, 2 == lastSegNum, "After opening, the last segment must be 2") - // Test Peek to make sure the size doesn't change assert(t, 2 == q.Size(), "Queue size is not 2 before peeking") obj, err := q.Peek() if err != nil { t.Fatal("Error peeking at the queue", err) } - - assert(t, 2 == q.Size(), "After peaking, aueue size must still be 2") + assert(t, 2 == q.Size(), "After peeking, queue size must still be 2") assert(t, obj != nil, "Peeked object must not be nil.") if err := os.RemoveAll(qName); err != nil { @@ -158,35 +132,25 @@ func testQueue_Add9Remove8(t *testing.T, turbo bool) { t.Fatal("Error removing queue directory", err) } - // Create new queue with segment size 3 q := newQ(t, qName, turbo) - // Enqueue 9 items for i := 0; i < 9; i++ { if err := q.Enqueue(&item2{i}); err != nil { t.Fatal("Error enqueueing", err) } } - // Check the Size calculation assert(t, 9 == q.Size(), "the size is calculated wrong. 
Should be 9") firstSegNum, lastSegNum := q.SegmentNumbers() - - // Assert that the first segment is #1 assert(t, 1 == firstSegNum, "the first segment is not 1") - - // Assert that the last segment is #4 assert(t, 3 == lastSegNum, "the last segment is not 3") - // Dequeue 8 items for i := 0; i < 8; i++ { iface, err := q.Dequeue() if err != nil { t.Fatal("Error dequeueing:", err) } - - // Check the Size calculation assert(t, 8-i == q.Size(), "the size is calculated wrong.") item, ok := iface.(item2) if ok { @@ -200,25 +164,17 @@ func testQueue_Add9Remove8(t *testing.T, turbo bool) { } firstSegNum, lastSegNum = q.SegmentNumbers() - - // Assert that we have only one segment assert(t, firstSegNum == lastSegNum, "The first segment must match the second") - - // Assert that the first segment is #3 assert(t, 3 == firstSegNum, "The last segment is not 3") - // Now reopen the queue and check our assertions again. q.Close() _ = openQ(t, qName, turbo) - // Assert that we have more than one segment assert(t, firstSegNum == lastSegNum, "After opening, the first segment must match the second") - - // Assert that the last segment is #3 assert(t, 3 == lastSegNum, "After opening, the last segment is not 3") if err := os.RemoveAll(qName); err != nil { - t.Fatal("Error cleaning up the queue directory:", err) + t.Fatal("Error removing queue directory for Add9Remove8:", err) } } @@ -226,17 +182,16 @@ func TestQueue_EmptyDequeue(t *testing.T) { testQueue_EmptyDequeue(t, true /* true=turbo */) testQueue_EmptyDequeue(t, false /* true=turbo */) } + func testQueue_EmptyDequeue(t *testing.T, turbo bool) { qName := "testEmptyDequeue" if err := os.RemoveAll(qName); err != nil { t.Fatal("Error removing queue directory:", err) } - // Create new queue q := newQ(t, qName, turbo) assert(t, 0 == q.Size(), "Expected an empty queue") - // Dequeue an item from the empty queue item, err := q.Dequeue() assert(t, dque.ErrEmpty == err, "Expected an ErrEmpty error") assert(t, item == nil, "Expected nil 
because queue is empty") @@ -257,11 +212,9 @@ func testQueue_NewOrOpen(t *testing.T, turbo bool) { t.Fatal("Error removing queue directory:", err) } - // Create new queue with newOrOpen q := newOrOpenQ(t, qName, turbo) q.Close() - // Open the same queue with newOrOpen q = newOrOpenQ(t, qName, turbo) q.Close() @@ -276,30 +229,24 @@ func TestQueue_Turbo(t *testing.T) { t.Fatal("Error removing queue directory:", err) } - // Create new queue q := newQ(t, qName, false) if err := q.TurboOff(); err == nil { t.Fatal("Expected an error") } - if err := q.TurboSync(); err == nil { t.Fatal("Expected an error") } - if err := q.TurboOn(); err != nil { t.Fatal("Error turning on turbo:", err) } - if err := q.TurboOn(); err == nil { t.Fatal("Expected an error") } - if err := q.TurboSync(); err != nil { t.Fatal("Error running TurboSync:", err) } - // Enqueue 1000 items start := time.Now() for i := 0; i < 1000; i++ { if err := q.Enqueue(&item2{i}); err != nil { @@ -309,12 +256,10 @@ func TestQueue_Turbo(t *testing.T) { elapsedTurbo := time.Since(start) assert(t, q.Turbo(), "Expected turbo to be on") - if err := q.TurboOff(); err != nil { t.Fatal("Error turning off turbo:", err) } - // Enqueue 1000 items start = time.Now() for i := 0; i < 1000; i++ { if err := q.Enqueue(&item2{i}); err != nil { @@ -336,13 +281,11 @@ func TestQueue_NewFlock(t *testing.T) { t.Fatal("Error cleaning up the queue directory:", err) } - // New and Close a DQue properly should work q, err := dque.New(qName, ".", 3, item2Builder) if err != nil { t.Fatal("Error creating dque:", err) } - err = q.Close() - if err != nil { + if err = q.Close(); err != nil { t.Fatal("Error closing dque:", err) } @@ -355,8 +298,7 @@ func TestQueue_NewFlock(t *testing.T) { if err == nil { t.Fatal("No error during double-open dque") } - err = q.Close() - if err != nil { + if err = q.Close(); err != nil { t.Fatal("Error closing dque:", err) } @@ -365,16 +307,13 @@ func TestQueue_NewFlock(t *testing.T) { if err != nil { t.Fatal("Error 
opening dque:", err) } - err = q.Close() - if err != nil { + if err = q.Close(); err != nil { t.Fatal("Error closing dque:", err) } - err = q.Close() - if err == nil { + if err = q.Close(); err == nil { t.Fatal("No error during double-closing dque") } - // Cleanup if err := os.RemoveAll(qName); err != nil { t.Fatal("Error removing queue directory:", err) } @@ -390,45 +329,39 @@ func TestQueue_UseAfterClose(t *testing.T) { if err != nil { t.Fatal("Error creating dque:", err) } - err = q.Enqueue(&item2{0}) - if err != nil { + if err = q.Enqueue(&item2{0}); err != nil { t.Fatal("Error enqueing item:", err) } - err = q.Close() - if err != nil { + if err = q.Close(); err != nil { t.Fatal("Error closing dque:", err) } - queueClosedError := "queue is closed" + const queueClosedError = "queue is closed" err = q.Close() - assert(t, err.Error() == queueClosedError, "Expected error not found", err) + assert(t, err.Error() == queueClosedError, "Expected error not found: %v", err) err = q.Enqueue(&item2{0}) - assert(t, err.Error() == queueClosedError, "Expected error not found", err) + assert(t, err.Error() == queueClosedError, "Expected error not found: %v", err) _, err = q.Dequeue() - assert(t, err.Error() == queueClosedError, "Expected error not found", err) + assert(t, err.Error() == queueClosedError, "Expected error not found: %v", err) _, err = q.Peek() - assert(t, err.Error() == queueClosedError, "Expected error not found", err) + assert(t, err.Error() == queueClosedError, "Expected error not found: %v", err) s := q.Size() - assert(t, s == 0, "Expected error") - + assert(t, s == 0, "Expected 0 size") s = q.SizeUnsafe() - assert(t, s == 0, "Expected error") + assert(t, s == 0, "Expected 0 size") err = q.TurboOn() - assert(t, err.Error() == queueClosedError, "Expected error not found", err) - + assert(t, err.Error() == queueClosedError, "Expected error not found: %v", err) err = q.TurboOff() - assert(t, err.Error() == queueClosedError, "Expected error not found", err) - + 
assert(t, err.Error() == queueClosedError, "Expected error not found: %v", err) err = q.TurboSync() - assert(t, err.Error() == queueClosedError, "Expected error not found", err) + assert(t, err.Error() == queueClosedError, "Expected error not found: %v", err) - // Cleanup if err := os.RemoveAll(qName); err != nil { t.Fatal("Error removing queue directory:", err) } @@ -443,8 +376,7 @@ func TestQueue_BlockingBehaviour(t *testing.T) { q := newQ(t, qName, false) go func() { - err := q.Enqueue(&item2{0}) - assert(t, err == nil, "Expected no error") + assert(t, q.Enqueue(&item2{0}) == nil, "Expected no error") }() x, err := q.PeekBlock() @@ -455,8 +387,8 @@ func TestQueue_BlockingBehaviour(t *testing.T) { assert(t, err == nil, "Expected no error") assert(t, x != nil, "Item is nil") - x, err = q.Dequeue() - assert(t, err == dque.ErrEmpty, "Expected error not found") + _, err = q.Dequeue() + assert(t, err == dque.ErrEmpty, "Expected ErrEmpty error") timeout := time.After(3 * time.Second) done := make(chan bool) @@ -466,11 +398,9 @@ func TestQueue_BlockingBehaviour(t *testing.T) { assert(t, x != nil, "Item is nil") done <- true }() - go func() { time.Sleep(1 * time.Second) - err := q.Enqueue(&item2{2}) - assert(t, err == nil, "Expected no error") + assert(t, q.Enqueue(&item2{2}) == nil, "Expected no error") }() select { @@ -479,7 +409,6 @@ func TestQueue_BlockingBehaviour(t *testing.T) { case <-done: } - // Cleanup if err := os.RemoveAll(qName); err != nil { t.Fatal("Error removing queue directory:", err) } @@ -495,18 +424,12 @@ func TestQueue_BlockingWithClose(t *testing.T) { go func() { time.Sleep(1 * time.Second) - err := q.Close() - assert(t, err == nil, "Expected no error") + assert(t, q.Close() == nil, "Expected no error") }() timeout := time.After(3 * time.Second) done := make(chan bool) go func() { - // The queue is empty, - // so DequeueBlock should really block and wait, - // until the other goroutine calls Close, - // and the Close should wake-up this 
DequeueBlock block, - // and return an error because the queue is now closed. _, err := q.DequeueBlock() assert(t, err == dque.ErrQueueClosed, "Expected ErrQueueClosed error") done <- true @@ -518,25 +441,23 @@ func TestQueue_BlockingWithClose(t *testing.T) { case <-done: } - // Cleanup if err := os.RemoveAll(qName); err != nil { t.Fatal("Error removing queue directory:", err) } } -func TestQueue_BlockingAggresive(t *testing.T) { - rand.Seed(0) // ensure we have reproducible sleeps +func TestQueue_BlockingAggressive(t *testing.T) { - qName := "testBlockingAggresive" + qName := "testBlockingAggressive" if err := os.RemoveAll(qName); err != nil { t.Fatal("Error removing queue directory:", err) } q := newQ(t, qName, false) - numProducers := 5 - numItemsPerProducer := 50 - numConsumers := 25 + const numProducers = 5 + const numItemsPerProducer = 50 + const numConsumers = 25 done := make(chan bool) var wg sync.WaitGroup @@ -548,20 +469,18 @@ func TestQueue_BlockingAggresive(t *testing.T) { done <- true }() - // producers for p := 0; p < numProducers; p++ { go func(producer int) { + rng := rand.New(rand.NewSource(int64(producer))) for i := 0; i < numItemsPerProducer; i++ { - s := rand.Intn(150) + s := rng.Intn(150) time.Sleep(time.Duration(s) * time.Millisecond) - err := q.Enqueue(&item2{i}) - assert(t, err == nil, "Expected no error", err) + assert(t, q.Enqueue(&item2{i}) == nil, "Expected no error") fmt.Println("Enqueued item", i, "by producer", producer, "after sleeping", s) } }(p) } - // consumers for c := 0; c < numConsumers; c++ { go func(consumer int) { for { @@ -583,19 +502,55 @@ func TestQueue_BlockingAggresive(t *testing.T) { case <-done: } - // Cleanup if err := os.RemoveAll(qName); err != nil { t.Fatal("Error removing queue directory:", err) } } +// TestQueue_WithCodecRoundtrip verifies that NewOrOpenWithCodec preserves data across close/reopen. 
+func TestQueue_WithCodecRoundtrip(t *testing.T) { + qName := "testWithCodecRoundtrip" + if err := os.RemoveAll(qName); err != nil { + t.Fatal("Error removing queue directory:", err) + } + defer os.RemoveAll(qName) + + const numItems = 15 + + q, err := dque.NewOrOpenWithCodec(qName, ".", 5, item2Builder, dque.GobCodec{}) + if err != nil { + t.Fatal("Error creating queue:", err) + } + for i := 0; i < numItems; i++ { + if enqErr := q.Enqueue(&item2{i}); enqErr != nil { + t.Fatalf("Enqueue[%d] failed: %s", i, enqErr) + } + } + assert(t, q.Size() == numItems, "Size mismatch after enqueue: got %d want %d", q.Size(), numItems) + q.Close() + + // Reopen with same codec and verify all items come back in order. + q2, err := dque.NewOrOpenWithCodec(qName, ".", 5, item2Builder, dque.GobCodec{}) + if err != nil { + t.Fatal("Error reopening queue:", err) + } + for i := 0; i < numItems; i++ { + raw, deqErr := q2.Dequeue() + if deqErr != nil { + t.Fatalf("Dequeue[%d] failed: %s", i, deqErr) + } + got, ok := raw.(*item2) + assert(t, ok, "Dequeued object is not *item2 at index %d", i) + assert(t, got.Id == i, "item[%d] Id mismatch: got %d want %d", i, got.Id, i) + } + q2.Close() +} + func newOrOpenQ(t *testing.T, qName string, turbo bool) *dque.DQue { - // Create a new segment with segment size of 3 q, err := dque.NewOrOpen(qName, ".", 3, item2Builder) if err != nil { t.Fatal("Error creating or opening dque:", err) } - if turbo { _ = q.TurboOn() } @@ -603,7 +558,6 @@ func newOrOpenQ(t *testing.T, qName string, turbo bool) *dque.DQue { } func newQ(t *testing.T, qName string, turbo bool) *dque.DQue { - // Create a new segment with segment size of 3 q, err := dque.New(qName, ".", 3, item2Builder) if err != nil { t.Fatal("Error creating new dque:", err) @@ -615,7 +569,6 @@ func newQ(t *testing.T, qName string, turbo bool) *dque.DQue { } func openQ(t *testing.T, qName string, turbo bool) *dque.DQue { - // Open an existing segment with segment size of 3 q, err := dque.Open(qName, ".", 3, 
item2Builder) if err != nil { t.Fatal("Error opening dque:", err) diff --git a/segment.go b/segment.go index 3d1cb46..3b2378f 100644 --- a/segment.go +++ b/segment.go @@ -16,9 +16,7 @@ package dque // import ( - "bytes" "encoding/binary" - "encoding/gob" "fmt" "io" "os" @@ -65,6 +63,15 @@ var ( errEmptySegment = errors.New("Segment is empty") ) +// segWritePool holds reusable byte slice pointers for the add() write path. +// Each entry is *[]byte so the pool can grow the backing array on reallocations. +var segWritePool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 0, 1024) + return &buf + }, +} + // qSegment represents a portion (segment) of a persistent queue type qSegment struct { dirPath string @@ -77,10 +84,11 @@ type qSegment struct { turbo bool maybeDirty bool // filesystem changes may not have been flushed to disk syncCount int64 // for testing + codec Codec } -// load reads all objects from the queue file into a slice -// returns ErrCorruptedSegment or ErrUnableToDecode for errors pertaining to file contents. +// load reads all objects from the queue file into a slice. +// Returns ErrCorruptedSegment or ErrUnableToDecode for errors pertaining to file contents. func (seg *qSegment) load() error { // This is heavy-handed but its safe @@ -97,7 +105,7 @@ func (seg *qSegment) load() error { // Loop until we can load no more for { - // Read the 4 byte length of the gob + // Read the 4-byte length prefix lenBytes := make([]byte, 4) if n, err := io.ReadFull(seg.file, lenBytes); err != nil { if err == io.EOF { @@ -110,9 +118,9 @@ func (seg *qSegment) load() error { } // Convert the bytes into a 32-bit unsigned int - gobLen := binary.LittleEndian.Uint32(lenBytes) - if gobLen == 0 { - // Remove the first item from the in-memory queue + payloadLen := binary.LittleEndian.Uint32(lenBytes) + if payloadLen == 0 { + // A zero-length prefix is a tombstone marking a consumed item. 
if len(seg.objects) == 0 { return ErrCorruptedSegment{ Path: seg.filePath(), @@ -120,32 +128,28 @@ func (seg *qSegment) load() error { } } seg.objects = seg.objects[1:] - // log.Println("TEMP: Detected delete in load()") seg.removeCount++ continue } - data := make([]byte, int(gobLen)) + data := make([]byte, int(payloadLen)) if _, err := io.ReadFull(seg.file, data); err != nil { return ErrCorruptedSegment{ Path: seg.filePath(), - Err: errors.Wrap(err, "error reading gob data from file"), + Err: errors.Wrap(err, "error reading payload from file"), } } - // Decode the bytes into an object - object := seg.objectBuilder() - if err := gob.NewDecoder(bytes.NewReader(data)).Decode(object); err != nil { + // Decode the bytes into an object using the configured codec + object, decErr := seg.codec.Decode(data, seg.objectBuilder) + if decErr != nil { return ErrUnableToDecode{ Path: seg.filePath(), - Err: errors.Wrapf(err, "failed to decode %T", object), + Err: errors.Wrapf(decErr, "failed to decode object"), } } - // Add item to the objects slice seg.objects = append(seg.objects, object) - - // log.Printf("TEMP: Loaded: %#v\n", object) } } @@ -158,18 +162,14 @@ func (seg *qSegment) peek() (interface{}, error) { defer seg.mutex.Unlock() if len(seg.objects) == 0 { - // Queue is empty so return nil object (and emptySegment error) return nil, errEmptySegment } - // Save a reference to the first item in the in-memory queue - object := seg.objects[0] - - return object, nil + return seg.objects[0], nil } -// remove removes and returns the first item in the segment and adds -// a zero length marker to the end of the queue file to signify a removal. +// remove removes and returns the first item in the segment and appends +// a zero-length marker to the queue file to record the removal. // If the queue is already empty, the emptySegment error will be returned. 
func (seg *qSegment) remove() (interface{}, error) { @@ -178,30 +178,21 @@ func (seg *qSegment) remove() (interface{}, error) { defer seg.mutex.Unlock() if len(seg.objects) == 0 { - // Queue is empty so return nil object (and empty_segment error) return nil, errEmptySegment } - // Create a 4-byte length of value zero (this signifies a removal) - deleteLen := 0 + // A 4-byte zero is the tombstone that records a removal without rewriting the file. deleteLenBytes := make([]byte, 4) - binary.LittleEndian.PutUint32(deleteLenBytes, uint32(deleteLen)) + binary.LittleEndian.PutUint32(deleteLenBytes, 0) - // Write the 4-byte length (of zero) first if _, err := seg.file.Write(deleteLenBytes); err != nil { return nil, errors.Wrapf(err, "failed to remove item from segment %d", seg.number) } - // Save a reference to the first item in the in-memory queue object := seg.objects[0] - - // Remove the first item from the in-memory queue seg.objects = seg.objects[1:] - - // Increment the delete count seg.removeCount++ - // Possibly force writes to disk if err := seg._sync(); err != nil { return nil, err } @@ -209,39 +200,43 @@ func (seg *qSegment) remove() (interface{}, error) { return object, nil } -// Add adds an item to the in-memory queue segment and appends it to the persistent file +// add adds an item to the in-memory queue segment and appends it to the persistent file. +// The 4-byte length prefix and encoded payload are written in a single syscall using a +// pooled buffer to avoid a per-call heap allocation. func (seg *qSegment) add(object interface{}) error { // This is heavy-handed but its safe seg.mutex.Lock() defer seg.mutex.Unlock() - // Encode the struct to a byte buffer - var buff bytes.Buffer - enc := gob.NewEncoder(&buff) - if err := enc.Encode(object); err != nil { - return errors.Wrap(err, "error gob encoding object") + // Grab a reusable buffer. 
Reserve the first 4 bytes as a length placeholder so + // the codec can append the encoded bytes after it with no extra copy. + bp := segWritePool.Get().(*[]byte) + *bp = (*bp)[:0] + *bp = append(*bp, 0, 0, 0, 0) // length placeholder + + result, encErr := seg.codec.Encode(*bp, object) + if encErr != nil { + segWritePool.Put(bp) + return errors.Wrap(encErr, "error encoding object") } + // result = [placeholder(4)] + [encoded payload] + payloadLen := len(result) - 4 + binary.LittleEndian.PutUint32(result[:4], uint32(payloadLen)) - // Count the bytes stored in the byte buffer - // and store the count into a 4-byte byte array - buffLen := len(buff.Bytes()) - buffLenBytes := make([]byte, 4) - binary.LittleEndian.PutUint32(buffLenBytes, uint32(buffLen)) + // Single syscall: length prefix and payload combined. + _, writeErr := seg.file.Write(result) - // Write the 4-byte buffer length first - if _, err := seg.file.Write(buffLenBytes); err != nil { - return errors.Wrapf(err, "failed to write object length to segment %d", seg.number) - } + // Return the (possibly grown) backing array before checking the error. 
+ *bp = result[:0] + segWritePool.Put(bp) - // Then write the buffer bytes - if _, err := seg.file.Write(buff.Bytes()); err != nil { - return errors.Wrapf(err, "failed to write object to segment %d", seg.number) + if writeErr != nil { + return errors.Wrapf(writeErr, "failed to write object to segment %d", seg.number) } seg.objects = append(seg.objects, object) - // Possibly force writes to disk return seg._sync() } @@ -280,15 +275,11 @@ func (seg *qSegment) delete() error { return errors.Wrap(err, "unable to close the segment file before deleting") } - // Delete the storage for this queue - err := os.Remove(seg.filePath()) - if err != nil { + if err := os.Remove(seg.filePath()); err != nil { return errors.Wrap(err, "error deleting file: "+seg.filePath()) } - // Empty the in-memory slice of objects seg.objects = seg.objects[:0] - seg.file = nil return nil @@ -302,8 +293,8 @@ func (seg *qSegment) filePath() string { return path.Join(seg.dirPath, seg.fileName()) } -// turboOn allows the filesystem to decide when to sync file changes to disk -// Speed is be greatly increased by turning turbo on, however there is some +// turboOn allows the filesystem to decide when to sync file changes to disk. +// Speed is greatly increased by turning turbo on, however there is some // risk of losing data should a power-loss occur. func (seg *qSegment) turboOn() { seg.turbo = true @@ -313,7 +304,7 @@ func (seg *qSegment) turboOn() { // they happen. func (seg *qSegment) turboOff() error { if !seg.turbo { - // turboOff is know to be called twice when the first and last ssegments + // turboOff is known to be called twice when the first and last segments // are the same. return nil } @@ -327,8 +318,6 @@ func (seg *qSegment) turboOff() error { // turboSync does an fsync to disk if turbo is on. func (seg *qSegment) turboSync() error { if !seg.turbo { - // When the first and last segments are the same, this method - // will be called twice. 
return nil } if seg.maybeDirty { @@ -342,11 +331,9 @@ func (seg *qSegment) turboSync() error { } // _sync must only be called by the add and remove methods on qSegment. -// Only syncs if turbo is off +// Only syncs if turbo is off. func (seg *qSegment) _sync() error { if seg.turbo { - // We do *not* force a sync if turbo is on - // We just mark it maybe dirty seg.maybeDirty = true return nil } @@ -363,18 +350,22 @@ func (seg *qSegment) _sync() error { // creating a new last segment. // This should only be called if this segment is not also the first segment. func (seg *qSegment) close() error { - if err := seg.file.Close(); err != nil { return errors.Wrapf(err, "unable to close segment file %s.", seg.fileName()) } - return nil } -// newQueueSegment creates a new, persistent segment of the queue -func newQueueSegment(dirPath string, number int, turbo bool, builder func() interface{}) (*qSegment, error) { +// newQueueSegment creates a new, persistent segment of the queue. +func newQueueSegment(dirPath string, number int, turbo bool, builder func() interface{}, codec Codec) (*qSegment, error) { - seg := qSegment{dirPath: dirPath, number: number, turbo: turbo, objectBuilder: builder} + seg := qSegment{ + dirPath: dirPath, + number: number, + turbo: turbo, + objectBuilder: builder, + codec: codec, + } if !dirExists(seg.dirPath) { return nil, errors.New("dirPath is not a valid directory: " + seg.dirPath) @@ -384,21 +375,25 @@ func newQueueSegment(dirPath string, number int, turbo bool, builder func() inte return nil, errors.New("file already exists: " + seg.filePath()) } - // Create the file in append mode var err error seg.file, err = os.OpenFile(seg.filePath(), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return nil, errors.Wrapf(err, "error creating file: %s.", seg.filePath()) } - // Leave the file open for future writes return &seg, nil } -// openQueueSegment reads an existing persistent segment of the queue into memory -func openQueueSegment(dirPath 
string, number int, turbo bool, builder func() interface{}) (*qSegment, error) { +// openQueueSegment reads an existing persistent segment of the queue into memory. +func openQueueSegment(dirPath string, number int, turbo bool, builder func() interface{}, codec Codec) (*qSegment, error) { - seg := qSegment{dirPath: dirPath, number: number, turbo: turbo, objectBuilder: builder} + seg := qSegment{ + dirPath: dirPath, + number: number, + turbo: turbo, + objectBuilder: builder, + codec: codec, + } if !dirExists(seg.dirPath) { return nil, errors.New("dirPath is not a valid directory: " + seg.dirPath) @@ -408,18 +403,15 @@ func openQueueSegment(dirPath string, number int, turbo bool, builder func() int return nil, errors.New("file does not exist: " + seg.filePath()) } - // Load the items into memory if err := seg.load(); err != nil { return nil, errors.Wrap(err, "unable to load queue segment in "+dirPath) } - // Re-open the file in append mode var err error seg.file, err = os.OpenFile(seg.filePath(), os.O_APPEND|os.O_WRONLY, 0644) if err != nil { return nil, errors.Wrap(err, "error opening file: "+seg.filePath()) } - // Leave the file open for future writes return &seg, nil } diff --git a/segment_test.go b/segment_test.go index aa9f81f..ad2caed 100644 --- a/segment_test.go +++ b/segment_test.go @@ -1,8 +1,8 @@ -// segement_test.go +// segment_test.go package dque // -// White box texting of the aSegment struct and methods. +// White box testing of the qSegment struct and methods. // import ( @@ -24,7 +24,7 @@ func item1Builder() interface{} { return &item1{} } -// Test_segment verifies the behavior of one segment. +// TestSegment verifies the behavior of one segment. 
func TestSegment(t *testing.T) { testDir := "./TestSegment" os.RemoveAll(testDir) @@ -33,7 +33,7 @@ func TestSegment(t *testing.T) { } // Create a new segment of the queue - seg, err := newQueueSegment(testDir, 1, false, item1Builder) + seg, err := newQueueSegment(testDir, 1, false, item1Builder, GobCodec{}) if err != nil { t.Fatalf("newQueueSegment('%s') failed with '%s'\n", testDir, err.Error()) } @@ -63,7 +63,7 @@ func TestSegment(t *testing.T) { // // Recreate the segment from disk and remove the remaining item // - seg, err = openQueueSegment(testDir, 1, false, item1Builder) + seg, err = openQueueSegment(testDir, 1, false, item1Builder, GobCodec{}) if err != nil { t.Fatalf("openQueueSegment('%s') failed with '%s'\n", testDir, err.Error()) } @@ -87,7 +87,7 @@ func TestSegment(t *testing.T) { func TestSegment_ErrCorruptedSegment(t *testing.T) { testDir := "./TestSegmentError" os.RemoveAll(testDir) - defer os.RemoveAll((testDir)) + defer os.RemoveAll(testDir) if err := os.Mkdir(testDir, 0755); err != nil { t.Fatalf("Error creating directory in the TestSegment_ErrCorruptedSegment method: %s\n", err) @@ -98,21 +98,16 @@ func TestSegment_ErrCorruptedSegment(t *testing.T) { t.Fatal(err) } - // expect an 8 byte object, but only write 7 bytes + // expect an 8-byte object payload, but only write 7 bytes if _, err := f.Write([]byte{0, 0, 0, 8, 1, 2, 3, 4, 5, 6, 7}); err != nil { t.Fatal(err) } f.Close() - _, err = openQueueSegment(testDir, 0, false, func() interface{} { return make([]byte, 8) }) + _, err = openQueueSegment(testDir, 0, false, func() interface{} { return make([]byte, 8) }, GobCodec{}) if err == nil { t.Fatal("expected ErrCorruptedSegment but got nil") } - // // go >= 1.13: - // var corruptedError ErrCorruptedSegment - // if !errors.As(err, &corruptedError) { - // t.Fatalf("expected ErrCorruptedSegment but got %T: %s", err, err) - // } corruptedError, ok := unwrapError(unwrapError(err)).(ErrCorruptedSegment) if !ok { t.Fatalf("expected ErrCorruptedSegment 
but got %T: %s", err, err) @@ -120,8 +115,9 @@ func TestSegment_ErrCorruptedSegment(t *testing.T) { if corruptedError.Path != "TestSegmentError/0000000000000.dque" { t.Fatalf("unexpected file path: %s", corruptedError.Path) } - if corruptedError.Error() != "segment file TestSegmentError/0000000000000.dque is corrupted: error reading gob data from file: unexpected EOF" { - t.Fatalf("wrong error message: %s", corruptedError.Error()) + const wantMsg = "segment file TestSegmentError/0000000000000.dque is corrupted: error reading payload from file: unexpected EOF" + if corruptedError.Error() != wantMsg { + t.Fatalf("wrong error message:\ngot: %s\nwant: %s", corruptedError.Error(), wantMsg) } } @@ -129,7 +125,7 @@ func unwrapError(err error) error { return err.(interface{ Unwrap() error }).Unwrap() } -// TestSegment_Open verifies the behavior of the openSegment function. +// TestSegment_openQueueSegment_failIfNew verifies that openQueueSegment fails for a new (non-existent) file. func TestSegment_openQueueSegment_failIfNew(t *testing.T) { testDir := "./TestSegment_Open" os.RemoveAll(testDir) @@ -137,13 +133,12 @@ func TestSegment_openQueueSegment_failIfNew(t *testing.T) { t.Fatalf("Error creating directory in the TestSegment_Open method: %s\n", err) } - seg, err := openQueueSegment(testDir, 1, false, item1Builder) + seg, err := openQueueSegment(testDir, 1, false, item1Builder, GobCodec{}) if err == nil { t.Fatalf("openQueueSegment('%s') should have failed because it should be new\n", testDir) } assert(t, seg == nil, "segment after failure must be nil") - // Cleanup if err := os.RemoveAll(testDir); err != nil { t.Fatalf("Error cleaning up directory from the TestSegment_Open method with '%s'\n", err.Error()) } @@ -157,7 +152,7 @@ func TestSegment_Turbo(t *testing.T) { t.Fatalf("Error creating directory in the TestSegment_Turbo method: %s\n", err) } - seg, err := newQueueSegment(testDir, 10, false, item1Builder) + seg, err := newQueueSegment(testDir, 10, false, 
item1Builder, GobCodec{}) if err != nil { t.Fatalf("newQueueSegment('%s') failed\n", testDir) } @@ -175,7 +170,7 @@ func TestSegment_Turbo(t *testing.T) { // Turn off turbo and expect the syncCount to increase when remove is called. if err = seg.turboOff(); err != nil { - t.Fatalf("Unexpecte error turning off turbo('%s')\n", testDir) + t.Fatalf("Unexpected error turning off turbo('%s')\n", testDir) } // seg.turboOff() calls seg.turboSync() which increments syncCount @@ -188,12 +183,59 @@ func TestSegment_Turbo(t *testing.T) { // seg.remove() calls seg._sync() which increments syncCount assert(t, 3 == seg.syncCount, "syncCount must be 3 now") - // Cleanup if err := os.RemoveAll(testDir); err != nil { t.Fatalf("Error cleaning up directory from the TestSegment_Open method with '%s'\n", err.Error()) } } +// TestSegment_CodecRoundtrip verifies that add()/load() round-trip through the codec correctly. +func TestSegment_CodecRoundtrip(t *testing.T) { + testDir := "./TestSegmentCodecRoundtrip" + os.RemoveAll(testDir) + defer os.RemoveAll(testDir) + if err := os.Mkdir(testDir, 0755); err != nil { + t.Fatalf("mkdir failed: %s", err) + } + + items := []*item1{ + {Name: "alpha"}, + {Name: "beta"}, + {Name: "gamma with spaces"}, + } + + // Write items using GobCodec + seg, err := newQueueSegment(testDir, 1, false, item1Builder, GobCodec{}) + if err != nil { + t.Fatalf("newQueueSegment: %s", err) + } + for _, it := range items { + if addErr := seg.add(it); addErr != nil { + t.Fatalf("add failed: %s", addErr) + } + } + if closeErr := seg.close(); closeErr != nil { + t.Fatalf("close failed: %s", closeErr) + } + + // Re-open and verify all items round-tripped correctly + seg2, err := openQueueSegment(testDir, 1, false, item1Builder, GobCodec{}) + if err != nil { + t.Fatalf("openQueueSegment: %s", err) + } + assert(t, seg2.size() == len(items), "size mismatch after reload: got %d want %d", seg2.size(), len(items)) + + for i, want := range items { + obj, removeErr := seg2.remove() + 
if removeErr != nil { + t.Fatalf("remove[%d] failed: %s", i, removeErr) + } + got, ok := obj.(*item1) + assert(t, ok, "decoded object is not *item1 at index %d", i) + assert(t, got.Name == want.Name, + "item[%d] name mismatch: got %q want %q", i, got.Name, want.Name) + } +} + // assert fails the test if the condition is false. func assert(tb testing.TB, condition bool, msg string, v ...interface{}) { if !condition {