forked from ergo-services/ergo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.go
42 lines (36 loc) · 1.09 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main
import (
"fmt"
"github.com/halturin/ergo"
"github.com/halturin/ergo/etf"
)
type Producer struct {
ergo.GenStage
dispatcher ergo.GenStageDispatcherBehaviour
}
func (g *Producer) InitStage(process *ergo.Process, args ...interface{}) (ergo.GenStageOptions, interface{}) {
// create a hash function for the dispatcher
hash := func(t etf.Term) int {
i, ok := t.(int)
if !ok {
// filtering out
return -1
}
if i%2 == 0 {
return 0
}
return 1
}
options := ergo.GenStageOptions{
Dispatcher: ergo.CreateGenStageDispatcherPartition(3, hash),
}
return options, nil
}
func (g *Producer) HandleDemand(subscription ergo.GenStageSubscription, count uint, state interface{}) (error, etf.List) {
fmt.Println("Producer: just got demand for", count, "pack of events from", subscription.Pid)
return nil, nil
}
func (g *Producer) HandleSubscribe(subscription ergo.GenStageSubscription, options ergo.GenStageSubscribeOptions, state interface{}) error {
fmt.Println("New subscription from:", subscription.Pid, "with min:", options.MinDemand, "and max:", options.MaxDemand)
return nil
}