@@ -691,7 +691,7 @@ pub struct ChainSync {
691
691
sync_start_time : Option < Instant > ,
692
692
/// Transactions propagation statistics
693
693
transactions_stats : TransactionsStats ,
694
- /// Unfetched transactions
694
+ /// Transactions whose hash has been announced, but that we have not fetched
695
695
unfetched_pooled_transactions : H256FastMap < UnfetchedTransaction > ,
696
696
/// Enable ancient block downloading
697
697
download_old_blocks : bool ,
@@ -1126,33 +1126,45 @@ impl ChainSync {
1126
1126
}
1127
1127
}
1128
1128
1129
- // get some peers to give us transaction pool
1129
+ // get the peer to give us at least some of announced but unfetched transactions
1130
1130
if !self . unfetched_pooled_transactions . is_empty ( ) {
1131
- if let Some ( s) = & mut self . peers . get_mut ( & peer_id) . unwrap ( ) . asking_pooled_transactions {
1131
+ if let Some ( s) = & mut self . peers . get_mut ( & peer_id) . expect ( "this is always an active peer; qed" ) . asking_pooled_transactions {
1132
1132
let now = Instant :: now ( ) ;
1133
1133
1134
1134
let mut new_asking_pooled_transactions = s. iter ( ) . copied ( ) . collect :: < HashSet < _ > > ( ) ;
1135
- let mut new_unfetched_pooled_transactions = self . unfetched_pooled_transactions . clone ( ) ;
1136
- while new_asking_pooled_transactions. len ( ) <= 256 {
1137
- for ( hash, mut item) in self . unfetched_pooled_transactions . drain ( ) {
1138
- if item. next_fetch < now {
1139
- new_asking_pooled_transactions. insert ( hash) ;
1140
- item. tries += 1 ;
1141
- if item. tries < 5 {
1142
- item. next_fetch = now + ( POOLED_TRANSACTIONS_TIMEOUT / 2 ) ;
1143
- new_unfetched_pooled_transactions. insert ( hash, item) ;
1144
- }
1135
+ let mut remaining_unfetched_pooled_transactions = self . unfetched_pooled_transactions . clone ( ) ;
1136
+ for ( hash, mut item) in self . unfetched_pooled_transactions . drain ( ) {
1137
+ if new_asking_pooled_transactions. len ( ) >= 256 {
1138
+ // can't request any more transactions
1139
+ break ;
1140
+ }
1141
+
1142
+ // if enough time has passed since last attempt...
1143
+ if item. next_fetch < now {
1144
+ // ...queue this hash for requesting
1145
+ new_asking_pooled_transactions. insert ( hash) ;
1146
+ item. tries += 1 ;
1147
+
1148
+ // if we just started asking for it, queue it to be asked later on again
1149
+ if item. tries < 5 {
1150
+ item. next_fetch = now + ( POOLED_TRANSACTIONS_TIMEOUT / 2 ) ;
1151
+ remaining_unfetched_pooled_transactions. insert ( hash, item) ;
1152
+ } else {
1153
+ // ...otherwise we assume this transaction does not exist and remove its hash from request queue
1154
+ remaining_unfetched_pooled_transactions. remove ( & hash) ;
1145
1155
}
1146
1156
}
1147
1157
}
1148
1158
1149
1159
let new_asking_pooled_transactions = new_asking_pooled_transactions. into_iter ( ) . collect :: < Vec < _ > > ( ) ;
1150
1160
SyncRequester :: request_pooled_transactions ( self , io, peer_id, & new_asking_pooled_transactions) ;
1151
1161
1152
- self . peers . get_mut ( & peer_id) . unwrap ( ) . asking_pooled_transactions = Some ( new_asking_pooled_transactions) ;
1153
- self . unfetched_pooled_transactions = new_unfetched_pooled_transactions ;
1162
+ self . peers . get_mut ( & peer_id) . expect ( "this is always an active peer; qed" ) . asking_pooled_transactions = Some ( new_asking_pooled_transactions) ;
1163
+ self . unfetched_pooled_transactions = remaining_unfetched_pooled_transactions ;
1154
1164
1155
1165
return ;
1166
+ } else {
1167
+ trace ! ( target: "sync" , "Skipping transaction fetch for peer {} as they don't support eth/65" , peer_id) ;
1156
1168
}
1157
1169
}
1158
1170
0 commit comments