Skip to content

Commit 36b3e4d

Browse files
committed
remove retain_mut and do not retain when polling the handler
1 parent 40a9e4b commit 36b3e4d

File tree

3 files changed

+9
-42
lines changed

3 files changed

+9
-42
lines changed

protocols/gossipsub/src/behaviour.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3177,22 +3177,18 @@ where
31773177
}
31783178
}
31793179
}
3180-
HandlerEvent::MessagesDropped(rpcs) => {
3180+
HandlerEvent::MessagesDropped(rpc) => {
31813181
// Account for this in the scoring logic
31823182
if let Some((peer_score, _, _)) = &mut self.peer_score {
31833183
peer_score.failed_message_slow_peer(&propagation_source);
31843184
}
31853185

31863186
// Keep track of expired messages for the application layer.
31873187
let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3188-
failed_messages.timeout += rpcs.len();
3188+
failed_messages.timeout += 1;
31893189
if let Some(metrics) = self.metrics.as_mut() {
3190-
for rpc in rpcs {
3191-
if let RpcOut::Publish { message, .. } | RpcOut::Forward { message, .. } =
3192-
rpc
3193-
{
3194-
metrics.timeout_msg_dropped(&message.topic);
3195-
}
3190+
if let RpcOut::Publish { message, .. } | RpcOut::Forward { message, .. } = rpc {
3191+
metrics.timeout_msg_dropped(&message.topic);
31963192
}
31973193
}
31983194
}
@@ -3297,7 +3293,7 @@ where
32973293
}
32983294

32993295
// Remove messages from the queue.
3300-
peer.messages.retain_mut(|rpc| match rpc {
3296+
peer.messages.retain(|rpc| match rpc {
33013297
RpcOut::Publish { message_id, .. }
33023298
| RpcOut::Forward { message_id, .. } => {
33033299
!message_ids.contains(message_id)

protocols/gossipsub/src/handler.rs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pub enum HandlerEvent {
6060
/// which protocol. This message only occurs once per connection.
6161
PeerKind(PeerKind),
6262
/// A message to be published was dropped because it could not be sent in time.
63-
MessagesDropped(Vec<RpcOut>),
63+
MessagesDropped(RpcOut),
6464
}
6565

6666
/// A message sent from the behaviour to the handler.
@@ -270,7 +270,7 @@ impl EnabledHandler {
270270
self.outbound_substream =
271271
Some(OutboundSubstreamState::WaitingOutput(substream));
272272
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
273-
HandlerEvent::MessagesDropped(vec![message]),
273+
HandlerEvent::MessagesDropped(message),
274274
));
275275
}
276276
}
@@ -410,22 +410,6 @@ impl EnabledHandler {
410410
}
411411
}
412412

413-
// Remove stale messages from the queue.
414-
let stale = self.message_queue.retain_mut(|rpc| match rpc {
415-
RpcOut::Publish {
416-
ref mut timeout, ..
417-
}
418-
| RpcOut::Forward {
419-
ref mut timeout, ..
420-
} => !timeout.poll_unpin(cx).is_ready(),
421-
_ => true,
422-
});
423-
if !stale.is_empty() {
424-
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
425-
HandlerEvent::MessagesDropped(stale),
426-
));
427-
}
428-
429413
Poll::Pending
430414
}
431415
}

protocols/gossipsub/src/queue.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
use std::{
2222
collections::{BinaryHeap, HashMap},
23-
mem,
2423
sync::{
2524
atomic::{AtomicUsize, Ordering},
2625
Arc, Mutex,
@@ -105,21 +104,9 @@ impl<T: Ord> Queue<T> {
105104
/// Retain only the elements specified by the predicate.
106105
/// In other words, remove all elements e for which f(&e) returns false. The elements are visited in unsorted (and unspecified) order.
107106
/// Returns the cleared items.
108-
pub(crate) fn retain_mut<F: FnMut(&mut T) -> bool>(&mut self, mut f: F) -> Vec<T> {
107+
pub(crate) fn retain<F: FnMut(&T) -> bool>(&mut self, f: F) {
109108
let mut shared = self.shared.lock().expect("lock to not be poisoned");
110-
// `BinaryHeap` doesn't impl `retain_mut`, this seems like a practical way to achieve it.
111-
// `BinaryHeap::drain` is O(n) as it returns an iterator over the removed elements in its internal arbitrary order.
112-
// `BinaryHeap::push` is ~O(1) which makes this function O(n).
113-
let mut queue = mem::replace(&mut shared.queue, BinaryHeap::with_capacity(self.capacity));
114-
let mut cleared = vec![];
115-
for mut item in queue.drain() {
116-
if f(&mut item) {
117-
shared.queue.push(item);
118-
} else {
119-
cleared.push(item);
120-
}
121-
}
122-
cleared
109+
shared.queue.retain(f);
123110
}
124111

125112
/// Returns the length of the queue.

0 commit comments

Comments
 (0)