6
6
7
7
# Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends
8
8
9
- [ ![ Build Status ] ( https://travis-ci.org /vmihailenco/taskq.svg )] ( https://travis-ci.org/vmihailenco/taskq )
10
- [ ![ GoDoc ] ( https://godoc.org/ github.com/vmihailenco/taskq?status.svg )] ( https://pkg.go.dev/github.com/vmihailenco/taskq/v3?tab=doc )
9
+ ![ build workflow ] ( https://github.com /vmihailenco/taskq/actions/workflows/build.yml/badge.svg )
10
+ [ ![ PkgGoDev ] ( https://pkg.go.dev/badge/ github.com/vmihailenco/taskq/v3 )] ( https://pkg.go.dev/github.com/vmihailenco/taskq/v3?tab=doc )
11
11
12
12
## Installation
13
13
14
14
taskq supports 2 last Go versions and requires a Go version with
15
- [ modules] ( https://github.com/golang/go/wiki/Modules ) support. So make sure to
16
- initialize a Go module:
15
+ [ modules] ( https://github.com/golang/go/wiki/Modules ) support. So make sure to initialize a Go
16
+ module:
17
17
18
18
``` shell
19
19
go mod init github.com/my/repo
20
20
```
21
21
22
- And then install taskq/v3 (note _ v3_ in the import; omitting it is a popular
23
- mistake):
22
+ And then install taskq/v3 (note _ v3_ in the import; omitting it is a popular mistake):
24
23
25
24
``` shell
26
25
go get github.com/vmihailenco/taskq/v3
@@ -29,16 +28,14 @@ go get github.com/vmihailenco/taskq/v3
29
28
## Features
30
29
31
30
- Redis, SQS, IronMQ, and in-memory backends.
32
- - Automatically scaling number of goroutines used to fetch (fetcher) and process
33
- messages (worker).
31
+ - Automatically scaling number of goroutines used to fetch (fetcher) and process messages (worker).
34
32
- Global rate limiting.
35
33
- Global limit of workers.
36
34
- Call once - deduplicating messages with same name.
37
35
- Automatic retries with exponential backoffs.
38
36
- Automatic pausing when all messages in queue fail.
39
37
- Fallback handler for processing failed messages.
40
- - Message batching. It is used in SQS and IronMQ backends to add/delete messages
41
- in batches.
38
+ - Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
42
39
- Automatic message compression using snappy / s2.
43
40
44
41
## Quickstart
@@ -54,8 +51,8 @@ This way you can:
54
51
- Scale API and worker separately.
55
52
- Have different configs for API and worker (like timeouts).
56
53
57
- There is an [ api_worker example] ( example/api_worker ) that demonstrates this
58
- approach using Redis as a backend:
54
+ There is an [ api_worker example] ( example/api_worker ) that demonstrates this approach using Redis as
55
+ a backend:
59
56
60
57
``` bash
61
58
cd example/api_worker
@@ -166,33 +163,28 @@ for i := 0; i < 100; i++ {
166
163
167
164
## Message deduplication
168
165
169
- If a ` Message ` has a ` Name ` then this will be used as unique identifier and
170
- messages with the same name will be deduplicated (i.e. not processed again)
171
- within a 24 hour period (or possibly longer if not evicted from local cache
172
- after that period). Where ` Name ` is omitted then non deduplication occurs and
173
- each message will be processed. ` Task ` 's ` WithMessage ` and ` WithArgs ` both
174
- produces messages with no ` Name ` so will not be deduplicated. ` OnceWithArgs `
175
- sets a name based off a consistent hash of the arguments and a quantised period
176
- of time (i.e. 'this hour', 'today') passed to ` OnceWithArgs ` a ` period ` . This
177
- guarantees that the same function will not be called with the same arguments
178
- during `period'.
166
+ If a ` Message ` has a ` Name ` then this will be used as unique identifier and messages with the same
167
+ name will be deduplicated (i.e. not processed again) within a 24 hour period (or possibly longer if
168
+ not evicted from local cache after that period). Where ` Name ` is omitted then non deduplication
169
+ occurs and each message will be processed. ` Task ` 's ` WithMessage ` and ` WithArgs ` both produces
170
+ messages with no ` Name ` so will not be deduplicated. ` OnceWithArgs ` sets a name based off a
171
+ consistent hash of the arguments and a quantised period of time (i.e. 'this hour', 'today') passed
172
+ to ` OnceWithArgs ` a ` period ` . This guarantees that the same function will not be called with the
173
+ same arguments during `period'.
179
174
180
175
## Handlers
181
176
182
- A ` Handler ` and ` FallbackHandler ` are supplied to ` RegisterTask ` in the
183
- ` TaskOptions ` .
177
+ A ` Handler ` and ` FallbackHandler ` are supplied to ` RegisterTask ` in the ` TaskOptions ` .
184
178
185
179
There are three permitted types of signature:
186
180
187
181
1 . A zero-argument function
188
- 2 . A function whose arguments are assignable in type from those which are passed
189
- in the message
182
+ 2 . A function whose arguments are assignable in type from those which are passed in the message
190
183
3 . A function which takes a single ` *Message ` argument
191
184
192
- If a task is registered with a handler that takes a Go ` context.Context ` as its
193
- first argument then when that handler is invoked it will be passed the same
194
- ` Context ` that was passed to ` Consumer.Start(ctx) ` . This can be used to transmit
195
- a signal to abort to all tasks being processed:
185
+ If a task is registered with a handler that takes a Go ` context.Context ` as its first argument then
186
+ when that handler is invoked it will be passed the same ` Context ` that was passed to
187
+ ` Consumer.Start(ctx) ` . This can be used to transmit a signal to abort to all tasks being processed:
196
188
197
189
``` go
198
190
var AbortableTask = MainQueue .RegisterTask (&taskq.TaskOptions {
@@ -214,8 +206,8 @@ var AbortableTask = MainQueue.RegisterTask(&taskq.TaskOptions{
214
206
215
207
## Custom message delay
216
208
217
- If error returned by handler implements ` Delay() time.Duration ` interface then
218
- that delay is used to postpone message processing.
209
+ If error returned by handler implements ` Delay() time.Duration ` interface then that delay is used to
210
+ postpone message processing.
219
211
220
212
``` go
221
213
type RateLimitError string
@@ -235,9 +227,8 @@ func handler() error {
235
227
236
228
## Tracing
237
229
238
- taskq supports tracing out-of-the-box using
239
- [ OpenTelemetry] ( https://opentelemetry.io/ ) API. To instrument a queue, use the
240
- following code:
230
+ taskq supports tracing out-of-the-box using [ OpenTelemetry] ( https://opentelemetry.io/ ) API. To
231
+ instrument a queue, use the following code:
241
232
242
233
``` go
243
234
import " github.com/vmihailenco/taskq/extra/taskqotel/v3"
@@ -257,5 +248,4 @@ factory.Range(func(q taskq.Queue) bool {
257
248
})
258
249
```
259
250
260
- We recommend using [ Uptrace.dev] ( https://github.com/uptrace/uptrace-go ) as a
261
- tracing backend.
251
+ We recommend using [ Uptrace.dev] ( https://github.com/uptrace/uptrace-go ) as a tracing backend.
0 commit comments