8
8
> [ distributed tracing tool] ( https://get.uptrace.dev/compare/distributed-tracing-tools.html ) powered
9
9
> by OpenTelemetry and ClickHouse. Give it a star as well!
10
10
11
- ## Installation
12
-
13
- taskq supports 2 last Go versions and requires a Go version with
14
- [ modules] ( https://github.com/golang/go/wiki/Modules ) support. So make sure to initialize a Go
15
- module:
16
-
17
- ``` shell
18
- go mod init github.com/my/repo
19
- ```
20
-
21
- And then install taskq/v3 (note _ v3_ in the import; omitting it is a popular mistake):
22
-
23
- ``` shell
24
- go get github.com/vmihailenco/taskq/v3
25
- ```
26
-
27
11
## Features
28
12
29
13
- Redis, SQS, IronMQ, and in-memory backends.
@@ -37,224 +21,64 @@ go get github.com/vmihailenco/taskq/v3
37
21
- Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
38
22
- Automatic message compression using snappy / s2.
39
23
40
- ## Quickstart
41
-
42
- I recommend that you split your app into the two parts:
43
-
44
- - An API that accepts requests from customers and adds tasks to the queues.
45
- - A Worker that fetches tasks from the queues and processes them.
46
-
47
- This way you can:
24
+ ## Getting started
48
25
49
- - Isolate API and worker from each other.
50
- - Scale API and worker separately.
51
- - Have different configs for API and worker (like timeouts).
26
+ To get started, see [ Golang Task Queue] ( https://taskq.uptrace.dev/ ) documentation.
52
27
53
- There is an [ api_worker example] ( example/api_worker ) that demonstrates this approach using Redis as
54
- a backend:
55
-
56
- ``` bash
57
- cd example/api_worker
58
- go run worker/worker.go
59
- go run api/api.go
60
- ```
61
-
62
- You start by choosing a backend to use - in our case Redis:
28
+ ** Producer** :
63
29
64
30
``` go
65
- package api_worker
31
+ import (
32
+ " github.com/vmihailenco/taskq/v3"
33
+ " github.com/vmihailenco/taskq/v3/redisq"
34
+ )
66
35
36
+ // Create a queue factory.
67
37
var QueueFactory = redisq.NewFactory ()
68
- ```
69
-
70
- Using that factory you create a queue that contains tasks:
71
38
72
- ``` go
39
+ // Create a queue.
73
40
var MainQueue = QueueFactory .RegisterQueue (&taskq.QueueOptions {
74
41
Name : " api-worker" ,
75
42
Redis : Redis , // go-redis client
76
43
})
77
- ```
78
-
79
- Using the queue you create a task with handler that does some useful work:
80
44
81
- ``` go
45
+ // Register a task.
82
46
var CountTask = taskq.RegisterTask (&taskq.TaskOptions {
83
47
Name : " counter" ,
84
48
Handler : func () error {
85
49
IncrLocalCounter ()
86
50
return nil
87
51
},
88
52
})
89
- ```
90
53
91
- Then in an API binary you use tasks to add messages/jobs to queues:
92
-
93
- ``` go
94
54
ctx := context.Background ()
55
+
56
+ // And start producing.
95
57
for {
96
- // call task handler without any args
97
- err := api_worker.MainQueue .Add (api_worker.CountTask .WithArgs (ctx))
98
- if err != nil {
99
- log.Fatal (err)
100
- }
58
+ // Call the task without any args.
59
+ err := MainQueue .Add (CountTask.WithArgs (ctx))
60
+ if err != nil {
61
+ panic (err)
62
+ }
63
+ time.Sleep (time.Second )
101
64
}
102
65
```
103
66
104
- And in a worker binary you start processing queues :
67
+ ** Consumer ** :
105
68
106
69
``` go
107
- err := api_worker. MainQueue . Start (context. Background ())
108
- if err != nil {
70
+ // Start consuming the queue.
71
+ if err := MainQueue . Start (context. Background ()); err != nil {
109
72
log.Fatal (err)
110
73
}
111
74
```
112
75
113
- ## API overview
114
-
115
- ``` go
116
- t := myQueue.RegisterTask (&taskq.TaskOptions {
117
- Name : " greeting" ,
118
- Handler : func (name string ) error {
119
- fmt.Println (" Hello" , name)
120
- return nil
121
- },
122
- })
123
-
124
- // Say "Hello World".
125
- err := myQueue.Add (t.WithArgs (context.Background (), " World" ))
126
- if err != nil {
127
- panic (err)
128
- }
129
-
130
- // Say "Hello World" with 1 hour delay.
131
- msg := t.WithArgs (ctx, " World" )
132
- msg.Delay = time.Hour
133
- _ = myQueue.Add (msg)
134
-
135
- // Say "Hello World" once.
136
- for i := 0 ; i < 100 ; i++ {
137
- msg := t.WithArgs (ctx, " World" )
138
- msg.Name = " hello-world" // unique
139
- _ = myQueue.Add (msg)
140
- }
141
-
142
- // Say "Hello World" once with 1 hour delay.
143
- for i := 0 ; i < 100 ; i++ {
144
- msg := t.WithArgs (ctx, " World" )
145
- msg.Name = " hello-world"
146
- msg.Delay = time.Hour
147
- _ = myQueue.Add (msg)
148
- }
149
-
150
- // Say "Hello World" once in an hour.
151
- for i := 0 ; i < 100 ; i++ {
152
- msg := t.WithArgs (ctx, " World" ).OnceInPeriod (time.Hour )
153
- _ = myQueue.Add (msg)
154
- }
155
-
156
- // Say "Hello World" for Europe region once in an hour.
157
- for i := 0 ; i < 100 ; i++ {
158
- msg := t.WithArgs (ctx, " World" ).OnceInPeriod (time.Hour , " World" , " europe" )
159
- _ = myQueue.Add (msg)
160
- }
161
- ```
162
-
163
- ## Message deduplication
164
-
165
- If a ` Message ` has a ` Name ` then this will be used as unique identifier and messages with the same
166
- name will be deduplicated (i.e. not processed again) within a 24 hour period (or possibly longer if
167
- not evicted from local cache after that period). Where ` Name ` is omitted then non deduplication
168
- occurs and each message will be processed. ` Task ` 's ` WithMessage ` and ` WithArgs ` both produces
169
- messages with no ` Name ` so will not be deduplicated. ` OnceWithArgs ` sets a name based off a
170
- consistent hash of the arguments and a quantised period of time (i.e. 'this hour', 'today') passed
171
- to ` OnceWithArgs ` a ` period ` . This guarantees that the same function will not be called with the
172
- same arguments during `period'.
173
-
174
- ## Handlers
175
-
176
- A ` Handler ` and ` FallbackHandler ` are supplied to ` RegisterTask ` in the ` TaskOptions ` .
177
-
178
- There are three permitted types of signature:
179
-
180
- 1 . A zero-argument function
181
- 2 . A function whose arguments are assignable in type from those which are passed in the message
182
- 3 . A function which takes a single ` *Message ` argument
183
-
184
- If a task is registered with a handler that takes a Go ` context.Context ` as its first argument then
185
- when that handler is invoked it will be passed the same ` Context ` that was passed to
186
- ` Consumer.Start(ctx) ` . This can be used to transmit a signal to abort to all tasks being processed:
187
-
188
- ``` go
189
- var AbortableTask = MainQueue .RegisterTask (&taskq.TaskOptions {
190
- Name : " SomethingLongwinded" ,
191
- Handler : func (ctx context.Context ) error {
192
- for range time.Tick (time.Second ) {
193
- select {
194
- case <- ctx.Done ():
195
- return ctx.Err ()
196
- default :
197
- fmt.Println (" Wee!" )
198
- }
199
- }
200
- return nil
201
- },
202
- })
203
-
204
- ```
205
-
206
- ## Custom message delay
207
-
208
- If error returned by handler implements ` Delay() time.Duration ` interface then that delay is used to
209
- postpone message processing.
210
-
211
- ``` go
212
- type RateLimitError string
213
-
214
- func (e RateLimitError ) Error () string {
215
- return string (e)
216
- }
217
-
218
- func (RateLimitError ) Delay () time .Duration {
219
- return time.Hour
220
- }
221
-
222
- func handler () error {
223
- return RateLimitError (" calm down" )
224
- }
225
- ```
226
-
227
- ## Tracing
228
-
229
- taskq supports tracing out-of-the-box using [ OpenTelemetry] ( https://opentelemetry.io/ ) API. To
230
- instrument a queue, use the following code:
231
-
232
- ``` go
233
- import " github.com/vmihailenco/taskq/extra/taskqotel/v3"
234
-
235
- consumer := queue.Consumer ()
236
- consumer.AddHook (&taskqotel.OpenTelemetryHook {})
237
- ```
238
-
239
- or using a ` taskq.Factory ` :
240
-
241
- ``` go
242
- factory.Range (func (q taskq.Queue ) bool {
243
- consumer := q.Consumer ()
244
- consumer.AddHook (&taskqext.OpenTelemetryHook {})
245
-
246
- return true
247
- })
248
- ```
249
-
250
- We recommend using [ Uptrace] ( https://github.com/uptrace/uptrace ) as a tracing backend.
251
-
252
76
## See also
253
77
254
78
- [ Golang ORM] ( https://github.com/uptrace/bun ) for PostgreSQL, MySQL, MSSQL, and SQLite
255
79
- [ Golang PostgreSQL] ( https://bun.uptrace.dev/postgres/ )
256
80
- [ Golang HTTP router] ( https://github.com/uptrace/bunrouter )
257
- - [ Golang ClickHouse ORM ] ( https://github.com/uptrace/go-clickhouse )
81
+ - [ Golang ClickHouse] ( https://github.com/uptrace/go-clickhouse )
258
82
259
83
## Contributors
260
84
0 commit comments