@@ -93,12 +93,16 @@ impl SenderAccountsManager {
93
93
. await
94
94
. expect ( "Should get escrow accounts from Eventual" ) ;
95
95
96
- let mut sender_accounts_write_lock = inner. sender_accounts . write ( ) . await ;
96
+ // Gather all outstanding receipts and unfinalized RAVs from the database.
97
+ // Used to create SenderAccount instances for all senders that have unfinalized allocations
98
+ // and try to finalize them if they have become ineligible.
99
+
100
+ // First we accumulate all allocations for each sender. This is because we may have more
101
+ // than one signer per sender in DB.
102
+ let mut unfinalized_sender_allocations_map: HashMap < Address , HashSet < Address > > =
103
+ HashMap :: new ( ) ;
97
104
98
- // Create Sender and SenderAllocation instances for all outstanding receipts in the
99
- // database because they may be linked to allocations that are not eligible anymore, but
100
- // still need to get aggregated.
101
- let unfinalized_allocations_in_db = sqlx:: query!(
105
+ let receipts_signer_allocations_in_db = sqlx:: query!(
102
106
r#"
103
107
SELECT DISTINCT
104
108
signer_address,
@@ -115,9 +119,9 @@ impl SenderAccountsManager {
115
119
)
116
120
. fetch_all ( & inner. pgpool )
117
121
. await
118
- . expect ( "should be able to fetch unfinalized allocations from the database" ) ;
122
+ . expect ( "should be able to fetch pending receipts from the database" ) ;
119
123
120
- for row in unfinalized_allocations_in_db {
124
+ for row in receipts_signer_allocations_in_db {
121
125
let allocation_ids = row
122
126
. allocation_ids
123
127
. expect ( "all receipts should have an allocation_id" )
@@ -133,6 +137,56 @@ impl SenderAccountsManager {
133
137
. get_sender_for_signer ( & signer_id)
134
138
. expect ( "should be able to get sender from signer" ) ;
135
139
140
+ // Accumulate allocations for the sender
141
+ unfinalized_sender_allocations_map
142
+ . entry ( sender_id)
143
+ . or_default ( )
144
+ . extend ( allocation_ids) ;
145
+ }
146
+
147
+ let nonfinal_ravs_sender_allocations_in_db = sqlx:: query!(
148
+ r#"
149
+ SELECT
150
+ sender_address,
151
+ (
152
+ SELECT ARRAY
153
+ (
154
+ SELECT DISTINCT allocation_id
155
+ FROM scalar_tap_ravs
156
+ WHERE sender_address = sender_address
157
+ )
158
+ ) AS allocation_id
159
+ FROM scalar_tap_ravs
160
+ "#
161
+ )
162
+ . fetch_all ( & inner. pgpool )
163
+ . await
164
+ . expect ( "should be able to fetch unfinalized RAVs from the database" ) ;
165
+
166
+ for row in nonfinal_ravs_sender_allocations_in_db {
167
+ let allocation_ids = row
168
+ . allocation_id
169
+ . expect ( "all RAVs should have an allocation_id" )
170
+ . iter ( )
171
+ . map ( |allocation_id| {
172
+ Address :: from_str ( allocation_id)
173
+ . expect ( "allocation_id should be a valid address" )
174
+ } )
175
+ . collect :: < HashSet < Address > > ( ) ;
176
+ let sender_id = Address :: from_str ( & row. sender_address )
177
+ . expect ( "sender_address should be a valid address" ) ;
178
+
179
+ // Accumulate allocations for the sender
180
+ unfinalized_sender_allocations_map
181
+ . entry ( sender_id)
182
+ . or_default ( )
183
+ . extend ( allocation_ids) ;
184
+ }
185
+
186
+ // Create SenderAccount instances for all senders that have unfinalized allocations and add
187
+ // the allocations to the SenderAccount instances.
188
+ let mut sender_accounts_write_lock = inner. sender_accounts . write ( ) . await ;
189
+ for ( sender_id, allocation_ids) in unfinalized_sender_allocations_map {
136
190
let sender = sender_accounts_write_lock
137
191
. entry ( sender_id)
138
192
. or_insert ( SenderAccount :: new (
@@ -150,14 +204,13 @@ impl SenderAccountsManager {
150
204
. clone ( ) ,
151
205
) ) ;
152
206
153
- // Update sender's allocations
154
- sender . update_allocations ( allocation_ids . clone ( ) ) . await ;
207
+ sender. update_allocations ( allocation_ids ) . await ;
208
+
155
209
sender
156
210
. recompute_unaggregated_fees ( )
157
211
. await
158
212
. expect ( "should be able to recompute unaggregated fees" ) ;
159
213
}
160
-
161
214
drop ( sender_accounts_write_lock) ;
162
215
163
216
// Update senders and allocations based on the current state of the network.
0 commit comments