diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go index 41d02e5..aea35a9 100644 --- a/rabbitmq/rabbitmq.go +++ b/rabbitmq/rabbitmq.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strconv" + "sync" "time" "sync/atomic" @@ -172,7 +173,8 @@ func next(s []string, lastSeq int) int { // Channel amqp.Channel wapper type Channel struct { *amqp.Channel - closed int32 + closed int32 + cancelledConsumers sync.Map } // IsClosed indicate closed by developer @@ -191,29 +193,52 @@ func (ch *Channel) Close() error { return ch.Channel.Close() } +func (ch *Channel) Cancel(consumer string) error { + ch.cancelledConsumers.Store(consumer, true) + return ch.Channel.Cancel(consumer, false) +} + +func (ch *Channel) isConsumerCancelled(consumer string) bool { + val, ok := ch.cancelledConsumers.Load(consumer) + if !ok { + return false + } + cancelled, _ := val.(bool) + return cancelled +} + // Consume wrap amqp.Channel.Consume, the returned delivery will end only when channel closed by developer func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) { deliveries := make(chan amqp.Delivery) go func() { + defer close(deliveries) for { + if ch.IsClosed() || ch.isConsumerCancelled(consumer) { + return + } d, err := ch.Channel.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args) if err != nil { debugf("consume failed, err: %v", err) + if ch.IsClosed() || ch.isConsumerCancelled(consumer) { + return + } time.Sleep(time.Duration(delay) * time.Second) continue } for msg := range d { + if ch.IsClosed() || ch.isConsumerCancelled(consumer) { + return + } deliveries <- msg } - // sleep before IsClose call. closed flag may not set before sleep. - time.Sleep(time.Duration(delay) * time.Second) - - if ch.IsClosed() { - break + if ch.IsClosed() || ch.isConsumerCancelled(consumer) { + return } + + time.Sleep(time.Duration(delay) * time.Second) } }()