Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ libp2p-core = { version = "0.43.1", path = "core" }
libp2p-dcutr = { version = "0.14.0", path = "protocols/dcutr" }
libp2p-dns = { version = "0.44.0", path = "transports/dns" }
libp2p-floodsub = { version = "0.47.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.49.0", path = "protocols/gossipsub" }
libp2p-gossipsub = { version = "0.50.0", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.47.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.12" }
libp2p-kad = { version = "0.48.1", path = "protocols/kad" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.50.0

- Remove `Rpc` from the public API.
See [PR 6091](https://github.com/libp2p/rust-libp2p/pull/6091)

## 0.49.0

- Feature gate metrics related code. This changes some `Behaviour` constructor methods.
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-gossipsub"
edition.workspace = true
rust-version = { workspace = true }
description = "Gossipsub protocol for libp2p"
version = "0.49.0"
version = "0.50.0"
authors = ["Age Manning <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
20 changes: 10 additions & 10 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
protocol::GossipsubCodec,
rpc::Receiver,
subscription_filter::WhitelistSubscriptionFilter,
types::Rpc,
types::RpcIn,
IdentTopic as Topic,
};

Expand Down Expand Up @@ -313,7 +313,7 @@ where
}

// Converts a protobuf message into a gossipsub message for reading the Gossipsub event queue.
fn proto_to_message(rpc: &proto::RPC) -> Rpc {
fn proto_to_message(rpc: &proto::RPC) -> RpcIn {
// Store valid messages.
let mut messages = Vec::with_capacity(rpc.publish.len());
let rpc = rpc.clone();
Expand Down Expand Up @@ -403,7 +403,7 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc {
control_msgs.extend(prune_msgs);
}

Rpc {
RpcIn {
messages,
subscriptions: rpc
.subscriptions
Expand Down Expand Up @@ -1243,7 +1243,7 @@ fn test_handle_iwant_msg_but_already_sent_idontwant() {
gs.mcache.put(&msg_id, raw_message);

// Receive IDONTWANT from Peer 1.
let rpc = Rpc {
let rpc = RpcIn {
messages: vec![],
subscriptions: vec![],
control_msgs: vec![ControlAction::IDontWant(IDontWant {
Expand Down Expand Up @@ -3137,7 +3137,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
p1,
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![raw_message1],
subscriptions: vec![subscription.clone()],
control_msgs: vec![control_action],
Expand All @@ -3163,7 +3163,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
p2,
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![raw_message3],
subscriptions: vec![subscription],
control_msgs: vec![control_action],
Expand Down Expand Up @@ -3773,7 +3773,7 @@ fn test_scoring_p4_invalid_signature() {
peers[0],
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![],
subscriptions: vec![],
control_msgs: vec![],
Expand Down Expand Up @@ -5530,7 +5530,7 @@ fn parses_idontwant() {
.create_network();

let message_id = MessageId::new(&[0, 1, 2, 3]);
let rpc = Rpc {
let rpc = RpcIn {
messages: vec![],
subscriptions: vec![],
control_msgs: vec![ControlAction::IDontWant(IDontWant {
Expand Down Expand Up @@ -6619,7 +6619,7 @@ fn test_validation_error_message_size_too_large_topic_specific() {
peers[0],
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![raw_message],
subscriptions: vec![],
control_msgs: vec![],
Expand Down Expand Up @@ -6723,7 +6723,7 @@ fn test_validation_message_size_within_topic_specific() {
peers[0],
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![raw_message],
subscriptions: vec![],
control_msgs: vec![],
Expand Down
4 changes: 2 additions & 2 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
protocol::{GossipsubCodec, ProtocolConfig},
rpc::Receiver,
rpc_proto::proto,
types::{PeerKind, RawMessage, Rpc, RpcOut},
types::{PeerKind, RawMessage, RpcIn, RpcOut},
ValidationError,
};

Expand All @@ -51,7 +51,7 @@ pub enum HandlerEvent {
/// any) that were received.
Message {
/// The GossipsubRPC message excluding any invalid messages.
rpc: Rpc,
rpc: RpcIn,
/// Any invalid messages that were received in the RPC, along with the associated
/// validation error.
invalid_messages: Vec<(RawMessage, ValidationError)>,
Expand Down
3 changes: 0 additions & 3 deletions protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,5 @@ pub use self::{
types::{FailedMessages, Message, MessageAcceptance, MessageId, RawMessage},
};

#[deprecated(note = "Will be removed from the public API.")]
pub type Rpc = self::types::Rpc;

pub type IdentTopic = Topic<self::topic::IdentityHash>;
pub type Sha256Topic = Topic<self::topic::Sha256Hash>;
17 changes: 10 additions & 7 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
topic::TopicHash,
types::{
ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune,
RawMessage, Rpc, Subscription, SubscriptionAction,
RawMessage, RpcIn, Subscription, SubscriptionAction,
},
ValidationError,
};
Expand Down Expand Up @@ -564,7 +564,7 @@ impl Decoder for GossipsubCodec {
}

Ok(Some(HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages,
subscriptions: rpc
.subscriptions
Expand All @@ -587,12 +587,16 @@ impl Decoder for GossipsubCodec {

#[cfg(test)]
mod tests {
use std::time::Duration;

use futures_timer::Delay;
use libp2p_identity::Keypair;
use quickcheck::*;

use super::*;
use crate::{
config::Config, Behaviour, ConfigBuilder, IdentTopic as Topic, MessageAuthenticity, Version,
config::Config, types::RpcOut, Behaviour, ConfigBuilder, IdentTopic as Topic,
MessageAuthenticity, Version,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -652,10 +656,9 @@ mod tests {
fn prop(message: Message) {
let message = message.0;

let rpc = Rpc {
messages: vec![message.clone()],
subscriptions: vec![],
control_msgs: vec![],
let rpc = RpcOut::Publish {
message: message.clone(),
timeout: Delay::new(Duration::from_secs(1)),
};

let mut codec =
Expand Down
119 changes: 3 additions & 116 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,9 @@ impl From<RpcOut> for proto::RPC {
}
}

/// An RPC received/sent.
/// A Gossipsub RPC message received.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Rpc {
pub struct RpcIn {
/// List of messages that were part of this RPC query.
pub messages: Vec<RawMessage>,
/// List of subscriptions.
Expand All @@ -483,120 +483,7 @@ pub struct Rpc {
pub control_msgs: Vec<ControlAction>,
}

impl Rpc {
/// Converts the GossipsubRPC into its protobuf format.
// A convenience function to avoid explicitly specifying types.
pub fn into_protobuf(self) -> proto::RPC {
self.into()
}
}

impl From<Rpc> for proto::RPC {
/// Converts the RPC into protobuf format.
fn from(rpc: Rpc) -> Self {
// Messages
let mut publish = Vec::new();

for message in rpc.messages.into_iter() {
let message = proto::Message {
from: message.source.map(|m| m.to_bytes()),
data: Some(message.data),
seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
topic: TopicHash::into_string(message.topic),
signature: message.signature,
key: message.key,
};

publish.push(message);
}

// subscriptions
let subscriptions = rpc
.subscriptions
.into_iter()
.map(|sub| proto::SubOpts {
subscribe: Some(sub.action == SubscriptionAction::Subscribe),
topic_id: Some(sub.topic_hash.into_string()),
})
.collect::<Vec<_>>();

// control messages
let mut control = proto::ControlMessage {
ihave: Vec::new(),
iwant: Vec::new(),
graft: Vec::new(),
prune: Vec::new(),
idontwant: Vec::new(),
};

let empty_control_msg = rpc.control_msgs.is_empty();

for action in rpc.control_msgs {
match action {
// collect all ihave messages
ControlAction::IHave(IHave {
topic_hash,
message_ids,
}) => {
let rpc_ihave = proto::ControlIHave {
topic_id: Some(topic_hash.into_string()),
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.ihave.push(rpc_ihave);
}
ControlAction::IWant(IWant { message_ids }) => {
let rpc_iwant = proto::ControlIWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.iwant.push(rpc_iwant);
}
ControlAction::Graft(Graft { topic_hash }) => {
let rpc_graft = proto::ControlGraft {
topic_id: Some(topic_hash.into_string()),
};
control.graft.push(rpc_graft);
}
ControlAction::Prune(Prune {
topic_hash,
peers,
backoff,
}) => {
let rpc_prune = proto::ControlPrune {
topic_id: Some(topic_hash.into_string()),
peers: peers
.into_iter()
.map(|info| proto::PeerInfo {
peer_id: info.peer_id.map(|id| id.to_bytes()),
// TODO, see https://github.com/libp2p/specs/pull/217
signed_peer_record: None,
})
.collect(),
backoff,
};
control.prune.push(rpc_prune);
}
ControlAction::IDontWant(IDontWant { message_ids }) => {
let rpc_idontwant = proto::ControlIDontWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.idontwant.push(rpc_idontwant);
}
}
}

proto::RPC {
subscriptions,
publish,
control: if empty_control_msg {
None
} else {
Some(control)
},
}
}
}

impl fmt::Debug for Rpc {
impl fmt::Debug for RpcIn {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut b = f.debug_struct("GossipsubRpc");
if !self.messages.is_empty() {
Expand Down
Loading