Skip to content

Commit 350266d

Browse files
committed
Use Message.TaskName when checking message uniqueness
1 parent c5c3219 commit 350266d

File tree

7 files changed

+34
-30
lines changed

7 files changed

+34
-30
lines changed

azsqs/queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ func (q *Queue) isDuplicate(msg *taskq.Message) bool {
510510
if msg.Name == "" {
511511
return false
512512
}
513-
return q.opt.Storage.Exists("taskq:" + q.opt.Name + ":" + msg.Name)
513+
return q.opt.Storage.Exists(msgutil.FullMessageName(q, msg))
514514
}
515515

516516
func findMessageById(msgs []*taskq.Message, id string) *taskq.Message {

internal/msgutil/msgutil.go

+4
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,7 @@ func UnwrapMessageHandler(fn interface{}) taskq.HandlerFunc {
3939
return h(msg)
4040
})
4141
}
42+
43+
func FullMessageName(q taskq.Queue, msg *taskq.Message) string {
44+
return "taskq:" + q.Name() + ":" + msg.TaskName + ":" + msg.Name
45+
}

ironmq/queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ func (q *Queue) isDuplicate(msg *taskq.Message) bool {
284284
if msg.Name == "" {
285285
return false
286286
}
287-
return q.opt.Storage.Exists("taskq:" + q.opt.Name + ":" + msg.Name)
287+
return q.opt.Storage.Exists(msgutil.FullMessageName(q, msg))
288288
}
289289

290290
func retry(fn func() error) error {

memqueue/queue.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/vmihailenco/taskq/v2"
1212
"github.com/vmihailenco/taskq/v2/internal"
13+
"github.com/vmihailenco/taskq/v2/internal/msgutil"
1314
)
1415

1516
type Queue struct {
@@ -173,5 +174,5 @@ func (q *Queue) isDuplicate(msg *taskq.Message) bool {
173174
if msg.Name == "" {
174175
return false
175176
}
176-
return q.opt.Storage.Exists("taskq:" + q.opt.Name + ":" + msg.Name)
177+
return q.opt.Storage.Exists(msgutil.FullMessageName(q, msg))
177178
}

message.go

+24
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package taskq
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
8+
"hash/fnv"
79
"time"
810

911
"github.com/valyala/gozstd"
@@ -74,6 +76,28 @@ func (m *Message) OnceInPeriod(period time.Duration, args ...interface{}) *Messa
7476
return m
7577
}
7678

79+
func hashArgs(args []interface{}) []byte {
80+
var buf bytes.Buffer
81+
enc := msgpack.NewEncoder(&buf)
82+
_ = enc.EncodeMulti(args...)
83+
b := buf.Bytes()
84+
85+
if len(b) <= 32 {
86+
return b
87+
}
88+
89+
h := fnv.New128a()
90+
_, _ = h.Write(b)
91+
return h.Sum(nil)
92+
}
93+
94+
func timeSlot(period time.Duration) int64 {
95+
if period <= 0 {
96+
return 0
97+
}
98+
return time.Now().UnixNano() / int64(period)
99+
}
100+
77101
func (m *Message) MarshalArgs() ([]byte, error) {
78102
if m.ArgsBin != nil {
79103
if m.ArgsCompression == "" {

redisq/queue.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/vmihailenco/taskq/v2"
1717
"github.com/vmihailenco/taskq/v2/internal"
18+
"github.com/vmihailenco/taskq/v2/internal/msgutil"
1819
"github.com/vmihailenco/taskq/v2/internal/redislock"
1920
)
2021

@@ -374,7 +375,7 @@ func (q *Queue) isDuplicate(msg *taskq.Message) bool {
374375
if msg.Name == "" {
375376
return false
376377
}
377-
return q.opt.Storage.Exists("taskq:" + q.opt.Name + ":" + msg.Name)
378+
return q.opt.Storage.Exists(msgutil.FullMessageName(q, msg))
378379
}
379380

380381
func (q *Queue) withRedisLock(name string, fn func() error) error {

task.go

-26
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
package taskq
22

33
import (
4-
"bytes"
54
"context"
65
"fmt"
7-
"hash/fnv"
86
"time"
9-
10-
"github.com/vmihailenco/msgpack/v4"
117
)
128

139
var unknownTaskOpt *TaskOptions
@@ -121,25 +117,3 @@ func (t *Task) WithArgs(ctx context.Context, args ...interface{}) *Message {
121117
msg.TaskName = t.opt.Name
122118
return msg
123119
}
124-
125-
func timeSlot(period time.Duration) int64 {
126-
if period <= 0 {
127-
return 0
128-
}
129-
return time.Now().UnixNano() / int64(period)
130-
}
131-
132-
func hashArgs(args []interface{}) []byte {
133-
var buf bytes.Buffer
134-
enc := msgpack.NewEncoder(&buf)
135-
_ = enc.EncodeMulti(args...)
136-
b := buf.Bytes()
137-
138-
if len(b) <= 64 {
139-
return b
140-
}
141-
142-
h := fnv.New128a()
143-
_, _ = h.Write(b)
144-
return h.Sum(nil)
145-
}

0 commit comments

Comments
 (0)