@@ -23,20 +23,60 @@ import (
23
23
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
24
24
)
25
25
26
+ type ReaderOption func (* delegatorsReader ) error
27
+
28
+ func WithChainName (chainName string ) ReaderOption {
29
+ return func (r * delegatorsReader ) error {
30
+ r .chainName = chainName
31
+ return nil
32
+ }
33
+ }
34
+
35
+ func WithLogger (logger log.Logger ) ReaderOption {
36
+ return func (r * delegatorsReader ) error {
37
+ r .logger = logger
38
+ return nil
39
+ }
40
+ }
41
+
42
+ func WithMinSharesFilter (minShares math.LegacyDec ) ReaderOption {
43
+ return func (r * delegatorsReader ) error {
44
+ r .minSharesFilter = minShares
45
+ return nil
46
+ }
47
+ }
48
+
49
+ func WithMaxSharesFilter (maxShares math.LegacyDec ) ReaderOption {
50
+ return func (r * delegatorsReader ) error {
51
+ r .maxSharesFilter = maxShares
52
+ return nil
53
+ }
54
+ }
55
+
26
56
type delegatorsReader struct {
27
- chainName string
28
- src string
29
- logger log.Logger
30
- closer io.Closer
57
+ chainName string
58
+ src string
59
+ logger log.Logger
60
+ closer io.Closer
61
+ minSharesFilter math.LegacyDec
62
+ maxSharesFilter math.LegacyDec
31
63
}
32
64
33
65
// NewDelegatorsReader returns a new Reader that reads delegators from a blockchain data stores.
34
- func NewDelegatorsReader (chainName , src string , logger log. Logger ) (goetl.Processor , error ) {
35
- return & delegatorsReader {
36
- chainName : chainName ,
66
+ func NewDelegatorsReader (src string , options ... ReaderOption ) (goetl.Processor , error ) {
67
+ r := & delegatorsReader {
68
+ chainName : "mystery" ,
37
69
src : src ,
38
- logger : logger ,
39
- }, nil
70
+ logger : log .NewNopLogger (),
71
+ }
72
+
73
+ for _ , option := range options {
74
+ if err := option (r ); err != nil {
75
+ return nil , err
76
+ }
77
+ }
78
+
79
+ return r , nil
40
80
}
41
81
42
82
func (r * delegatorsReader ) ProcessData (_ etldata.Payload , outputChan chan etldata.Payload , killChan chan error ) {
@@ -68,7 +108,13 @@ func (r *delegatorsReader) ProcessData(_ etldata.Payload, outputChan chan etldat
68
108
69
109
err = iterateAllAddresses (ctx , keepers .Bank , func (addr sdk.AccAddress ) (stop bool ) {
70
110
delegations := lo .RejectMap (validators ,
71
- toDelegations (ctx , addr , r .logger , keepers .Staking , killChan ))
111
+ extractDelegations (ctx , addr , r .logger , keepers .Staking , killChan ))
112
+ shares := lo .Reduce (delegations , computeShares (), math .LegacyZeroDec ())
113
+
114
+ if (! r .maxSharesFilter .IsNil () && shares .GT (r .maxSharesFilter )) ||
115
+ (! r .minSharesFilter .IsNil () && shares .LT (r .minSharesFilter )) {
116
+ return false
117
+ }
72
118
73
119
for _ , delegation := range delegations {
74
120
payload := Delegation {
@@ -127,7 +173,7 @@ func iterateAllAddresses(ctx context.Context, bankKeeper bankkeeper.BaseKeeper,
127
173
return err
128
174
}
129
175
130
- func toDelegations (
176
+ func extractDelegations (
131
177
ctx context.Context , address sdk.AccAddress , logger log.Logger , stakingKeeper * stakingkeeper.Keeper , killChan chan error ,
132
178
) func (item stakingtypes.Validator , index int ) (stakingtypes.Delegation , bool ) {
133
179
return func (item stakingtypes.Validator , _ int ) (stakingtypes.Delegation , bool ) {
@@ -152,6 +198,12 @@ func toDelegations(
152
198
}
153
199
}
154
200
201
+ func computeShares () func (acc math.LegacyDec , delegation stakingtypes.Delegation , _ int ) math.LegacyDec {
202
+ return func (acc math.LegacyDec , delegation stakingtypes.Delegation , _ int ) math.LegacyDec {
203
+ return acc .Add (delegation .Shares )
204
+ }
205
+ }
206
+
155
207
func guessPrefixFromValoper (valoper string ) (string , error ) {
156
208
if idx := strings .Index (valoper , "valoper" ); idx != - 1 {
157
209
return valoper [:idx ], nil
0 commit comments