-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.go
85 lines (71 loc) · 1.71 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Copyright 2022 coffeehaze. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package oni
import (
"context"
"fmt"
"sync"
)
// IConsumer is the method set implemented by *Consumer in this file.
//
// It contains unexported methods, so only types declared in this package
// can satisfy it.
type IConsumer interface {
	// Registration of handlers, producers and the error callback.
	Handler(key string, handlerFunc ...HandlerFunc)
	Producer(name string, producerFunc ProducerFunc)
	ErrorHandler(callbackFunc ErrorCallbackFunc)

	// Group derives a consumer whose handler keys carry the keyGroup prefix.
	Group(keyGroup string) *Consumer

	// Explicit and Implicit select the mode stored on the underlying stream.
	Explicit()
	Implicit()

	// Package-internal lifecycle hooks.
	run(ctx context.Context)
	closeConsumers() error
	closeProducers() error
}
// Consumer registers handlers and producers on a Stream, optionally under a
// key-group prefix.
type Consumer struct {
	// stream receives every registration and lifecycle call made through
	// this consumer.
	stream *Stream
	// keyGroup, when non-empty, is prepended (as "keyGroup.key") to keys
	// passed to Handler.
	keyGroup string
	// callbackError is forwarded to the stream with each handler registered
	// through Handler; it is nil until ErrorHandler is called.
	callbackError ErrorCallbackFunc
}
// NewConsumer returns a Consumer bound to stream, with no key group and no
// error callback set.
func NewConsumer(stream *Stream) *Consumer {
	c := new(Consumer)
	c.stream = stream
	return c
}
// Handler registers each function in handlerFunc on the stream under key.
//
// When the consumer has a key group, the key used for registration is
// "keyGroup.key". The consumer's current error callback is passed along with
// every registration.
func (c *Consumer) Handler(key string, handlerFunc ...HandlerFunc) {
	fullKey := key
	if c.keyGroup != "" {
		fullKey = fmt.Sprintf("%s.%s", c.keyGroup, key)
	}

	for i := range handlerFunc {
		c.stream.addHandler(fullKey, handlerFunc[i], c.callbackError)
	}
}
// Producer registers producerFunc on the stream under name. The consumer's
// key group is not applied to name.
func (c *Consumer) Producer(name string, producerFunc ProducerFunc) {
	s := c.stream
	s.addProducer(name, producerFunc)
}
// Group returns a new Consumer that shares c's stream and uses keyGroup as
// the prefix for keys registered through Handler ("keyGroup.key").
//
// The returned consumer starts with the error callback currently set on c,
// so handlers registered through the group are not silently left without an
// error callback. This is a snapshot: a later ErrorHandler call on c does not
// affect the group, and ErrorHandler on the group overrides the inherited
// value. Any key group already set on c is replaced, not extended.
func (c *Consumer) Group(keyGroup string) *Consumer {
	return &Consumer{
		stream:        c.stream,
		keyGroup:      keyGroup,
		callbackError: c.callbackError,
	}
}
// Explicit sets the stream's cm field to the package-level explicit value.
// The change applies to the shared stream, so it affects every consumer
// bound to it.
func (c *Consumer) Explicit() {
	s := c.stream
	s.cm = explicit
}
// Implicit sets the stream's cm field to the package-level implicit value.
// The change applies to the shared stream, so it affects every consumer
// bound to it.
func (c *Consumer) Implicit() {
	s := c.stream
	s.cm = implicit
}
// ErrorHandler sets the error callback stored on this consumer.
//
// Handler reads the callback at registration time, so only handlers
// registered after this call are given callbackFunc.
func (c *Consumer) ErrorHandler(callbackFunc ErrorCallbackFunc) {
	c.callbackError = callbackFunc
}
// run stores ctx on the stream, starts one stream.invoke goroutine per entry
// in the stream's handlers, and blocks until the WaitGroup handed to those
// goroutines reaches zero (presumably invoke calls Done on it; confirm in
// Stream.invoke).
func (c *Consumer) run(ctx context.Context) {
	s := c.stream
	// The context must be in place before any invoke goroutine starts.
	s.ctx = ctx

	var pending sync.WaitGroup
	for range s.handlers {
		pending.Add(1)
		go s.invoke(&pending)
	}
	pending.Wait()
}
// closeConsumers delegates to the stream's closeConsumers and returns its
// error unchanged.
func (c *Consumer) closeConsumers() error {
	s := c.stream
	return s.closeConsumers()
}
// closeProducers delegates to the stream's closeProducers and returns its
// error unchanged.
func (c *Consumer) closeProducers() error {
	s := c.stream
	return s.closeProducers()
}