Skip to content

Commit 12bc4aa

Browse files
committed
fix: use std mutex for sender_accounts
Signed-off-by: Alexis Asseman <[email protected]>
1 parent 6f1ec7c commit 12bc4aa

File tree

2 files changed

+51
-46
lines changed

2 files changed

+51
-46
lines changed

tap-agent/src/tap/sender_account.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -434,9 +434,7 @@ mod tests {
434434

435435
// To help with testing from other modules.
436436
impl SenderAccount {
437-
pub async fn _tests_get_allocations_active(
438-
&self,
439-
) -> HashMap<Address, Arc<SenderAllocation>> {
437+
pub fn _tests_get_allocations_active(&self) -> HashMap<Address, Arc<SenderAllocation>> {
440438
self.inner
441439
.allocations
442440
.lock()
@@ -452,9 +450,7 @@ mod tests {
452450
.collect()
453451
}
454452

455-
pub async fn _tests_get_allocations_ineligible(
456-
&self,
457-
) -> HashMap<Address, Arc<SenderAllocation>> {
453+
pub fn _tests_get_allocations_ineligible(&self) -> HashMap<Address, Arc<SenderAllocation>> {
458454
self.inner
459455
.allocations
460456
.lock()

tap-agent/src/tap/sender_accounts_manager.rs

+49-40
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use std::collections::HashSet;
5+
use std::sync::Mutex as StdMutex;
56
use std::{collections::HashMap, str::FromStr, sync::Arc};
67

78
use alloy_sol_types::Eip712Domain;
@@ -13,7 +14,6 @@ use indexer_common::prelude::{Allocation, SubgraphClient};
1314
use serde::Deserialize;
1415
use sqlx::{postgres::PgListener, PgPool};
1516
use thegraph::types::Address;
16-
use tokio::sync::RwLock;
1717
use tracing::{error, warn};
1818

1919
use crate::config;
@@ -41,7 +41,7 @@ struct Inner {
4141
config: &'static config::Cli,
4242
pgpool: PgPool,
4343
/// Map of sender_address to SenderAllocation.
44-
sender_accounts: Arc<RwLock<HashMap<Address, SenderAccount>>>,
44+
sender_accounts: Arc<StdMutex<HashMap<Address, Arc<SenderAccount>>>>,
4545
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
4646
escrow_accounts: Eventual<EscrowAccounts>,
4747
escrow_subgraph: &'static SubgraphClient,
@@ -57,11 +57,11 @@ impl Inner {
5757
target_senders: HashSet<Address>,
5858
) -> Result<()> {
5959
let eligible_allocations: HashSet<Address> = indexer_allocations.keys().copied().collect();
60-
let mut sender_accounts_write = self.sender_accounts.write().await;
60+
let mut sender_accounts_copy = self.sender_accounts.lock().unwrap().clone();
6161

6262
// For all Senders that are not in the target_senders HashSet, set all their allocations to
6363
// ineligible. That will trigger a finalization of all their receipts.
64-
for (sender_id, sender_account) in sender_accounts_write.iter_mut() {
64+
for (sender_id, sender_account) in sender_accounts_copy.iter() {
6565
if !target_senders.contains(sender_id) {
6666
sender_account.update_allocations(HashSet::new()).await;
6767
}
@@ -70,33 +70,37 @@ impl Inner {
7070
// Get or create SenderAccount instances for all currently eligible
7171
// senders.
7272
for sender_id in &target_senders {
73-
let sender = sender_accounts_write
74-
.entry(*sender_id)
75-
.or_insert(SenderAccount::new(
76-
self.config,
77-
self.pgpool.clone(),
78-
*sender_id,
79-
self.escrow_accounts.clone(),
80-
self.escrow_subgraph,
81-
self.escrow_adapter.clone(),
82-
self.tap_eip712_domain_separator.clone(),
83-
self.sender_aggregator_endpoints
84-
.get(sender_id)
85-
.ok_or_else(|| {
86-
anyhow!(
87-
"No sender_aggregator_endpoint found for sender {}",
88-
sender_id
89-
)
90-
})?
91-
.clone(),
92-
));
73+
let sender =
74+
sender_accounts_copy
75+
.entry(*sender_id)
76+
.or_insert(Arc::new(SenderAccount::new(
77+
self.config,
78+
self.pgpool.clone(),
79+
*sender_id,
80+
self.escrow_accounts.clone(),
81+
self.escrow_subgraph,
82+
self.escrow_adapter.clone(),
83+
self.tap_eip712_domain_separator.clone(),
84+
self.sender_aggregator_endpoints
85+
.get(sender_id)
86+
.ok_or_else(|| {
87+
anyhow!(
88+
"No sender_aggregator_endpoint found for sender {}",
89+
sender_id
90+
)
91+
})?
92+
.clone(),
93+
)));
9394

9495
// Update sender's allocations
9596
sender
9697
.update_allocations(eligible_allocations.clone())
9798
.await;
9899
}
99100

101+
// Replace the sender_accounts with the updated sender_accounts_copy
102+
*self.sender_accounts.lock().unwrap() = sender_accounts_copy;
103+
100104
// TODO: remove Sender instances that are finished. Ideally done in another async task?
101105

102106
Ok(())
@@ -118,7 +122,7 @@ impl SenderAccountsManager {
118122
let inner = Arc::new(Inner {
119123
config,
120124
pgpool,
121-
sender_accounts: Arc::new(RwLock::new(HashMap::new())),
125+
sender_accounts: Arc::new(StdMutex::new(HashMap::new())),
122126
indexer_allocations,
123127
escrow_accounts,
124128
escrow_subgraph,
@@ -239,11 +243,11 @@ impl SenderAccountsManager {
239243

240244
// Create SenderAccount instances for all senders that have unfinalized allocations and add
241245
// the allocations to the SenderAccount instances.
242-
let mut sender_accounts_write_lock = inner.sender_accounts.write().await;
246+
let mut sender_accounts = HashMap::new();
243247
for (sender_id, allocation_ids) in unfinalized_sender_allocations_map {
244-
let sender = sender_accounts_write_lock
248+
let sender = sender_accounts
245249
.entry(sender_id)
246-
.or_insert(SenderAccount::new(
250+
.or_insert(Arc::new(SenderAccount::new(
247251
config,
248252
inner.pgpool.clone(),
249253
sender_id,
@@ -256,7 +260,7 @@ impl SenderAccountsManager {
256260
.get(&sender_id)
257261
.expect("should be able to get sender_aggregator_endpoint for sender")
258262
.clone(),
259-
));
263+
)));
260264

261265
sender.update_allocations(allocation_ids).await;
262266

@@ -265,7 +269,8 @@ impl SenderAccountsManager {
265269
.await
266270
.expect("should be able to recompute unaggregated fees");
267271
}
268-
drop(sender_accounts_write_lock);
272+
// replace the sender_accounts with the updated sender_accounts
273+
*inner.sender_accounts.lock().unwrap() = sender_accounts;
269274

270275
// Update senders and allocations based on the current state of the network.
271276
// It is important to do this after creating the Sender and SenderAllocation instances based
@@ -320,7 +325,7 @@ impl SenderAccountsManager {
320325
/// corresponding SenderAccount.
321326
async fn new_receipts_watcher(
322327
mut pglistener: PgListener,
323-
sender_accounts: Arc<RwLock<HashMap<Address, SenderAccount>>>,
328+
sender_accounts: Arc<StdMutex<HashMap<Address, Arc<SenderAccount>>>>,
324329
escrow_accounts: Eventual<EscrowAccounts>,
325330
) {
326331
loop {
@@ -354,7 +359,13 @@ impl SenderAccountsManager {
354359
}
355360
};
356361

357-
if let Some(sender_account) = sender_accounts.read().await.get(&sender_address) {
362+
let sender_account = sender_accounts
363+
.lock()
364+
.unwrap()
365+
.get(&sender_address)
366+
.cloned();
367+
368+
if let Some(sender_account) = sender_account {
358369
sender_account
359370
.handle_new_receipt_notification(new_receipt_notification)
360371
.await;
@@ -492,8 +503,8 @@ mod tests {
492503
assert!(sender_account
493504
._inner
494505
.sender_accounts
495-
.write()
496-
.await
506+
.lock()
507+
.unwrap()
497508
.contains_key(&SENDER.1));
498509

499510
// Remove the escrow account from the escrow_accounts Eventual.
@@ -506,22 +517,20 @@ mod tests {
506517
assert!(sender_account
507518
._inner
508519
.sender_accounts
509-
.read()
510-
.await
520+
.lock()
521+
.unwrap()
511522
.get(&SENDER.1)
512523
.unwrap()
513524
._tests_get_allocations_active()
514-
.await
515525
.is_empty());
516526
assert!(sender_account
517527
._inner
518528
.sender_accounts
519-
.read()
520-
.await
529+
.lock()
530+
.unwrap()
521531
.get(&SENDER.1)
522532
.unwrap()
523533
._tests_get_allocations_ineligible()
524-
.await
525534
.contains_key(&allocation_id));
526535
}
527536
}

0 commit comments

Comments
 (0)