-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathpublisher.go
84 lines (74 loc) · 2.16 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package amqp
import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/devimteam/amqp/conn"
)
// Publisher publishes objects to AMQP exchanges over channels that it borrows
// from its observer. Instances are built by newPublisher.
type Publisher struct {
// observer supplies the channels used for publishing (channel) and takes
// them back afterwards (release).
observer *observer
// conn is the connection the observer is created on; Publish also uses it
// to wait for connectivity when the wait option is enabled.
conn *conn.Connection
// opts holds the publisher settings. It is initialised from
// defaultPubOptions before the PublisherOption values are applied.
opts publisherOptions
// defaultPublish is not referenced by the methods visible in this file.
// NOTE(review): presumably a fallback Publish configuration set through an
// option; confirm against the option definitions.
defaultPublish Publish
}
// newPublisher builds a Publisher bound to conn.
//
// The publisher starts from defaultPubOptions, then every option in opts is
// applied in order. The connection is assigned only after the options have
// run, and the observer is created last so that it receives the final
// connection and the final observer options.
func newPublisher(ctx context.Context, conn *conn.Connection, opts ...PublisherOption) *Publisher {
	pub := &Publisher{opts: defaultPubOptions()}
	for i := range opts {
		opts[i](pub)
	}
	pub.conn = conn
	pub.observer = newObserver(ctx, pub.conn, pub.opts.observerOpts...)
	return pub
}
// Publish sends obj to the exchange named exchangeName, using the settings in
// pub, over a channel borrowed from the observer.
//
// When the wait option is enabled, Publish calls NotifyConnected on the
// connection with the configured timeout before publishing and returns that
// error unchanged if the call fails. Otherwise it returns whatever the
// internal publish step returns.
//
// The borrowed channel is handed back to the observer on every return path.
func (p Publisher) Publish(ctx context.Context, exchangeName string, obj interface{}, pub Publish) error {
	channel := p.observer.channel()
	// Register the release immediately: previously it was deferred only after
	// the wait check, so a NotifyConnected failure returned without ever
	// releasing the channel.
	defer p.observer.release(channel)

	if p.opts.wait.flag {
		if err := p.conn.NotifyConnected(p.opts.wait.timeout); err != nil {
			return err
		}
	}
	return p.publish(channel, ctx, exchangeName, obj, pub)
}
// PublishChannel returns a send-only, unbuffered channel. Every value sent on
// it is published to exchangeName with the settings in pub.
//
// A background goroutine borrows one channel from the observer, runs the
// worker pool on it, and releases it once the pool returns. The workers range
// over the returned channel, so the pool finishes only after that channel has
// been closed.
func (p Publisher) PublishChannel(ctx context.Context, exchangeName string, pub Publish) chan<- interface{} {
	objs := make(chan interface{})
	go func(in <-chan interface{}) {
		ch := p.observer.channel()
		defer p.observer.release(ch)
		p.workerPool(ch, in, ctx, exchangeName, pub)
	}(objs)
	return objs
}
// workerPool starts p.opts.workers goroutines, each running worker on the
// same amqpChan and the same input channel, and blocks until all of them have
// returned.
func (p Publisher) workerPool(amqpChan *Channel, channel <-chan interface{}, ctx context.Context, exchangeName string, pub Publish) {
	total := p.opts.workers

	var running sync.WaitGroup
	running.Add(total)
	for started := 0; started < total; started++ {
		go func() {
			defer running.Done()
			p.worker(amqpChan, channel, ctx, exchangeName, pub)
		}()
	}
	running.Wait()
}
// worker receives objects from channel until it is closed and publishes each
// one on amqpChan. A failed publish is handed to the configured logger and
// does not stop the loop; the logger's own return value is discarded.
func (p Publisher) worker(amqpChan *Channel, channel <-chan interface{}, ctx context.Context, exchangeName string, pub Publish) {
	for {
		obj, open := <-channel
		if !open {
			return
		}
		err := p.publish(amqpChan, ctx, exchangeName, obj, pub)
		if err == nil {
			continue
		}
		_ = p.opts.log.Log(err)
	}
}
// publish turns v into a message and sends it to exchangeName over channel.
//
// The message is built by constructPublishing from v, the priority in
// publish, and the default content type; a construction error is returned
// as is. Each configured "before" hook then receives ctx and a pointer to the
// message, in order. A send failure is returned wrapped with the "publish"
// message; on success the result is nil.
func (p Publisher) publish(channel *Channel, ctx context.Context, exchangeName string, v interface{}, publish Publish) error {
	msg, buildErr := constructPublishing(v, publish.Priority, p.opts.defaultContentType)
	if buildErr != nil {
		return buildErr
	}

	for _, hook := range p.opts.before {
		hook(ctx, &msg)
	}

	// errors.Wrap yields nil for a nil error, so the success path returns nil.
	return errors.Wrap(channel.publish(exchangeName, msg, publish), "publish")
}