Skip to content

Commit fd8eebc

Browse files
authored
Merge pull request #145 from Subomi/fix/ack-processed-messages-from-redisq
Call XAck inside Delete in redsiq
2 parents c9e62e6 + 670be0f commit fd8eebc

File tree

3 files changed

+92
-5
lines changed

3 files changed

+92
-5
lines changed

consumer_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"log"
78
"math/rand"
89
"runtime"
910
"strconv"
@@ -17,6 +18,7 @@ import (
1718
"github.com/go-redis/redis_rate/v9"
1819

1920
"github.com/vmihailenco/taskq/v3"
21+
"github.com/vmihailenco/taskq/v3/redisq"
2022
)
2123

2224
const waitTimeout = time.Second
@@ -92,6 +94,73 @@ func testConsumer(t *testing.T, factory taskq.Factory, opt *taskq.QueueOptions)
9294
}
9395
}
9496

97+
func testConsumerDelete(t *testing.T, factory taskq.Factory, opt *taskq.QueueOptions) {
98+
c := context.Background()
99+
opt.WaitTimeout = waitTimeout
100+
opt.Redis = redisRing()
101+
102+
red, ok := opt.Redis.(redisq.RedisStreamClient)
103+
if !ok {
104+
log.Fatal(fmt.Errorf("redisq: Redis client must support streams"))
105+
}
106+
107+
q := factory.RegisterQueue(opt)
108+
defer q.Close()
109+
110+
purge(t, q)
111+
112+
ch := make(chan time.Time)
113+
task := taskq.RegisterTask(&taskq.TaskOptions{
114+
Name: nextTaskID(),
115+
Handler: func() error {
116+
ch <- time.Now()
117+
return nil
118+
},
119+
})
120+
121+
err := q.Add(task.WithArgs(c))
122+
if err != nil {
123+
t.Fatal(err)
124+
}
125+
126+
p := q.Consumer()
127+
if err := p.Start(c); err != nil {
128+
t.Fatal(err)
129+
}
130+
131+
select {
132+
case <-ch:
133+
case <-time.After(testTimeout):
134+
t.Fatalf("message was not processed")
135+
}
136+
137+
tm := time.Now().Add(opt.ReservationTimeout)
138+
end := strconv.FormatInt(unixMs(tm), 10)
139+
pending, err := red.XPendingExt(context.Background(), &redis.XPendingExtArgs{
140+
Stream: "taskq:{" + opt.Name + "}:stream",
141+
Group: "taskq",
142+
Start: "-",
143+
End: end,
144+
Count: 100,
145+
}).Result()
146+
147+
if err != nil {
148+
t.Fatal(err)
149+
}
150+
151+
if len(pending) > 0 {
152+
t.Fatal("task not acknowledged and still exists in pending list.")
153+
}
154+
155+
if err := p.Stop(); err != nil {
156+
t.Fatal(err)
157+
}
158+
159+
if err := q.Close(); err != nil {
160+
t.Fatal(err)
161+
}
162+
}
163+
95164
func testUnknownTask(t *testing.T, factory taskq.Factory, opt *taskq.QueueOptions) {
96165
c := context.Background()
97166
opt.WaitTimeout = waitTimeout
@@ -763,3 +832,7 @@ func nextTaskID() string {
763832
taskID++
764833
return id
765834
}
835+
836+
func unixMs(tm time.Time) int64 {
837+
return tm.UnixNano() / int64(time.Millisecond)
838+
}

redisq/queue.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323

2424
const batchSize = 100
2525

26-
type redisStreamClient interface {
26+
type RedisStreamClient interface {
2727
Del(ctx context.Context, keys ...string) *redis.IntCmd
2828
TxPipeline() redis.Pipeliner
2929

@@ -49,7 +49,7 @@ type Queue struct {
4949

5050
consumer *taskq.Consumer
5151

52-
redis redisStreamClient
52+
redis RedisStreamClient
5353
wg sync.WaitGroup
5454

5555
zset string
@@ -73,7 +73,7 @@ func NewQueue(opt *taskq.QueueOptions) *Queue {
7373
if opt.Redis == nil {
7474
panic(fmt.Errorf("redisq: Redis client is required"))
7575
}
76-
red, ok := opt.Redis.(redisStreamClient)
76+
red, ok := opt.Redis.(RedisStreamClient)
7777
if !ok {
7878
panic(fmt.Errorf("redisq: Redis client must support streams"))
7979
}
@@ -147,7 +147,7 @@ func (q *Queue) Add(msg *taskq.Message) error {
147147
return q.add(q.redis, msg)
148148
}
149149

150-
func (q *Queue) add(pipe redisStreamClient, msg *taskq.Message) error {
150+
func (q *Queue) add(pipe RedisStreamClient, msg *taskq.Message) error {
151151
if msg.TaskName == "" {
152152
return internal.ErrTaskNameRequired
153153
}
@@ -243,6 +243,10 @@ func (q *Queue) Release(msg *taskq.Message) error {
243243

244244
// Delete deletes the message from the queue.
245245
func (q *Queue) Delete(msg *taskq.Message) error {
246+
err := q.redis.XAck(msg.Ctx, q.stream, q.streamGroup, msg.ID).Err()
247+
if err != nil {
248+
return err
249+
}
246250
return q.redis.XDel(msg.Ctx, q.stream, msg.ID).Err()
247251
}
248252

@@ -373,6 +377,7 @@ func (q *Queue) schedulePending(ctx context.Context) (int, error) {
373377
if err != nil {
374378
if strings.HasPrefix(err.Error(), "NOGROUP") {
375379
q.createStreamGroup(ctx)
380+
376381
return 0, nil
377382
}
378383
return 0, err
@@ -386,8 +391,9 @@ func (q *Queue) schedulePending(ctx context.Context) (int, error) {
386391
if err != nil {
387392
return 0, err
388393
}
394+
389395
if len(xmsgs) != 1 {
390-
err := fmt.Errorf("redisq: can't find peding message id=%q in stream=%q",
396+
err := fmt.Errorf("redisq: can't find pending message id=%q in stream=%q",
391397
id, q.stream)
392398
return 0, err
393399
}

redisq_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package taskq_test
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/vmihailenco/taskq/v3"
78
"github.com/vmihailenco/taskq/v3/redisq"
@@ -17,6 +18,13 @@ func TestRedisqConsumer(t *testing.T) {
1718
})
1819
}
1920

21+
func TestRedisqAckMessage(t *testing.T) {
22+
testConsumerDelete(t, redisqFactory(), &taskq.QueueOptions{
23+
Name: queueName("redisq-ack-message"),
24+
ReservationTimeout: 1 * time.Second,
25+
})
26+
}
27+
2028
func TestRedisqUnknownTask(t *testing.T) {
2129
testUnknownTask(t, redisqFactory(), &taskq.QueueOptions{
2230
Name: queueName("redisq-unknown-task"),

0 commit comments

Comments
 (0)