diff --git a/cleaner_test.go b/cleaner_test.go index 3a5d4a7..22c28ab 100644 --- a/cleaner_test.go +++ b/cleaner_test.go @@ -61,7 +61,7 @@ func TestCleaner(t *testing.T) { count, err = queue.unackedCount() assert.NoError(t, err) assert.Equal(t, int64(0), count) - assert.NoError(t, queue.StartConsuming(2, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(2, time.Millisecond)) time.Sleep(time.Millisecond) count, err = queue.unackedCount() assert.NoError(t, err) @@ -86,7 +86,7 @@ func TestCleaner(t *testing.T) { require.NotNil(t, consumer.LastDelivery) assert.Equal(t, "del1", consumer.LastDelivery.Payload()) - assert.NoError(t, consumer.LastDelivery.Ack(nil, nil)) + assert.NoError(t, consumer.LastDelivery.Ack(nil)) time.Sleep(10 * time.Millisecond) count, err = queue.unackedCount() assert.NoError(t, err) @@ -138,7 +138,7 @@ func TestCleaner(t *testing.T) { count, err = queue.unackedCount() assert.NoError(t, err) assert.Equal(t, int64(0), count) - assert.NoError(t, queue.StartConsuming(2, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(2, time.Millisecond)) time.Sleep(time.Millisecond) count, err = queue.unackedCount() assert.NoError(t, err) @@ -172,7 +172,7 @@ func TestCleaner(t *testing.T) { assert.Equal(t, int64(6), count) assert.Equal(t, "del5", consumer.LastDelivery.Payload()) - assert.NoError(t, consumer.LastDelivery.Ack(nil, nil)) + assert.NoError(t, consumer.LastDelivery.Ack(nil)) time.Sleep(10 * time.Millisecond) count, err = queue.unackedCount() assert.NoError(t, err) @@ -202,7 +202,7 @@ func TestCleaner(t *testing.T) { assert.NoError(t, err) queue, err = conn.OpenQueue("q1") assert.NoError(t, err) - assert.NoError(t, queue.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) consumer = NewTestConsumer("c-C") _, err = queue.AddConsumer("consumer3", consumer) diff --git a/connection.go b/connection.go index e047204..9e104ff 100644 --- a/connection.go +++ b/connection.go @@ -50,6 +50,7 @@ type redisConnection struct { heartbeatKey string // key to keep alive queuesKey string // key to list of queues consumed by this connection redisClient RedisClient + errChan chan<- error heartbeatTicker *time.Ticker // list of all queues that have been opened in this connection @@ -58,27 +59,27 @@ type redisConnection struct { } // OpenConnection opens and returns a new connection -func OpenConnection(tag, network, address string, db int, errors chan<- error) (Connection, error) { +func OpenConnection(tag, network, address string, db int, errChan chan<- error) (Connection, error) { redisClient := redis.NewClient(&redis.Options{ Network: network, Addr: address, DB: db, }) - return OpenConnectionWithRedisClient(tag, redisClient, errors) + return OpenConnectionWithRedisClient(tag, redisClient, errChan) } // OpenConnectionWithRedisClient opens and returns a new connection -func OpenConnectionWithRedisClient(tag string, redisClient *redis.Client, errors chan<- error) (*redisConnection, error) { - return openConnectionWithRedisClient(tag, RedisWrapper{redisClient}, errors) +func OpenConnectionWithRedisClient(tag string, redisClient *redis.Client, errChan chan<- error) (*redisConnection, error) { + return openConnectionWithRedisClient(tag, RedisWrapper{redisClient}, errChan) } // OpenConnectionWithTestRedisClient opens and returns a new connection which // uses a test redis client internally. This is useful in integration tests. -func OpenConnectionWithTestRedisClient(tag string, errors chan<- error) (*redisConnection, error) { - return openConnectionWithRedisClient(tag, NewTestRedisClient(), errors) +func OpenConnectionWithTestRedisClient(tag string, errChan chan<- error) (*redisConnection, error) { + return openConnectionWithRedisClient(tag, NewTestRedisClient(), errChan) } -func openConnectionWithRedisClient(tag string, redisClient RedisClient, errors chan<- error) (*redisConnection, error) { +func openConnectionWithRedisClient(tag string, redisClient RedisClient, errChan chan<- error) (*redisConnection, error) { name := fmt.Sprintf("%s-%s", tag, uniuri.NewLen(6)) connection := &redisConnection{ @@ -86,6 +87,7 @@ func openConnectionWithRedisClient(tag string, redisClient RedisClient, errors c heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1), queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1), redisClient: redisClient, + errChan: errChan, heartbeatTicker: time.NewTicker(heartbeatInterval), } @@ -98,7 +100,7 @@ func openConnectionWithRedisClient(tag string, redisClient RedisClient, errors c return nil, err } - go connection.heartbeat(errors) + go connection.heartbeat(errChan) // log.Printf("rmq connection connected to %s %s:%s %d", name, network, address, db) return connection, nil } @@ -108,7 +110,7 @@ func (connection *redisConnection) updateHeartbeat() error { } // heartbeat keeps the heartbeat key alive -func (connection *redisConnection) heartbeat(errors chan<- error) { +func (connection *redisConnection) heartbeat(errChan chan<- error) { errorCount := 0 // number of consecutive errors for range connection.heartbeatTicker.C { err := connection.updateHeartbeat() @@ -121,7 +123,7 @@ func (connection *redisConnection) heartbeat(errors chan<- error) { errorCount++ select { // try to add error to channel, but don't block - case errors <- &HeartbeatError{RedisErr: err, Count: errorCount}: + case errChan <- &HeartbeatError{RedisErr: err, Count: errorCount}: default: } @@ -246,7 +248,13 @@ func (connection *redisConnection) getConsumingQueues() ([]string, error) { // openQueue opens a queue without adding it to the set of queues func (connection *redisConnection) openQueue(name string) Queue { - return newQueue(name, connection.Name, connection.queuesKey, connection.redisClient) + return newQueue( + name, + connection.Name, + connection.queuesKey, + connection.redisClient, + connection.errChan, + ) } // stopHeartbeat stops the heartbeat of the connection diff --git a/deliveries.go b/deliveries.go index e9e2e55..dd5f1ed 100644 --- a/deliveries.go +++ b/deliveries.go @@ -19,27 +19,26 @@ func (deliveries Deliveries) Payloads() []string { // functions with retry, see comments in delivery.go (recommended) -func (deliveries Deliveries) Ack(ctx context.Context, errChan chan<- error) (errMap map[int]error) { - return deliveries.each(ctx, errChan, Delivery.Ack) +func (deliveries Deliveries) Ack(ctx context.Context) (errMap map[int]error) { + return deliveries.each(ctx, Delivery.Ack) } -func (deliveries Deliveries) Reject(ctx context.Context, errChan chan<- error) (errMap map[int]error) { - return deliveries.each(ctx, errChan, Delivery.Reject) +func (deliveries Deliveries) Reject(ctx context.Context) (errMap map[int]error) { + return deliveries.each(ctx, Delivery.Reject) } -func (deliveries Deliveries) Push(ctx context.Context, errChan chan<- error) (errMap map[int]error) { - return deliveries.each(ctx, errChan, Delivery.Push) +func (deliveries Deliveries) Push(ctx context.Context) (errMap map[int]error) { + return deliveries.each(ctx, Delivery.Push) } // helper functions func (deliveries Deliveries) each( ctx context.Context, - errChan chan<- error, - f func(Delivery, context.Context, chan<- error) error, + f func(Delivery, context.Context) error, ) (errMap map[int]error) { for i, delivery := range deliveries { - if err := f(delivery, ctx, errChan); err != nil { + if err := f(delivery, ctx); err != nil { if errMap == nil { // create error map lazily on demand errMap = map[int]error{} } diff --git a/delivery.go b/delivery.go index f7316a0..b1bc5bd 100644 --- a/delivery.go +++ b/delivery.go @@ -9,9 +9,9 @@ import ( type Delivery interface { Payload() string - Ack(context.Context, chan<- error) error - Reject(context.Context, chan<- error) error - Push(context.Context, chan<- error) error + Ack(context.Context) error + Reject(context.Context) error + Push(context.Context) error } type redisDelivery struct { @@ -20,15 +20,24 @@ type redisDelivery struct { rejectedKey string pushKey string redisClient RedisClient + errChan chan<- error } -func newDelivery(payload, unackedKey, rejectedKey, pushKey string, redisClient RedisClient) *redisDelivery { +func newDelivery( + payload string, + unackedKey string, + rejectedKey string, + pushKey string, + redisClient RedisClient, + errChan chan<- error, +) *redisDelivery { return &redisDelivery{ payload: payload, unackedKey: unackedKey, rejectedKey: rejectedKey, pushKey: pushKey, redisClient: redisClient, + errChan: errChan, } } @@ -46,7 +55,7 @@ func (delivery *redisDelivery) Payload() string { // 3. if the context is cancalled or its timeout exceeded, context.Cancelled or // context.DeadlineExceeded will be returned -func (delivery *redisDelivery) Ack(ctx context.Context, errChan chan<- error) error { +func (delivery *redisDelivery) Ack(ctx context.Context) error { if ctx == nil { // TODO: remove this ctx = context.TODO() } @@ -66,7 +75,7 @@ func (delivery *redisDelivery) Ack(ctx context.Context, errChan chan<- error) er errorCount++ select { // try to add error to channel, but don't block - case errChan <- &DeliveryError{Delivery: delivery, RedisErr: err, Count: errorCount}: + case delivery.errChan <- &DeliveryError{Delivery: delivery, RedisErr: err, Count: errorCount}: default: } @@ -78,19 +87,19 @@ func (delivery *redisDelivery) Ack(ctx context.Context, errChan chan<- error) er } } -func (delivery *redisDelivery) Reject(ctx context.Context, errChan chan<- error) error { - return delivery.move(ctx, errChan, delivery.rejectedKey) +func (delivery *redisDelivery) Reject(ctx context.Context) error { + return delivery.move(ctx, delivery.rejectedKey) } -func (delivery *redisDelivery) Push(ctx context.Context, errChan chan<- error) error { +func (delivery *redisDelivery) Push(ctx context.Context) error { if delivery.pushKey == "" { - return delivery.Reject(ctx, errChan) // fall back to rejecting + return delivery.Reject(ctx) // fall back to rejecting } - return delivery.move(ctx, errChan, delivery.pushKey) + return delivery.move(ctx, delivery.pushKey) } -func (delivery *redisDelivery) move(ctx context.Context, errChan chan<- error, key string) error { +func (delivery *redisDelivery) move(ctx context.Context, key string) error { errorCount := 0 for { _, err := delivery.redisClient.LPush(key, delivery.payload) @@ -102,14 +111,14 @@ func (delivery *redisDelivery) move(ctx context.Context, errChan chan<- error, k errorCount++ select { // try to add error to channel, but don't block - case errChan <- &DeliveryError{Delivery: delivery, RedisErr: err, Count: errorCount}: + case delivery.errChan <- &DeliveryError{Delivery: delivery, RedisErr: err, Count: errorCount}: default: } time.Sleep(time.Second) } - return delivery.Ack(ctx, errChan) + return delivery.Ack(ctx) } // lower level functions which don't retry but just return the first error diff --git a/example/batch_consumer/main.go b/example/batch_consumer/main.go index 37bcce1..6483b96 100644 --- a/example/batch_consumer/main.go +++ b/example/batch_consumer/main.go @@ -56,10 +56,10 @@ func main() { if err != nil { panic(err) } - if err := queue.StartConsuming(unackedLimit, pollDuration, nil); err != nil { + if err := queue.StartConsuming(unackedLimit, pollDuration); err != nil { panic(err) } - if _, err := queue.AddBatchConsumer(queueName, batchSize, batchTimeout, NewBatchConsumer(ctx, errChan, queueName)); err != nil { + if _, err := queue.AddBatchConsumer(queueName, batchSize, batchTimeout, NewBatchConsumer(ctx, queueName)); err != nil { panic(err) } } @@ -82,16 +82,14 @@ func main() { } type BatchConsumer struct { - ctx context.Context - errChan chan<- error - tag string + ctx context.Context + tag string } -func NewBatchConsumer(ctx context.Context, errChan chan<- error, tag string) *BatchConsumer { +func NewBatchConsumer(ctx context.Context, tag string) *BatchConsumer { return &BatchConsumer{ - ctx: ctx, - errChan: errChan, - tag: tag, + ctx: ctx, + tag: tag, } } @@ -101,7 +99,7 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) { time.Sleep(consumeDuration) log.Printf("%s consumed %d: %s", consumer.tag, len(batch), batch[0]) - errors := batch.Ack(consumer.ctx, consumer.errChan) + errors := batch.Ack(consumer.ctx) if len(errors) == 0 { debugf("acked %q", payloads) return diff --git a/example/consumer/main.go b/example/consumer/main.go index 419a9b2..2938dbe 100644 --- a/example/consumer/main.go +++ b/example/consumer/main.go @@ -51,7 +51,7 @@ func main() { panic(err) } - if err := queue.StartConsuming(unackedLimit, 500*time.Millisecond, errChan); err != nil { + if err := queue.StartConsuming(unackedLimit, 500*time.Millisecond); err != nil { panic(err) } @@ -59,7 +59,7 @@ func main() { for i := 0; i < numConsumers; i++ { name := fmt.Sprintf("consumer %d", i) - if _, err := queue.AddConsumer(name, NewConsumer(ctx, errChan, i)); err != nil { + if _, err := queue.AddConsumer(name, NewConsumer(ctx, i)); err != nil { panic(err) } } @@ -82,20 +82,18 @@ func main() { } type Consumer struct { - ctx context.Context - errChan chan<- error - name string - count int - before time.Time + ctx context.Context + name string + count int + before time.Time } -func NewConsumer(ctx context.Context, errChan chan<- error, tag int) *Consumer { +func NewConsumer(ctx context.Context, tag int) *Consumer { return &Consumer{ - ctx: ctx, - errChan: errChan, - name: fmt.Sprintf("consumer%d", tag), - count: 0, - before: time.Now(), + ctx: ctx, + name: fmt.Sprintf("consumer%d", tag), + count: 0, + before: time.Now(), } } @@ -113,13 +111,13 @@ func (consumer *Consumer) Consume(delivery rmq.Delivery) { } if consumer.count%batchSize > 0 { - if err := delivery.Ack(consumer.ctx, consumer.errChan); err != nil { + if err := delivery.Ack(consumer.ctx); err != nil { debugf("failed to ack %s: %s", payload, err) } else { debugf("acked %s", payload) } } else { // reject one per batch - if err := delivery.Reject(consumer.ctx, consumer.errChan); err != nil { + if err := delivery.Reject(consumer.ctx); err != nil { debugf("failed to reject %s: %s", payload, err) } else { debugf("rejected %s", payload) diff --git a/queue.go b/queue.go index b7933da..e38adb1 100644 --- a/queue.go +++ b/queue.go @@ -18,7 +18,7 @@ type Queue interface { Publish(payload ...string) error PublishBytes(payload ...[]byte) error SetPushQueue(pushQueue Queue) - StartConsuming(prefetchLimit int64, pollDuration time.Duration, errors chan<- error) error + StartConsuming(prefetchLimit int64, pollDuration time.Duration) error StopConsuming() <-chan struct{} AddConsumer(tag string, consumer Consumer) (string, error) AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error) @@ -49,6 +49,7 @@ type redisQueue struct { unackedKey string // key to list of currently consuming deliveries pushKey string // key to list of pushed deliveries redisClient RedisClient + errChan chan<- error deliveryChan chan Delivery // nil for publish channels, not nil for consuming channels prefetchLimit int64 // max number of prefetched deliveries number of unacked can go up to prefetchLimit + numConsumers pollDuration time.Duration @@ -56,7 +57,14 @@ type redisQueue struct { stopWg sync.WaitGroup } -func newQueue(name, connectionName, queuesKey string, redisClient RedisClient) *redisQueue { +func newQueue( + name string, + connectionName string, + queuesKey string, + redisClient RedisClient, + errChan chan<- error, +) *redisQueue { + consumersKey := strings.Replace(connectionQueueConsumersTemplate, phConnection, connectionName, 1) consumersKey = strings.Replace(consumersKey, phQueue, name, 1) @@ -75,6 +83,7 @@ func newQueue(name, connectionName, queuesKey string, redisClient RedisClient) * rejectedKey: rejectedKey, unackedKey: unackedKey, redisClient: redisClient, + errChan: errChan, } return queue } @@ -111,7 +120,7 @@ func (queue *redisQueue) SetPushQueue(pushQueue Queue) { // StartConsuming starts consuming into a channel of size prefetchLimit // must be called before consumers can be added! // pollDuration is the duration the queue sleeps before checking for new deliveries -func (queue *redisQueue) StartConsuming(prefetchLimit int64, pollDuration time.Duration, errors chan<- error) error { +func (queue *redisQueue) StartConsuming(prefetchLimit int64, pollDuration time.Duration) error { if queue.deliveryChan != nil { return ErrorAlreadyConsuming } @@ -126,11 +135,11 @@ func (queue *redisQueue) StartConsuming(prefetchLimit int64, pollDuration time.D queue.deliveryChan = make(chan Delivery, prefetchLimit) queue.consumingStopped = make(chan struct{}) // log.Printf("rmq queue started consuming %s %d %s", queue, prefetchLimit, pollDuration) - go queue.consume(errors) + go queue.consume() return nil } -func (queue *redisQueue) consume(errors chan<- error) { +func (queue *redisQueue) consume() { errorCount := 0 // number of consecutive batch errors for { @@ -145,7 +154,7 @@ func (queue *redisQueue) consume(errors chan<- error) { default: // redis error errorCount++ select { // try to add error to channel, but don't block - case errors <- &ConsumeError{RedisErr: err, Count: errorCount}: + case queue.errChan <- &ConsumeError{RedisErr: err, Count: errorCount}: default: } time.Sleep(queue.pollDuration) // sleep before retry @@ -205,7 +214,7 @@ func (queue *redisQueue) newDelivery(payload string) Delivery { queue.rejectedKey, queue.pushKey, queue.redisClient, - // queue.errChan, + queue.errChan, ) } diff --git a/queue_test.go b/queue_test.go index 868f8f4..72ecd02 100644 --- a/queue_test.go +++ b/queue_test.go @@ -79,7 +79,7 @@ func TestConnectionQueues(t *testing.T) { queues, err = connection.getConsumingQueues() assert.NoError(t, err) assert.Len(t, queues, 0) - assert.NoError(t, queue1.StartConsuming(1, time.Millisecond, nil)) + assert.NoError(t, queue1.StartConsuming(1, time.Millisecond)) queues, err = connection.getConsumingQueues() assert.NoError(t, err) assert.Equal(t, []string{"conn-q-q1"}, queues) @@ -93,7 +93,7 @@ func TestConnectionQueues(t *testing.T) { queues, err = connection.getConsumingQueues() assert.NoError(t, err) assert.Len(t, queues, 1) - assert.NoError(t, queue2.StartConsuming(1, time.Millisecond, nil)) + assert.NoError(t, queue2.StartConsuming(1, time.Millisecond)) queues, err = connection.getConsumingQueues() assert.NoError(t, err) assert.Len(t, queues, 2) @@ -162,8 +162,8 @@ func TestQueueCommon(t *testing.T) { queues, err := connection.getConsumingQueues() assert.NoError(t, err) assert.Len(t, queues, 0) - assert.NoError(t, queue.StartConsuming(10, time.Millisecond, nil)) - assert.Equal(t, ErrorAlreadyConsuming, queue.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) + assert.Equal(t, ErrorAlreadyConsuming, queue.StartConsuming(10, time.Millisecond)) cons1name, err := queue.AddConsumer("queue-cons1", NewTestConsumer("queue-A")) assert.NoError(t, err) time.Sleep(time.Millisecond) @@ -196,7 +196,7 @@ func TestConsumerCommon(t *testing.T) { consumer := NewTestConsumer("cons-A") consumer.AutoAck = false - assert.NoError(t, queue1.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue1.StartConsuming(10, time.Millisecond)) _, err = queue1.AddConsumer("cons-cons", consumer) assert.NoError(t, err) assert.Nil(t, consumer.LastDelivery) @@ -222,7 +222,7 @@ func TestConsumerCommon(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(2), count) - assert.NoError(t, consumer.LastDeliveries[0].Ack(nil, nil)) + assert.NoError(t, consumer.LastDeliveries[0].Ack(nil)) count, err = queue1.readyCount() assert.NoError(t, err) assert.Equal(t, int64(0), count) @@ -230,7 +230,7 @@ func TestConsumerCommon(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(1), count) - assert.NoError(t, consumer.LastDeliveries[1].Ack(nil, nil)) + assert.NoError(t, consumer.LastDeliveries[1].Ack(nil)) count, err = queue1.readyCount() assert.NoError(t, err) assert.Equal(t, int64(0), count) @@ -238,7 +238,7 @@ func TestConsumerCommon(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(0), count) - assert.Equal(t, ErrorNotFound, consumer.LastDeliveries[0].Ack(nil, nil)) + assert.Equal(t, ErrorNotFound, consumer.LastDeliveries[0].Ack(nil)) assert.NoError(t, queue1.Publish("cons-d3")) time.Sleep(2 * time.Millisecond) @@ -252,7 +252,7 @@ func TestConsumerCommon(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(0), count) assert.Equal(t, "cons-d3", consumer.LastDelivery.Payload()) - assert.NoError(t, consumer.LastDelivery.Reject(nil, nil)) + assert.NoError(t, consumer.LastDelivery.Reject(nil)) count, err = queue1.readyCount() assert.NoError(t, err) assert.Equal(t, int64(0), count) @@ -275,7 +275,7 @@ func TestConsumerCommon(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(1), count) assert.Equal(t, "cons-d4", consumer.LastDelivery.Payload()) - assert.NoError(t, consumer.LastDelivery.Reject(nil, nil)) + assert.NoError(t, consumer.LastDelivery.Reject(nil)) count, err = queue1.readyCount() assert.NoError(t, err) assert.Equal(t, int64(0), count) @@ -297,13 +297,13 @@ func TestConsumerCommon(t *testing.T) { queue2, err := connection.OpenQueue("cons-func-q") assert.NoError(t, err) - assert.NoError(t, queue2.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue2.StartConsuming(10, time.Millisecond)) payloadChan := make(chan string, 1) payload := "cons-func-payload" _, err = queue2.AddConsumerFunc("cons-func", func(delivery Delivery) { - err = delivery.Ack(nil, nil) + err = delivery.Ack(nil) assert.NoError(t, err) payloadChan <- delivery.Payload() }) @@ -343,7 +343,7 @@ func TestMulti(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(0), count) - assert.NoError(t, queue.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) time.Sleep(2 * time.Millisecond) count, err = queue.readyCount() assert.NoError(t, err) @@ -366,7 +366,7 @@ func TestMulti(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(10), count) - assert.NoError(t, consumer.LastDelivery.Ack(nil, nil)) + assert.NoError(t, consumer.LastDelivery.Ack(nil)) time.Sleep(10 * time.Millisecond) count, err = queue.readyCount() assert.NoError(t, err) @@ -384,7 +384,7 @@ func TestMulti(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(10), count) - assert.NoError(t, consumer.LastDelivery.Ack(nil, nil)) + assert.NoError(t, consumer.LastDelivery.Ack(nil)) time.Sleep(10 * time.Millisecond) count, err = queue.readyCount() assert.NoError(t, err) @@ -421,7 +421,7 @@ func TestBatch(t *testing.T) { assert.NoError(t, err) } - assert.NoError(t, queue.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) time.Sleep(10 * time.Millisecond) count, err := queue.unackedCount() assert.NoError(t, err) @@ -434,8 +434,8 @@ func TestBatch(t *testing.T) { require.Len(t, consumer.LastBatch, 2) assert.Equal(t, "batch-d0", consumer.LastBatch[0].Payload()) assert.Equal(t, "batch-d1", consumer.LastBatch[1].Payload()) - assert.NoError(t, consumer.LastBatch[0].Reject(nil, nil)) - assert.NoError(t, consumer.LastBatch[1].Ack(nil, nil)) + assert.NoError(t, consumer.LastBatch[0].Reject(nil)) + assert.NoError(t, consumer.LastBatch[1].Ack(nil)) count, err = queue.unackedCount() assert.NoError(t, err) assert.Equal(t, int64(3), count) @@ -448,8 +448,8 @@ func TestBatch(t *testing.T) { require.Len(t, consumer.LastBatch, 2) assert.Equal(t, "batch-d2", consumer.LastBatch[0].Payload()) assert.Equal(t, "batch-d3", consumer.LastBatch[1].Payload()) - assert.NoError(t, consumer.LastBatch[0].Reject(nil, nil)) - assert.NoError(t, consumer.LastBatch[1].Ack(nil, nil)) + assert.NoError(t, consumer.LastBatch[0].Reject(nil)) + assert.NoError(t, consumer.LastBatch[1].Ack(nil)) count, err = queue.unackedCount() assert.NoError(t, err) assert.Equal(t, int64(1), count) @@ -470,7 +470,7 @@ func TestBatch(t *testing.T) { time.Sleep(60 * time.Millisecond) require.Len(t, consumer.LastBatch, 1) assert.Equal(t, "batch-d4", consumer.LastBatch[0].Payload()) - assert.NoError(t, consumer.LastBatch[0].Reject(nil, nil)) + assert.NoError(t, consumer.LastBatch[0].Reject(nil)) count, err = queue.unackedCount() assert.NoError(t, err) assert.Equal(t, int64(0), count) @@ -502,7 +502,7 @@ func TestReturnRejected(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(0), count) - assert.NoError(t, queue.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) time.Sleep(time.Millisecond) count, err = queue.readyCount() assert.NoError(t, err) @@ -530,12 +530,12 @@ func TestReturnRejected(t *testing.T) { assert.Equal(t, int64(0), count) assert.Len(t, consumer.LastDeliveries, 6) - assert.NoError(t, consumer.LastDeliveries[0].Reject(nil, nil)) - assert.NoError(t, consumer.LastDeliveries[1].Ack(nil, nil)) - assert.NoError(t, consumer.LastDeliveries[2].Reject(nil, nil)) - assert.NoError(t, consumer.LastDeliveries[3].Reject(nil, nil)) + assert.NoError(t, consumer.LastDeliveries[0].Reject(nil)) + assert.NoError(t, consumer.LastDeliveries[1].Ack(nil)) + assert.NoError(t, consumer.LastDeliveries[2].Reject(nil)) + assert.NoError(t, consumer.LastDeliveries[3].Reject(nil)) // delivery 4 still open - assert.NoError(t, consumer.LastDeliveries[5].Reject(nil, nil)) + assert.NoError(t, consumer.LastDeliveries[5].Reject(nil)) time.Sleep(time.Millisecond) count, err = queue.readyCount() @@ -590,14 +590,14 @@ func TestPushQueue(t *testing.T) { consumer1 := NewTestConsumer("push-cons") consumer1.AutoAck = false consumer1.AutoFinish = false - assert.NoError(t, queue1.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue1.StartConsuming(10, time.Millisecond)) _, err = queue1.AddConsumer("push-cons", consumer1) assert.NoError(t, err) consumer2 := NewTestConsumer("push-cons") consumer2.AutoAck = false consumer2.AutoFinish = false - assert.NoError(t, queue2.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue2.StartConsuming(10, time.Millisecond)) _, err = queue2.AddConsumer("push-cons", consumer2) assert.NoError(t, err) @@ -608,7 +608,7 @@ func TestPushQueue(t *testing.T) { assert.Equal(t, int64(1), count) require.Len(t, consumer1.LastDeliveries, 1) - assert.NoError(t, consumer1.LastDelivery.Push(nil, nil)) + assert.NoError(t, consumer1.LastDelivery.Push(nil)) time.Sleep(2 * time.Millisecond) count, err = queue1.unackedCount() assert.NoError(t, err) @@ -618,7 +618,7 @@ func TestPushQueue(t *testing.T) { assert.Equal(t, int64(1), count) require.Len(t, consumer2.LastDeliveries, 1) - assert.NoError(t, consumer2.LastDelivery.Push(nil, nil)) + assert.NoError(t, consumer2.LastDelivery.Push(nil)) time.Sleep(2 * time.Millisecond) count, err = queue2.rejectedCount() assert.NoError(t, err) @@ -639,7 +639,7 @@ func TestConsuming(t *testing.T) { t.FailNow() // should return closed finishedChan } - assert.NoError(t, queue.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) assert.NotNil(t, queue.StopConsuming()) // already stopped assert.NotNil(t, queue.StopConsuming()) @@ -665,7 +665,7 @@ func TestStopConsuming_Consumer(t *testing.T) { assert.NoError(t, err) } - assert.NoError(t, queue.StartConsuming(20, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(20, time.Millisecond)) var consumers []*TestConsumer for i := 0; i < 10; i++ { consumer := NewTestConsumer("c" + strconv.Itoa(i)) @@ -709,7 +709,7 @@ func TestStopConsuming_BatchConsumer(t *testing.T) { assert.NoError(t, err) } - assert.NoError(t, queue.StartConsuming(20, time.Millisecond, nil)) + assert.NoError(t, queue.StartConsuming(20, time.Millisecond)) var consumers []*TestBatchConsumer for i := 0; i < 10; i++ { @@ -756,7 +756,7 @@ func BenchmarkQueue(b *testing.B) { consumer := NewTestConsumer("bench-A") // consumer.SleepDuration = time.Microsecond consumers = append(consumers, consumer) - assert.NoError(b, queue.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(b, queue.StartConsuming(10, time.Millisecond)) _, err = queue.AddConsumer("bench-cons", consumer) assert.NoError(b, err) } diff --git a/stats_test.go b/stats_test.go index 1b7f5f4..4731c63 100644 --- a/stats_test.go +++ b/stats_test.go @@ -29,15 +29,15 @@ func TestStats(t *testing.T) { assert.NoError(t, err) consumer := NewTestConsumer("hand-A") consumer.AutoAck = false - assert.NoError(t, q2.StartConsuming(10, time.Millisecond, nil)) + assert.NoError(t, q2.StartConsuming(10, time.Millisecond)) _, err = q2.AddConsumer("stats-cons1", consumer) assert.NoError(t, err) assert.NoError(t, q2.Publish("stats-d2")) assert.NoError(t, q2.Publish("stats-d3")) assert.NoError(t, q2.Publish("stats-d4")) time.Sleep(2 * time.Millisecond) - assert.NoError(t, consumer.LastDeliveries[0].Ack(nil, nil)) - assert.NoError(t, consumer.LastDeliveries[1].Reject(nil, nil)) + assert.NoError(t, consumer.LastDeliveries[0].Ack(nil)) + assert.NoError(t, consumer.LastDeliveries[1].Reject(nil)) _, err = q2.AddConsumer("stats-cons2", NewTestConsumer("hand-B")) assert.NoError(t, err) diff --git a/test_batch_consumer.go b/test_batch_consumer.go index c745a43..60404ff 100644 --- a/test_batch_consumer.go +++ b/test_batch_consumer.go @@ -18,7 +18,7 @@ func (consumer *TestBatchConsumer) Consume(batch Deliveries) { consumer.LastBatch = batch consumer.ConsumedCount += int64(len(batch)) if consumer.AutoFinish { - batch.Ack(nil, nil) + batch.Ack(nil) } else { <-consumer.finish // log.Printf("TestBatchConsumer.Consume() finished") diff --git a/test_consumer.go b/test_consumer.go index d8b956d..68fadda 100644 --- a/test_consumer.go +++ b/test_consumer.go @@ -37,7 +37,7 @@ func (consumer *TestConsumer) Consume(delivery Delivery) { time.Sleep(consumer.SleepDuration) } if consumer.AutoAck { - if err := delivery.Ack(nil, nil); err != nil { + if err := delivery.Ack(nil); err != nil { panic(err) } } diff --git a/test_delivery.go b/test_delivery.go index c437e17..0e94b85 100644 --- a/test_delivery.go +++ b/test_delivery.go @@ -33,7 +33,7 @@ func (delivery *TestDelivery) Payload() string { return delivery.payload } -func (delivery *TestDelivery) Ack(context.Context, chan<- error) error { +func (delivery *TestDelivery) Ack(context.Context) error { if delivery.State != Unacked { return ErrorNotFound } @@ -41,7 +41,7 @@ func (delivery *TestDelivery) Ack(context.Context, chan<- error) error { return nil } -func (delivery *TestDelivery) Reject(context.Context, chan<- error) error { +func (delivery *TestDelivery) Reject(context.Context) error { if delivery.State != Unacked { return ErrorNotFound } @@ -49,7 +49,7 @@ func (delivery *TestDelivery) Reject(context.Context, chan<- error) error { return nil } -func (delivery *TestDelivery) Push(context.Context, chan<- error) error { +func (delivery *TestDelivery) Push(context.Context) error { if delivery.State != Unacked { return ErrorNotFound } diff --git a/test_delivery_test.go b/test_delivery_test.go index 1785982..a97ad0d 100644 --- a/test_delivery_test.go +++ b/test_delivery_test.go @@ -9,28 +9,28 @@ import ( func TestDeliveryPayload(t *testing.T) { var delivery Delivery delivery = NewTestDelivery("p23") - assert.NoError(t, delivery.Ack(nil, nil)) + assert.NoError(t, delivery.Ack(nil)) assert.Equal(t, "p23", delivery.Payload()) } func TestDeliveryAck(t *testing.T) { delivery := NewTestDelivery("p") assert.Equal(t, Unacked, delivery.State) - assert.NoError(t, delivery.Ack(nil, nil)) + assert.NoError(t, delivery.Ack(nil)) assert.Equal(t, Acked, delivery.State) - assert.Equal(t, ErrorNotFound, delivery.Ack(nil, nil)) - assert.Equal(t, ErrorNotFound, delivery.Reject(nil, nil)) + assert.Equal(t, ErrorNotFound, delivery.Ack(nil)) + assert.Equal(t, ErrorNotFound, delivery.Reject(nil)) assert.Equal(t, Acked, delivery.State) } func TestDeliveryReject(t *testing.T) { delivery := NewTestDelivery("p") assert.Equal(t, Unacked, delivery.State) - assert.NoError(t, delivery.Reject(nil, nil)) + assert.NoError(t, delivery.Reject(nil)) assert.Equal(t, Rejected, delivery.State) - assert.Equal(t, ErrorNotFound, delivery.Reject(nil, nil)) - assert.Equal(t, ErrorNotFound, delivery.Ack(nil, nil)) + assert.Equal(t, ErrorNotFound, delivery.Reject(nil)) + assert.Equal(t, ErrorNotFound, delivery.Ack(nil)) assert.Equal(t, Rejected, delivery.State) } diff --git a/test_queue.go b/test_queue.go index 91c7eff..60d6d93 100644 --- a/test_queue.go +++ b/test_queue.go @@ -30,10 +30,8 @@ func (queue *TestQueue) PublishBytes(payload ...[]byte) error { return queue.Publish(stringifiedBytes...) } -func (*TestQueue) SetPushQueue(Queue) { panic(errorNotSupported) } -func (*TestQueue) StartConsuming(int64, time.Duration, chan<- error) error { - panic(errorNotSupported) -} +func (*TestQueue) SetPushQueue(Queue) { panic(errorNotSupported) } +func (*TestQueue) StartConsuming(int64, time.Duration) error { panic(errorNotSupported) } func (*TestQueue) StopConsuming() <-chan struct{} { panic(errorNotSupported) } func (*TestQueue) AddConsumer(string, Consumer) (string, error) { panic(errorNotSupported) } func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error) { panic(errorNotSupported) }