8
8
"sync/atomic"
9
9
"time"
10
10
11
- "github.com/go-redis/redis_rate/v8 "
11
+ "github.com/go-redis/redis_rate/v9 "
12
12
13
13
"github.com/vmihailenco/taskq/v3/internal"
14
14
"github.com/vmihailenco/taskq/v3/internal/redislock"
@@ -140,7 +140,7 @@ func (c *Consumer) Stats() *ConsumerStats {
140
140
}
141
141
142
142
func (c * Consumer ) Add (msg * Message ) error {
143
- _ = c .limiter .Reserve (1 )
143
+ _ = c .limiter .Reserve (msg . Ctx , 1 )
144
144
c .buffer <- msg
145
145
return nil
146
146
}
@@ -283,24 +283,24 @@ func (c *Consumer) removeWorker(id int32) bool { //nolint:unused
283
283
return atomic .CompareAndSwapInt32 (& c .numWorker , id + 1 , id )
284
284
}
285
285
286
- func (c * Consumer ) addFetcher (id int32 ) bool {
286
+ func (c * Consumer ) addFetcher (ctx context. Context , id int32 ) bool {
287
287
c .startStopMu .Lock ()
288
288
defer c .startStopMu .Unlock ()
289
289
290
290
if atomic .CompareAndSwapInt32 (& c .numFetcher , id , id + 1 ) {
291
291
c .fetchersWG .Add (1 )
292
292
go func () {
293
293
defer c .fetchersWG .Done ()
294
- c .fetcher (id )
294
+ c .fetcher (ctx , id )
295
295
}()
296
296
return true
297
297
}
298
298
return false
299
299
}
300
300
301
- func (c * Consumer ) ensureFetcher () {
301
+ func (c * Consumer ) ensureFetcher (ctx context. Context ) {
302
302
if atomic .LoadInt32 (& c .numFetcher ) == 0 {
303
- c .addFetcher (0 )
303
+ c .addFetcher (ctx , 0 )
304
304
}
305
305
}
306
306
@@ -339,7 +339,7 @@ func (c *Consumer) ProcessAll(ctx context.Context) error {
339
339
340
340
// ProcessOne processes at most one message in the queue.
341
341
func (c * Consumer ) ProcessOne (ctx context.Context ) error {
342
- msg , err := c .reserveOne ()
342
+ msg , err := c .reserveOne (ctx )
343
343
if err != nil {
344
344
return err
345
345
}
@@ -349,14 +349,14 @@ func (c *Consumer) ProcessOne(ctx context.Context) error {
349
349
return c .Process (msg )
350
350
}
351
351
352
- func (c * Consumer ) reserveOne () (* Message , error ) {
352
+ func (c * Consumer ) reserveOne (ctx context. Context ) (* Message , error ) {
353
353
select {
354
354
case msg := <- c .buffer :
355
355
return msg , nil
356
356
default :
357
357
}
358
358
359
- msgs , err := c .q .ReserveN (1 , c .opt .WaitTimeout )
359
+ msgs , err := c .q .ReserveN (ctx , 1 , c .opt .WaitTimeout )
360
360
if err != nil && err != internal .ErrNotSupported {
361
361
return nil , err
362
362
}
@@ -371,7 +371,7 @@ func (c *Consumer) reserveOne() (*Message, error) {
371
371
return & msgs [0 ], nil
372
372
}
373
373
374
- func (c * Consumer ) fetcher (fetcherID int32 ) {
374
+ func (c * Consumer ) fetcher (ctx context. Context , fetcherID int32 ) {
375
375
timer := time .NewTimer (time .Minute )
376
376
timer .Stop ()
377
377
@@ -390,7 +390,7 @@ func (c *Consumer) fetcher(fetcherID int32) {
390
390
continue
391
391
}
392
392
393
- timeout , err := c .fetchMessages (timer , fetchTimeout )
393
+ timeout , err := c .fetchMessages (ctx , timer , fetchTimeout )
394
394
if err != nil {
395
395
if err == internal .ErrNotSupported {
396
396
atomic .StoreInt32 (& c .numFetcher , - 1 )
@@ -411,11 +411,11 @@ func (c *Consumer) fetcher(fetcherID int32) {
411
411
}
412
412
413
413
func (c * Consumer ) fetchMessages (
414
- timer * time.Timer , timeout time.Duration ,
414
+ ctx context. Context , timer * time.Timer , timeout time.Duration ,
415
415
) (bool , error ) {
416
- size := c .limiter .Reserve (c .opt .ReservationSize )
416
+ size := c .limiter .Reserve (ctx , c .opt .ReservationSize )
417
417
418
- msgs , err := c .q .ReserveN (size , c .opt .WaitTimeout )
418
+ msgs , err := c .q .ReserveN (ctx , size , c .opt .WaitTimeout )
419
419
if err != nil {
420
420
return false , err
421
421
}
@@ -452,7 +452,7 @@ func (c *Consumer) worker(ctx context.Context, workerID int32) {
452
452
var lock * redislock.Lock
453
453
defer func () {
454
454
if lock != nil {
455
- _ = lock .Release ()
455
+ _ = lock .Release (ctx )
456
456
}
457
457
}()
458
458
@@ -464,10 +464,10 @@ func (c *Consumer) worker(ctx context.Context, workerID int32) {
464
464
return
465
465
}
466
466
if c .opt .WorkerLimit > 0 {
467
- lock = c .lockWorker (lock , workerID )
467
+ lock = c .lockWorker (ctx , lock , workerID )
468
468
}
469
469
470
- msg := c .waitMessage (timer )
470
+ msg := c .waitMessage (ctx , timer )
471
471
if msg == nil {
472
472
if atomic .LoadInt32 (& c .state ) >= stateStoppingWorkers {
473
473
return
@@ -480,7 +480,7 @@ func (c *Consumer) worker(ctx context.Context, workerID int32) {
480
480
}
481
481
}
482
482
483
- func (c * Consumer ) waitMessage (timer * time.Timer ) * Message {
483
+ func (c * Consumer ) waitMessage (ctx context. Context , timer * time.Timer ) * Message {
484
484
const workerIdleTimeout = time .Second
485
485
486
486
select {
@@ -489,7 +489,7 @@ func (c *Consumer) waitMessage(timer *time.Timer) *Message {
489
489
default :
490
490
}
491
491
492
- c .ensureFetcher ()
492
+ c .ensureFetcher (ctx )
493
493
494
494
timer .Reset (workerIdleTimeout )
495
495
select {
@@ -698,7 +698,11 @@ func (c *Consumer) resetPause() {
698
698
atomic .StoreUint32 (& c .consecutiveNumErr , 0 )
699
699
}
700
700
701
- func (c * Consumer ) lockWorker (lock * redislock.Lock , workerID int32 ) * redislock.Lock {
701
+ func (c * Consumer ) lockWorker (
702
+ ctx context.Context ,
703
+ lock * redislock.Lock ,
704
+ workerID int32 ,
705
+ ) * redislock.Lock {
702
706
lockTimeout := c .opt .ReservationTimeout + 10 * time .Second
703
707
704
708
timer := time .NewTimer (time .Minute )
@@ -708,9 +712,9 @@ func (c *Consumer) lockWorker(lock *redislock.Lock, workerID int32) *redislock.L
708
712
var err error
709
713
if lock == nil {
710
714
key := fmt .Sprintf ("%s:worker:lock:%d" , c .q .Name (), workerID )
711
- lock , err = redislock .Obtain (c .opt .Redis , key , lockTimeout , nil )
715
+ lock , err = redislock .Obtain (ctx , c .opt .Redis , key , lockTimeout , nil )
712
716
} else {
713
- err = lock .Refresh (lockTimeout , nil )
717
+ err = lock .Refresh (ctx , lockTimeout , nil )
714
718
}
715
719
if err == nil {
716
720
return lock
@@ -720,7 +724,7 @@ func (c *Consumer) lockWorker(lock *redislock.Lock, workerID int32) *redislock.L
720
724
internal .Logger .Printf ("redislock.Lock failed: %s" , err )
721
725
}
722
726
if lock != nil {
723
- _ = lock .Release ()
727
+ _ = lock .Release (ctx )
724
728
lock = nil
725
729
}
726
730
@@ -729,7 +733,7 @@ func (c *Consumer) lockWorker(lock *redislock.Lock, workerID int32) *redislock.L
729
733
case <- timer .C :
730
734
case <- c .stopCh :
731
735
if lock != nil {
732
- _ = lock .Release ()
736
+ _ = lock .Release (ctx )
733
737
}
734
738
return nil
735
739
}
@@ -839,7 +843,7 @@ func (c *Consumer) replaceConfig(ctx context.Context, cfg *consumerConfig) {
839
843
atomic .StoreInt32 (& c .numFetcher , cfg .NumFetcher )
840
844
} else {
841
845
for id := numFetcher ; id < cfg .NumFetcher ; id ++ {
842
- if ! c .addFetcher (id ) {
846
+ if ! c .addFetcher (ctx , id ) {
843
847
internal .Logger .Printf ("taskq: addFetcher id=%d failed" , id )
844
848
}
845
849
}
@@ -874,7 +878,7 @@ type limiter struct {
874
878
cancelled uint32 // atomic
875
879
}
876
880
877
- func (l * limiter ) Reserve (max int ) int {
881
+ func (l * limiter ) Reserve (ctx context. Context , max int ) int {
878
882
if l .limiter == nil || l .limit == nil {
879
883
return max
880
884
}
@@ -899,7 +903,7 @@ func (l *limiter) Reserve(max int) int {
899
903
900
904
var size int
901
905
for {
902
- res , err := l .limiter .Allow (l .bucket , l .limit )
906
+ res , err := l .limiter .Allow (ctx , l .bucket , l .limit )
903
907
if err != nil {
904
908
//TODO: ??
905
909
time .Sleep (100 * time .Millisecond )
0 commit comments