Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit 0d85ff6

Browse files
committed
sync_peer tx request selection rewrite
1 parent 33f035a commit 0d85ff6

File tree

1 file changed

+27
-15
lines changed

1 file changed

+27
-15
lines changed

ethcore/sync/src/chain/mod.rs

+27-15
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ pub struct ChainSync {
691691
sync_start_time: Option<Instant>,
692692
/// Transactions propagation statistics
693693
transactions_stats: TransactionsStats,
694-
/// Unfetched transactions
694+
/// Transactions whose hash has been announced, but that we have not fetched
695695
unfetched_pooled_transactions: H256FastMap<UnfetchedTransaction>,
696696
/// Enable ancient block downloading
697697
download_old_blocks: bool,
@@ -1126,33 +1126,45 @@ impl ChainSync {
11261126
}
11271127
}
11281128

1129-
// get some peers to give us transaction pool
1129+
// get the peer to give us at least some of announced but unfetched transactions
11301130
if !self.unfetched_pooled_transactions.is_empty() {
1131-
if let Some(s) = &mut self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions {
1131+
if let Some(s) = &mut self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions {
11321132
let now = Instant::now();
11331133

11341134
let mut new_asking_pooled_transactions = s.iter().copied().collect::<HashSet<_>>();
1135-
let mut new_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
1136-
while new_asking_pooled_transactions.len() <= 256 {
1137-
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
1138-
if item.next_fetch < now {
1139-
new_asking_pooled_transactions.insert(hash);
1140-
item.tries += 1;
1141-
if item.tries < 5 {
1142-
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
1143-
new_unfetched_pooled_transactions.insert(hash, item);
1144-
}
1135+
let mut remaining_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
1136+
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
1137+
if new_asking_pooled_transactions.len() >= 256 {
1138+
// can't request any more transactions
1139+
break;
1140+
}
1141+
1142+
// if enough time has passed since last attempt...
1143+
if item.next_fetch < now {
1144+
// ...queue this hash for requesting
1145+
new_asking_pooled_transactions.insert(hash);
1146+
item.tries += 1;
1147+
1148+
// if we just started asking for it, queue it to be asked later on again
1149+
if item.tries < 5 {
1150+
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
1151+
remaining_unfetched_pooled_transactions.insert(hash, item);
1152+
} else {
1153+
// ...otherwise we assume this transaction does not exist and remove its hash from request queue
1154+
remaining_unfetched_pooled_transactions.remove(&hash);
11451155
}
11461156
}
11471157
}
11481158

11491159
let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::<Vec<_>>();
11501160
SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions);
11511161

1152-
self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions = Some(new_asking_pooled_transactions);
1153-
self.unfetched_pooled_transactions = new_unfetched_pooled_transactions;
1162+
self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions = Some(new_asking_pooled_transactions);
1163+
self.unfetched_pooled_transactions = remaining_unfetched_pooled_transactions;
11541164

11551165
return;
1166+
} else {
1167+
trace!(target: "sync", "Skipping transaction fetch for peer {} as they don't support eth/65", peer_id);
11561168
}
11571169
}
11581170

0 commit comments

Comments
 (0)