Skip to content

Commit 2a2d3fa

Browse files
committed
chore: added tests to expose pending list bug in rediq
1 parent 2f6bd74 commit 2a2d3fa

File tree

3 files changed

+86
-1
lines changed

3 files changed

+86
-1
lines changed

consumer_test.go

+75
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package taskq_test
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
8+
"io"
9+
"log"
710
"math/rand"
11+
"os"
812
"runtime"
913
"strconv"
1014
"strings"
@@ -92,6 +96,77 @@ func testConsumer(t *testing.T, factory taskq.Factory, opt *taskq.QueueOptions)
9296
}
9397
}
9498

99+
func testConsumerDelete(t *testing.T, factory taskq.Factory, opt *taskq.QueueOptions) {
100+
old := os.Stderr
101+
r, w, err := os.Pipe()
102+
if err != nil {
103+
log.Fatal(err)
104+
}
105+
os.Stderr = w
106+
107+
taskq.SetLogger(log.New(os.Stderr, "taskq: ", log.LstdFlags|log.Lshortfile))
108+
109+
c := context.Background()
110+
opt.WaitTimeout = waitTimeout
111+
opt.Redis = redisRing()
112+
113+
q := factory.RegisterQueue(opt)
114+
defer q.Close()
115+
116+
purge(t, q)
117+
118+
outC := make(chan string)
119+
go func() {
120+
var buf bytes.Buffer
121+
io.Copy(&buf, r)
122+
outC <- buf.String()
123+
}()
124+
125+
ch := make(chan time.Time)
126+
task := taskq.RegisterTask(&taskq.TaskOptions{
127+
Name: nextTaskID(),
128+
Handler: func() error {
129+
ch <- time.Now()
130+
return nil
131+
},
132+
})
133+
134+
err = q.Add(task.WithArgs(c))
135+
if err != nil {
136+
t.Fatal(err)
137+
}
138+
139+
p := q.Consumer()
140+
if err := p.Start(c); err != nil {
141+
t.Fatal(err)
142+
}
143+
144+
select {
145+
case <-ch:
146+
case <-time.After(testTimeout):
147+
t.Fatalf("message was not processed")
148+
}
149+
150+
time.Sleep(2 * time.Second)
151+
w.Close()
152+
os.Stderr = old
153+
154+
out := <-outC
155+
findPendingLogLine := strings.Contains(out, "redisq: pending failed: redisq: can't find pending message")
156+
157+
if findPendingLogLine {
158+
t.Fatalf("data not acknowledged and still exists in pending list.")
159+
}
160+
161+
if err := p.Stop(); err != nil {
162+
t.Fatal(err)
163+
}
164+
165+
if err := q.Close(); err != nil {
166+
t.Fatal(err)
167+
}
168+
}
169+
95170
func testUnknownTask(t *testing.T, factory taskq.Factory, opt *taskq.QueueOptions) {
96171
c := context.Background()
97172
opt.WaitTimeout = waitTimeout

redisq/queue.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ func (q *Queue) schedulePending(ctx context.Context) (int, error) {
377377
if err != nil {
378378
if strings.HasPrefix(err.Error(), "NOGROUP") {
379379
q.createStreamGroup(ctx)
380+
380381
return 0, nil
381382
}
382383
return 0, err
@@ -390,8 +391,9 @@ func (q *Queue) schedulePending(ctx context.Context) (int, error) {
390391
if err != nil {
391392
return 0, err
392393
}
394+
393395
if len(xmsgs) != 1 {
394-
err := fmt.Errorf("redisq: can't find peding message id=%q in stream=%q",
396+
err := fmt.Errorf("redisq: can't find pending message id=%q in stream=%q",
395397
id, q.stream)
396398
return 0, err
397399
}

redisq_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package taskq_test
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/vmihailenco/taskq/v3"
78
"github.com/vmihailenco/taskq/v3/redisq"
@@ -17,6 +18,13 @@ func TestRedisqConsumer(t *testing.T) {
1718
})
1819
}
1920

21+
func TestRedisqAckMessage(t *testing.T) {
22+
testConsumerDelete(t, redisqFactory(), &taskq.QueueOptions{
23+
Name: queueName("redisq-ack-message"),
24+
ReservationTimeout: 1 * time.Second,
25+
})
26+
}
27+
2028
func TestRedisqUnknownTask(t *testing.T) {
2129
testUnknownTask(t, redisqFactory(), &taskq.QueueOptions{
2230
Name: queueName("redisq-unknown-task"),

0 commit comments

Comments
 (0)