Skip to content

Commit 39ffc55

Browse files
committed
Add examples and update readme
1 parent 34f9144 commit 39ffc55

File tree

14 files changed

+384
-314
lines changed

14 files changed

+384
-314
lines changed

README.md

+77-200
Original file line numberDiff line numberDiff line change
@@ -1,214 +1,139 @@
1-
# Golang task/job queue with in-memory, Redis, SQS, and IronMQ backends
1+
# Golang task/job queue with Redis, SQS, IronMQ, and in-memory backends
22

33
[![Build Status](https://travis-ci.org/vmihailenco/taskq.svg)](https://travis-ci.org/vmihailenco/taskq)
4+
[![GoDoc](https://godoc.org/github.com/vmihailenco/taskq?status.svg)](https://godoc.org/github.com/vmihailenco/taskq)
45

56
## Installation
67

7-
``` bash
8+
```bash
89
go get -u github.com/vmihailenco/taskq
910
```
1011

1112
## Features
1213

1314
- Redis, SQS, IronMQ, and in-memory backends.
14-
- Automatically scaling number of goroutines used to fetch and process messages.
15-
- Rate limiting.
15+
- Automatically scaling number of goroutines used to fetch (fetcher) and process messages (worker).
16+
- Global rate limiting.
1617
- Global limit of workers.
17-
- Call once - deduplicating messages.
18+
- Call once - deduplicating messages with same name.
1819
- Automatic retries with exponential backoffs.
1920
- Automatic pausing when all messages in queue fail.
2021
- Fallback handler for processing failed messages.
2122
- Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
22-
- Automatic message compression using zstd.
23-
- TDigest statistics.
23+
- Automatic message compression using zstd or snappy.
2424

2525
## Quickstart
2626

27-
API/client that adds messages to queues:
27+
I recommend to split your app into 2 parts:
28+
- API that accepts requests from customers and adds tasks to the queues.
29+
- Worker that fetches tasks from the queues and processes them.
2830

29-
``` go
31+
This way you can:
32+
- isolate API and worker from each other;
33+
- scale API and worker separately;
34+
- have different configs (like timeouts).
35+
36+
There is an [api_worker example](examples/api_worker) that does exactly that and can be run:
3037

38+
```bash
39+
cd examples/api_worker
40+
go run worker/main.go
41+
go run api/main.go
3142
```
3243

33-
## Design overview
44+
First you need to define queue and task in that queue:
3445

35-
go-msgqueue is a thin wrapper for SQS and IronMQ clients that uses Redis to implement rate limiting and call once semantic.
46+
```go
47+
var (
48+
QueueFactory = redisq.NewFactory()
49+
MainQueue = QueueFactory.NewQueue(&taskq.QueueOptions{
50+
Name: "api-worker",
51+
Redis: Redis,
52+
})
53+
CountTask = MainQueue.NewTask(&taskq.TaskOptions{
54+
Name: "counter",
55+
Handler: func() error {
56+
IncrLocalCounter()
57+
return nil
58+
},
59+
})
60+
)
61+
```
3662

37-
go-msgqueue consists of following components:
38-
- memqueue - in memory queue that can be used for local unit testing.
39-
- azsqs - Amazon SQS backend.
40-
- ironmq - IronMQ backend.
41-
- Manager - provides common interface for creating new queues.
42-
- Processor - queue processor that works with memqueue, azsqs, and ironmq.
63+
API that adds tasks:
4364

44-
rate limiting is implemented in the processor package using [redis_rate](https://github.com/go-redis/redis_rate). Call once is implemented in clients by checking if message name exists in Redis database.
65+
``` go
66+
for {
67+
err := api_worker.CountTask.Call()
68+
if err != nil {
69+
log.Fatal(err)
70+
}
71+
api_worker.IncrLocalCounter()
72+
}
73+
```
4574

46-
## API overview
75+
Worker that processes tasks:
4776

48-
```go
49-
import "github.com/go-msgqueue/msgqueue"
50-
import "github.com/go-redis/redis"
51-
import "golang.org/x/time/rate"
52-
53-
// Create in-memory queue that prints greetings.
54-
q := memqueue.NewQueue(&msgqueue.Options{
55-
// Handler is automatically retried on error.
56-
Handler: func(name string) error {
57-
fmt.Println("Hello", name)
58-
return nil
59-
},
77+
``` go
78+
err := api_worker.QueueFactory.StartConsumers()
79+
if err != nil {
80+
log.Fatal(err)
81+
}
82+
```
6083

61-
RateLimit: rate.Every(time.Second),
84+
## API overview
6285

63-
// Redis is only needed for rate limiting and call once.
64-
Redis: redis.NewClient(&redis.Options{
65-
Addr: ":6379",
66-
}),
86+
```go
87+
t := myQueue.NewTask(&taskq.TaskOptions{
88+
Name: "greeting",
89+
Handler: func(name string) error {
90+
fmt.Println("Hello", name)
91+
return nil
92+
},
6793
})
6894

69-
// Invoke handler with arguments.
70-
q.Call("World")
95+
// Say "Hello World".
96+
t.Call("World")
7197

7298
// Same using Message API.
73-
q.Add(msgqueue.NewMessage("World"))
99+
t.AddMessage(taskq.NewMessage("World"))
74100

75101
// Say "Hello World" with 1 hour delay.
76-
msg := msgqueue.NewMessage("World")
102+
msg := taskq.NewMessage("World")
77103
msg.Delay = time.Hour
78-
q.Add(msg)
104+
t.AddMessage(msg)
79105

80106
// Say "Hello World" once.
81107
for i := 0; i < 100; i++ {
82-
msg := msgqueue.NewMessage("hello")
83-
msg.Name = "hello-world"
84-
q.Add(msg)
108+
msg := taskq.NewMessage("hello")
109+
msg.Name = "hello-world" // unique
110+
t.Add(msg)
85111
}
86112

87113
// Say "Hello World" once with 1 hour delay.
88114
for i := 0; i < 100; i++ {
89-
msg := msgqueue.NewMessage("hello")
115+
msg := taskq.NewMessage("hello")
90116
msg.Name = "hello-world"
91117
msg.Delay = time.Hour
92-
q.Add(msg)
118+
t.Add(msg)
93119
}
94120

95121
// Say "Hello World" once in an hour.
96122
for i := 0; i < 100; i++ {
97-
q.CallOnce(time.Hour, "hello")
123+
t.CallOnce(time.Hour, "hello")
98124
}
99125

100126
// Say "Hello World" for Europe region once in an hour.
101127
for i := 0; i < 100; i++ {
102-
msg := msgqueue.NewMessage("hello")
103-
msg.SetDelayName(delay, "europe") // set delay and autogenerate message name
104-
q.Add(msg)
128+
msg := taskq.NewMessage("hello")
129+
msg.OnceWithArgs(time.Hour, "europe") // set delay and autogenerate message name
130+
t.Add(msg)
105131
}
106132
```
107133

108-
## SQS, IronMQ, and in-memory queues
109-
110-
SQS, IronMQ, and memqueue share the same API and can be used interchangeably.
111-
112-
### SQS
113-
114-
azsqs package uses Amazon Simple Queue Service as queue backend.
115-
116-
```go
117-
import "github.com/go-msgqueue/msgqueue"
118-
import "github.com/go-msgqueue/msgqueue/azsqs"
119-
import "github.com/aws/aws-sdk-go/service/sqs"
120-
121-
// Create queue.
122-
awsAccountId := "123456789"
123-
q := azsqs.NewQueue(awsSQS(), awsAccountId, &msgqueue.Options{
124-
Name: "sqs-queue-name",
125-
Handler: func(name string) error {
126-
fmt.Println("Hello", name)
127-
return nil
128-
},
129-
})
130-
131-
// Same using Manager.
132-
man := azsqs.NewManager(awsSQS(), accountId)
133-
q := man.NewQueue(&msgqueue.Options{...})
134-
135-
// Add message.
136-
q.Call("World")
137-
138-
// Start processing queue.
139-
p := q.Processor()
140-
p.Start()
141-
142-
// Stop processing.
143-
p.Stop()
144-
```
145-
146-
### IronMQ
147-
148-
ironmq package uses IronMQ as queue backend.
149-
150-
```go
151-
import "github.com/go-msgqueue/msgqueue"
152-
import "github.com/go-msgqueue/msgqueue/ironmq"
153-
import "github.com/iron-io/iron_go3/mq"
154-
155-
// Create queue.
156-
q := ironmq.NewQueue(mq.New("ironmq-queue-name"), &msgqueue.Options{
157-
Handler: func(name string) error {
158-
fmt.Println("Hello", name)
159-
return nil
160-
},
161-
})
162-
163-
// Same using manager.
164-
cfg := iron_config.Config("iron_mq")
165-
man := ironmq.NewManager(&cfg)
166-
q := man.NewQueue(&msgqueue.Options{...})
167-
168-
// Add message.
169-
q.Call("World")
170-
171-
// Start processing queue.
172-
p := q.Processor()
173-
p.Start()
174-
175-
// Stop processing.
176-
p.Stop()
177-
```
178-
179-
### In-memory
180-
181-
memqueue is in-memory queue backend implementation primarily useful for local development / unit testing. Unlike SQS and IronMQ it has running queue processor by default.
182-
183-
```go
184-
import "github.com/go-msgqueue/msgqueue"
185-
186-
// Create queue.
187-
q := memqueue.NewQueue(&msgqueue.Options{
188-
Handler: func(name string) error {
189-
fmt.Println("Hello", name)
190-
return nil
191-
},
192-
})
193-
194-
// Same using Manager.
195-
man := memqueue.NewManager()
196-
q := man.NewQueue(&msgqueue.Options{...})
197-
198-
// Stop processor if you don't need it.
199-
p := q.Processor()
200-
p.Stop()
201-
202-
// Process one message.
203-
err := p.ProcessOne()
204-
205-
// Process all buffered messages.
206-
err := p.ProcessAll()
207-
```
208-
209134
## Custom message delay
210135

211-
If error returned by handler implements `Delay() time.Duration` that delay is used to postpone message processing.
136+
If error returned by handler implements `Delay() time.Duration` interface then that delay is used to postpone message processing.
212137

213138
```go
214139
type RateLimitError string
@@ -224,52 +149,4 @@ func (RateLimitError) Delay() time.Duration {
224149
func handler() error {
225150
return RateLimitError("calm down")
226151
}
227-
228-
q := memqueue.NewQueue(&msgqueue.Options{
229-
Handler: handler,
230-
})
231-
```
232-
233-
## Stats
234-
235-
You can log local queue stats using following code:
236-
237-
```go
238-
func LogQueueStats(q msgqueue.Queue) {
239-
p := q.Processor()
240-
opt := p.Options()
241-
242-
var old *msgqueue.ProcessorStats
243-
for _ = range time.Tick(3 * time.Second) {
244-
st := p.Stats()
245-
if st == nil {
246-
break
247-
}
248-
249-
if old != nil && st.Processed == old.Processed &&
250-
st.Fails == old.Fails &&
251-
st.Retries == old.Retries {
252-
continue
253-
}
254-
old = st
255-
256-
glog.Infof(
257-
"%s: buffered=%d/%d in_flight=%d/%d "+
258-
"processed=%d fails=%d retries=%d "+
259-
"avg_dur=%s min_dur=%s max_dur=%s",
260-
q, st.Buffered, opt.BufferSize, st.InFlight, opt.WorkerNumber,
261-
st.Processed, st.Fails, st.Retries,
262-
st.AvgDuration, st.MinDuration, st.MaxDuration,
263-
)
264-
}
265-
}
266-
267-
go LogQueueStats(myqueue)
268-
```
269-
270-
which will log something like this
271-
272-
```
273-
Memqueue<Name=v1-production-notices-add>: buffered=0/1000 in_flight=3/16 processed=16183872 fails=0 retries=0 avg_dur=44.8ms min_dur=100µs max_dur=5.102s
274-
Memqueue<Name=v1-production-notices-add>: buffered=0/1000 in_flight=8/16 processed=16184022 fails=0 retries=0 avg_dur=42ms min_dur=100µs max_dur=5.102s
275152
```

azsqs/factory.go

+17-12
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,41 @@
11
package azsqs
22

33
import (
4-
"sync"
5-
64
"github.com/aws/aws-sdk-go/service/sqs"
75

86
"github.com/vmihailenco/taskq"
7+
"github.com/vmihailenco/taskq/internal/base"
98
)
109

1110
type factory struct {
11+
base base.Factory
12+
1213
sqs *sqs.SQS
1314
accountID string
14-
15-
queuesMu sync.RWMutex
16-
queues []taskq.Queue
1715
}
1816

1917
var _ taskq.Factory = (*factory)(nil)
2018

2119
func (f *factory) NewQueue(opt *taskq.QueueOptions) taskq.Queue {
22-
f.queuesMu.Lock()
23-
defer f.queuesMu.Unlock()
24-
2520
q := NewQueue(f.sqs, f.accountID, opt)
26-
f.queues = append(f.queues, q)
21+
f.base.Add(q)
2722
return q
2823
}
2924

3025
func (f *factory) Queues() []taskq.Queue {
31-
f.queuesMu.RLock()
32-
defer f.queuesMu.RUnlock()
33-
return f.queues
26+
return f.base.Queues()
27+
}
28+
29+
func (f *factory) StartConsumers() error {
30+
return f.base.StartConsumers()
31+
}
32+
33+
func (f *factory) StopConsumers() error {
34+
return f.base.StopConsumers()
35+
}
36+
37+
func (f *factory) Close() error {
38+
return f.base.Close()
3439
}
3540

3641
func NewFactory(sqs *sqs.SQS, accountID string) taskq.Factory {

0 commit comments

Comments
 (0)