Skip to content

Commit 5d8cc33

Browse files
authored
feat(gossipsub): send IDONTWANT before publishing a new message
This allows us to possible save some bandwidth on the network. See libp2p/go-libp2p-pubsub#610 (comment) for more info Pull-Request: libp2p#6017.
1 parent a367e1b commit 5d8cc33

File tree

2 files changed

+46
-54
lines changed

2 files changed

+46
-54
lines changed

protocols/gossipsub/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
## 0.49.0
2+
- Send IDONTWANT before Publishing a new message.
3+
See [PR 6017](https://github.com/libp2p/rust-libp2p/pull/6017)
4+
25
- Improve log messaging by renaming `message` to `message_id`. This change makes it log coherent to
36
the field and also improve duplication of `message` field.
47
See [PR 5972](https://github.com/libp2p/rust-libp2p/pull/5972)

protocols/gossipsub/src/behaviour.rs

Lines changed: 43 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
// DEALINGS IN THE SOFTWARE.
2020

2121
use std::{
22-
cmp::{max, Ordering, Ordering::Equal},
22+
cmp::{
23+
max,
24+
Ordering::{self, Equal},
25+
},
2326
collections::{BTreeSet, HashMap, HashSet, VecDeque},
24-
fmt,
25-
fmt::Debug,
27+
fmt::{self, Debug},
2628
net::IpAddr,
2729
task::{Context, Poll},
2830
time::Duration,
@@ -747,6 +749,19 @@ where
747749
let mut publish_failed = true;
748750
for peer_id in recipient_peers.iter() {
749751
tracing::trace!(peer=%peer_id, "Sending message to peer");
752+
// If enabled, Send first an IDONTWANT so that if we are slower than forwarders
753+
// publishing the original message we don't receive it back.
754+
if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
755+
&& self.config.idontwant_on_publish()
756+
{
757+
self.send_message(
758+
*peer_id,
759+
RpcOut::IDontWant(IDontWant {
760+
message_ids: vec![msg_id.clone()],
761+
}),
762+
);
763+
}
764+
750765
if self.send_message(
751766
*peer_id,
752767
RpcOut::Publish {
@@ -766,13 +781,6 @@ where
766781
return Err(PublishError::AllQueuesFull(recipient_peers.len()));
767782
}
768783

769-
// Broadcast IDONTWANT messages
770-
if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
771-
&& self.config.idontwant_on_publish()
772-
{
773-
self.send_idontwant(&raw_message, &msg_id, raw_message.source.as_ref());
774-
}
775-
776784
tracing::debug!(message_id=%msg_id, "Published message");
777785

778786
if let Some(metrics) = self.metrics.as_mut() {
@@ -1766,7 +1774,26 @@ where
17661774

17671775
// Broadcast IDONTWANT messages
17681776
if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1769-
self.send_idontwant(&raw_message, &msg_id, Some(propagation_source));
1777+
let recipient_peers = self
1778+
.mesh
1779+
.get(&message.topic)
1780+
.map(|mesh| mesh.iter())
1781+
.unwrap_or_default()
1782+
.copied()
1783+
.chain(self.gossip_promises.peers_for_message(&msg_id))
1784+
.filter(|peer_id| {
1785+
peer_id != propagation_source && Some(peer_id) != message.source.as_ref()
1786+
})
1787+
.collect::<Vec<PeerId>>();
1788+
1789+
for peer_id in recipient_peers {
1790+
self.send_message(
1791+
peer_id,
1792+
RpcOut::IDontWant(IDontWant {
1793+
message_ids: vec![msg_id.clone()],
1794+
}),
1795+
);
1796+
}
17701797
}
17711798

17721799
// Check the validity of the message
@@ -2633,49 +2660,6 @@ where
26332660
}
26342661
}
26352662

2636-
/// Helper function which sends an IDONTWANT message to mesh\[topic\] peers.
2637-
fn send_idontwant(
2638-
&mut self,
2639-
message: &RawMessage,
2640-
msg_id: &MessageId,
2641-
propagation_source: Option<&PeerId>,
2642-
) {
2643-
let Some(mesh_peers) = self.mesh.get(&message.topic) else {
2644-
return;
2645-
};
2646-
2647-
let iwant_peers = self.gossip_promises.peers_for_message(msg_id);
2648-
2649-
let recipient_peers: Vec<PeerId> = mesh_peers
2650-
.iter()
2651-
.chain(iwant_peers.iter())
2652-
.filter(|&peer_id| {
2653-
Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref()
2654-
})
2655-
.cloned()
2656-
.collect();
2657-
2658-
for peer_id in recipient_peers {
2659-
let Some(peer) = self.connected_peers.get_mut(&peer_id) else {
2660-
tracing::error!(peer = %peer_id,
2661-
"Could not IDONTWANT, peer doesn't exist in connected peer list");
2662-
continue;
2663-
};
2664-
2665-
// Only gossipsub 1.2 peers support IDONTWANT.
2666-
if peer.kind != PeerKind::Gossipsubv1_2 {
2667-
continue;
2668-
}
2669-
2670-
self.send_message(
2671-
peer_id,
2672-
RpcOut::IDontWant(IDontWant {
2673-
message_ids: vec![msg_id.clone()],
2674-
}),
2675-
);
2676-
}
2677-
}
2678-
26792663
/// Helper function which forwards a message to mesh\[topic\] peers.
26802664
///
26812665
/// Returns true if at least one peer was messaged.
@@ -2867,6 +2851,11 @@ where
28672851
return false;
28682852
};
28692853

2854+
if !matches!(peer.kind, PeerKind::Gossipsubv1_2) && matches!(rpc, RpcOut::IDontWant(..)) {
2855+
tracing::trace!(peer=%peer_id, "Won't send IDONTWANT message for message to peer as it doesn't support Gossipsub v1.2");
2856+
return false;
2857+
}
2858+
28702859
// Try sending the message to the connection handler.
28712860
match peer.sender.send_message(rpc) {
28722861
Ok(()) => true,

0 commit comments

Comments
 (0)