@@ -9,16 +9,17 @@ import (
9
9
10
10
"github.com/axone-protocol/cosmos-extractor/pkg/keeper"
11
11
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
12
+ "github.com/samber/lo"
12
13
"github.com/teambenny/goetl"
13
14
"github.com/teambenny/goetl/etldata"
14
- "github.com/teambenny/goetl/etlutil"
15
15
16
16
"cosmossdk.io/collections"
17
17
"cosmossdk.io/log"
18
18
"cosmossdk.io/math"
19
19
20
20
sdk "github.com/cosmos/cosmos-sdk/types"
21
21
bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper"
22
+ stakingkeeper "github.com/cosmos/cosmos-sdk/x/staking/keeper"
22
23
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
23
24
)
24
25
@@ -65,22 +66,11 @@ func (r *delegatorsReader) ProcessData(_ etldata.Payload, outputChan chan etldat
65
66
66
67
configureSdk (prefix )
67
68
68
- err = IterateAllAddresses (ctx , keepers .Bank , func (addr sdk.AccAddress ) (stop bool ) {
69
- for _ , val := range validators {
70
- valAddr , err := sdk .ValAddressFromBech32 (val .OperatorAddress )
71
- etlutil .KillPipelineIfErr (err , killChan )
72
-
73
- delegation , err := keepers .Staking .GetDelegation (ctx , addr , valAddr )
74
- if err != nil {
75
- if errors .Is (err , stakingtypes .ErrNoDelegation ) {
76
- continue
77
- }
78
-
79
- r .logger .Error (err .Error ())
80
- killChan <- err
81
- return true
82
- }
69
+ err = iterateAllAddresses (ctx , keepers .Bank , func (addr sdk.AccAddress ) (stop bool ) {
70
+ delegations := lo .RejectMap (validators ,
71
+ toDelegations (ctx , addr , r .logger , keepers .Staking , killChan ))
83
72
73
+ for _ , delegation := range delegations {
84
74
payload := Delegation {
85
75
ChainName : r .chainName ,
86
76
DelegatorNativeAddr : delegation .DelegatorAddress ,
@@ -122,7 +112,7 @@ func (r *delegatorsReader) String() string {
122
112
123
113
// IterateAllAddresses iterates over all the accounts that are provided to a callback.
124
114
// If true is returned from the callback, iteration is halted.
125
- func IterateAllAddresses (ctx context.Context , bankKeeper bankkeeper.BaseKeeper , cb func (sdk.AccAddress ) bool ) error {
115
+ func iterateAllAddresses (ctx context.Context , bankKeeper bankkeeper.BaseKeeper , cb func (sdk.AccAddress ) bool ) error {
126
116
lastSeenAddr := ""
127
117
err := bankKeeper .Balances .Walk (ctx , nil , func (key collections.Pair [sdk.AccAddress , string ], _ math.Int ) (stop bool , err error ) {
128
118
addr := key .K1 ()
@@ -137,6 +127,31 @@ func IterateAllAddresses(ctx context.Context, bankKeeper bankkeeper.BaseKeeper,
137
127
return err
138
128
}
139
129
130
+ func toDelegations (
131
+ ctx context.Context , address sdk.AccAddress , logger log.Logger , stakingKeeper * stakingkeeper.Keeper , killChan chan error ,
132
+ ) func (item stakingtypes.Validator , index int ) (stakingtypes.Delegation , bool ) {
133
+ return func (item stakingtypes.Validator , _ int ) (stakingtypes.Delegation , bool ) {
134
+ valAddr , err := sdk .ValAddressFromBech32 (item .OperatorAddress )
135
+ if err != nil {
136
+ logger .Error (err .Error ())
137
+ killChan <- err
138
+ return stakingtypes.Delegation {}, true
139
+ }
140
+
141
+ delegation , err := stakingKeeper .GetDelegation (ctx , address , valAddr )
142
+ if err != nil {
143
+ if errors .Is (err , stakingtypes .ErrNoDelegation ) {
144
+ return stakingtypes.Delegation {}, true
145
+ }
146
+
147
+ logger .Error (err .Error ())
148
+ killChan <- err
149
+ return stakingtypes.Delegation {}, true
150
+ }
151
+ return delegation , false
152
+ }
153
+ }
154
+
140
155
func guessPrefixFromValoper (valoper string ) (string , error ) {
141
156
if idx := strings .Index (valoper , "valoper" ); idx != - 1 {
142
157
return valoper [:idx ], nil
0 commit comments