@@ -24,8 +24,7 @@ const msgSizeLimit = 262144
24
24
const delayUntilAttr = "TaskqDelayUntil"
25
25
26
26
type Queue struct {
27
- base base.Queue
28
- opt * taskq.QueueOptions
27
+ opt * taskq.QueueOptions
29
28
30
29
sqs * sqs.SQS
31
30
accountID string
@@ -44,7 +43,7 @@ type Queue struct {
44
43
consumer * taskq.Consumer
45
44
}
46
45
47
- var _ taskq.Queue = (* Queue )(nil )
46
+ var _ taskq.Queuer = (* Queue )(nil )
48
47
49
48
func NewQueue (sqs * sqs.SQS , accountID string , opt * taskq.QueueOptions ) * Queue {
50
49
opt .Init ()
@@ -62,15 +61,16 @@ func NewQueue(sqs *sqs.SQS, accountID string, opt *taskq.QueueOptions) *Queue {
62
61
}
63
62
64
63
func (q * Queue ) initAddQueue () {
64
+ queueName := "azsqs:" + q .opt .Name + ":add"
65
65
q .addQueue = memqueue .NewQueue (& taskq.QueueOptions {
66
- Name : "azsqs:" + q . opt . Name + ":add" ,
66
+ Name : queueName ,
67
67
BufferSize : 100 ,
68
68
Redis : q .opt .Redis ,
69
69
})
70
- q .addTask = q . addQueue .NewTask (& taskq.TaskOptions {
71
- Name : " add-message" ,
70
+ q .addTask = taskq .NewTask (& taskq.TaskOptions {
71
+ Name : queueName + ": add-message" ,
72
72
Handler : taskq .HandlerFunc (q .addBatcherAdd ),
73
- FallbackHandler : msgutil .UnwrapMessageHandler (q .HandleMessage ),
73
+ FallbackHandler : msgutil .UnwrapMessageHandler (taskq . Tasks .HandleMessage ),
74
74
RetryLimit : 3 ,
75
75
MinBackoff : time .Second ,
76
76
})
@@ -81,13 +81,14 @@ func (q *Queue) initAddQueue() {
81
81
}
82
82
83
83
func (q * Queue ) initDelQueue () {
84
+ queueName := "azsqs:" + q .opt .Name + ":delete"
84
85
q .delQueue = memqueue .NewQueue (& taskq.QueueOptions {
85
- Name : "azsqs:" + q . opt . Name + ":delete" ,
86
+ Name : queueName ,
86
87
BufferSize : 100 ,
87
88
Redis : q .opt .Redis ,
88
89
})
89
- q .delTask = q . delQueue .NewTask (& taskq.TaskOptions {
90
- Name : " delete-message" ,
90
+ q .delTask = taskq .NewTask (& taskq.TaskOptions {
91
+ Name : queueName + ": delete-message" ,
91
92
Handler : taskq .HandlerFunc (q .delBatcherAdd ),
92
93
RetryLimit : 3 ,
93
94
MinBackoff : time .Second ,
@@ -110,22 +111,6 @@ func (q *Queue) Options() *taskq.QueueOptions {
110
111
return q .opt
111
112
}
112
113
113
- func (q * Queue ) HandleMessage (msg * taskq.Message ) error {
114
- return q .base .HandleMessage (msg )
115
- }
116
-
117
- func (q * Queue ) NewTask (opt * taskq.TaskOptions ) * taskq.Task {
118
- return q .base .NewTask (q , opt )
119
- }
120
-
121
- func (q * Queue ) GetTask (name string ) * taskq.Task {
122
- return q .base .GetTask (name )
123
- }
124
-
125
- func (q * Queue ) RemoveTask (name string ) {
126
- q .base .RemoveTask (name )
127
- }
128
-
129
114
func (q * Queue ) Consumer () * taskq.Consumer {
130
115
if q .consumer == nil {
131
116
q .consumer = taskq .NewConsumer (q )
@@ -152,8 +137,11 @@ func (q *Queue) Add(msg *taskq.Message) error {
152
137
if msg .TaskName == "" {
153
138
return internal .ErrTaskNameRequired
154
139
}
140
+ if q .isDuplicate (msg ) {
141
+ return taskq .ErrDuplicate
142
+ }
155
143
msg = msgutil .WrapMessage (msg )
156
- return q .addTask .AddMessage (msg )
144
+ return q .addQueue . Add ( q . addTask .WithMessage (msg ) )
157
145
}
158
146
159
147
func (q * Queue ) queueURL () string {
@@ -286,7 +274,8 @@ func (q *Queue) Release(msg *taskq.Message) error {
286
274
287
275
// Delete deletes the message from the queue.
288
276
func (q * Queue ) Delete (msg * taskq.Message ) error {
289
- return q .delTask .AddMessage (msgutil .WrapMessage (msg ))
277
+ msg = msgutil .WrapMessage (msg )
278
+ return q .delQueue .Add (q .delTask .WithMessage (msg ))
290
279
}
291
280
292
281
// Purge deletes all messages from the queue using SQS API.
@@ -368,8 +357,8 @@ func (q *Queue) addBatch(msgs []*taskq.Message) error {
368
357
}
369
358
370
359
if len (str ) > msgSizeLimit {
371
- internal .Logger .Printf ("%s : str=%d bytes=%d is larger than %d" ,
372
- msg .Task , len (str ), len (b ), msgSizeLimit )
360
+ internal .Logger .Printf ("task=%q : str=%d bytes=%d is larger than %d" ,
361
+ msg .TaskName , len (str ), len (b ), msgSizeLimit )
373
362
}
374
363
375
364
entry := & sqs.SendMessageBatchRequestEntry {
@@ -520,6 +509,13 @@ func (q *Queue) GetDeleteQueue() *memqueue.Queue {
520
509
return q .delQueue
521
510
}
522
511
512
+ func (q * Queue ) isDuplicate (msg * taskq.Message ) bool {
513
+ if msg .Name == "" {
514
+ return false
515
+ }
516
+ return q .opt .Storage .Exists ("taskq:" + q .opt .Name + ":" + msg .Name )
517
+ }
518
+
523
519
func findMessageById (msgs []* taskq.Message , id string ) * taskq.Message {
524
520
i , err := strconv .Atoi (id )
525
521
if err != nil {
0 commit comments