diff --git a/queue.go b/queue.go index 51bfa10..f842bf1 100644 --- a/queue.go +++ b/queue.go @@ -1,30 +1,26 @@ // Package lfg implements a lock-free, multiple-producer, multiple-consumer queue. -// Queue[T any] is a generic type, where the items added to the queue are of type *T. package lfg import ( "sync/atomic" - "unsafe" "golang.org/x/sys/cpu" ) -const cacheLinesize = unsafe.Sizeof(cpu.CacheLinePad{}) - // Queue[T any] is a lock-free, multiple-producer, multiple-consumer queue. type Queue[T any] struct { - buf []*T + buf []T mask int64 - _ [cacheLinesize]byte + _ cpu.CacheLinePad consumerBarrier atomic.Int64 - _ [cacheLinesize]byte + _ cpu.CacheLinePad consumerCursor atomic.Int64 - _ [cacheLinesize]byte + _ cpu.CacheLinePad producerBarrier atomic.Int64 - _ [cacheLinesize]byte + _ cpu.CacheLinePad producerCursor atomic.Int64 } @@ -35,13 +31,13 @@ func NewQueue[T any](size uint) *Queue[T] { } return &Queue[T]{ - buf: make([]*T, size), + buf: make([]T, size), mask: int64(size - 1), } } // Enqueue adds an item to the queue. It returns false if the buffer is full. -func (b *Queue[T]) Enqueue(v *T) bool { +func (b *Queue[T]) Enqueue(v T) bool { var pc, cb int64 for { @@ -57,7 +53,7 @@ func (b *Queue[T]) Enqueue(v *T) bool { } } - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&b.buf[(pc+1)&b.mask])), unsafe.Pointer(v)) + b.buf[(pc+1)&b.mask] = v for { if b.producerBarrier.CompareAndSwap(pc, pc+1) { @@ -69,7 +65,7 @@ func (b *Queue[T]) Enqueue(v *T) bool { } // Dequeue removes an item from the queue. It returns false if the buffer is empty. -func (b *Queue[T]) Dequeue() (*T, bool) { +func (b *Queue[T]) Dequeue() (T, bool) { var cc, pb int64 for { @@ -77,7 +73,8 @@ func (b *Queue[T]) Dequeue() (*T, bool) { pb = b.producerBarrier.Load() if pb == cc { - return nil, false + var zero T + return zero, false } if b.consumerCursor.CompareAndSwap(cc, cc+1) { @@ -85,7 +82,7 @@ func (b *Queue[T]) Dequeue() (*T, bool) { } } - v := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&b.buf[(cc+1)&b.mask]))) + v := b.buf[(cc+1)&b.mask] for { if b.consumerBarrier.CompareAndSwap(cc, cc+1) { @@ -93,7 +90,7 @@ func (b *Queue[T]) Dequeue() (*T, bool) { } } - return (*T)(v), true + return v, true } func isPot(n uint) bool { diff --git a/queue_test.go b/queue_test.go index ad967af..522224c 100644 --- a/queue_test.go +++ b/queue_test.go @@ -15,54 +15,54 @@ func TestRingBufferSingleThread(t *testing.T) { t.Run("empty queue must return false", func(t *testing.T) { v, ok := b.Dequeue() assert.False(t, ok) - assert.Nil(t, v) + assert.Zero(t, v) }) t.Run("enqueue one and dequeue one", func(t *testing.T) { - ok := b.Enqueue(intPtr(0)) + ok := b.Enqueue(0) assert.True(t, ok) v, ok := b.Dequeue() assert.True(t, ok) - assert.Equal(t, 0, *v) + assert.Equal(t, 0, v) }) t.Run("enqueue until buffer is full", func(t *testing.T) { - ok := b.Enqueue(intPtr(1)) + ok := b.Enqueue(1) assert.True(t, ok) - ok = b.Enqueue(intPtr(2)) + ok = b.Enqueue(2) assert.True(t, ok) - ok = b.Enqueue(intPtr(3)) + ok = b.Enqueue(3) assert.True(t, ok) - ok = b.Enqueue(intPtr(4)) + ok = b.Enqueue(4) assert.False(t, ok) }) t.Run("dequeue one and check value", func(t *testing.T) { v, ok := b.Dequeue() assert.True(t, ok) - assert.Equal(t, 1, *v) + assert.Equal(t, 1, v) }) - t.Run("enqueing one after dequeueing one must succeed", func(t *testing.T) { - ok := b.Enqueue(intPtr(5)) + t.Run("enqueuing one after dequeueing one must succeed", func(t *testing.T) { + ok := b.Enqueue(5) assert.True(t, ok) }) t.Run("dequeue until buffer is empty", func(t *testing.T) { v, ok := b.Dequeue() assert.True(t, ok) - assert.Equal(t, 2, *v) + assert.Equal(t, 2, v) v, ok = b.Dequeue() assert.True(t, ok) - assert.Equal(t, 3, *v) + assert.Equal(t, 3, v) v, ok = b.Dequeue() assert.True(t, ok) - assert.Equal(t, 5, *v) + assert.Equal(t, 5, v) v, ok = b.Dequeue() assert.False(t, ok) - assert.Nil(t, v) + assert.Zero(t, v) }) t.Run("creating queues with invalid sizes must panic", func(t *testing.T) { @@ -77,18 +77,18 @@ func TestRingBufferSingleThread(t *testing.T) { } func TestRingBufferSPSC(t *testing.T) { - b := NewQueue[int](4) + b := NewQueue[int](128) wg := sync.WaitGroup{} wg.Add(2) - count := 1_000 + count := 1_000_000 go func() { defer wg.Done() for i := 0; i < count; { - ok := b.Enqueue(intPtr(i)) + ok := b.Enqueue(i) if ok { i++ } @@ -104,7 +104,7 @@ func TestRingBufferSPSC(t *testing.T) { v, ok := b.Dequeue() if ok { i++ - if expected != *v { + if expected != v { panic("unexpected value") } expected++ @@ -116,7 +116,7 @@ func TestRingBufferSPSC(t *testing.T) { } func BenchmarkRingBufferMPSC(b *testing.B) { - buf := NewQueue[testMsg](1024) + buf := NewQueue[*testMsg](1024) const producerCount = 4 countPerProducer := b.N @@ -155,7 +155,7 @@ func BenchmarkRingBufferMPSC(b *testing.B) { } func BenchmarkRingBufferSPSC(b *testing.B) { - buf := NewQueue[testMsg](1024) + buf := NewQueue[*testMsg](1024) wg := sync.WaitGroup{} wg.Add(2) @@ -188,7 +188,3 @@ func BenchmarkRingBufferSPSC(b *testing.B) { wg.Wait() } - -func intPtr(i int) *int { - return &i -}