Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ libp2p-core = { version = "0.43.1", path = "core" }
libp2p-dcutr = { version = "0.13.0", path = "protocols/dcutr" }
libp2p-dns = { version = "0.44.0", path = "transports/dns" }
libp2p-floodsub = { version = "0.46.1", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.49.0", path = "protocols/gossipsub" }
libp2p-gossipsub = { version = "0.50.0", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.47.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.12" }
libp2p-kad = { version = "0.47.1", path = "protocols/kad" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.50.0

- Remove `Rpc` from the public API.
See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX)

## 0.49.0
- Feature gate metrics related code. This changes some `Behaviour` constructor methods.
See [PR 6020](https://github.com/libp2p/rust-libp2p/pull/6020)
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-gossipsub"
edition.workspace = true
rust-version = { workspace = true }
description = "Gossipsub protocol for libp2p"
version = "0.49.0"
version = "0.50.0"
authors = ["Age Manning <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
20 changes: 10 additions & 10 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
protocol::GossipsubCodec,
rpc::Receiver,
subscription_filter::WhitelistSubscriptionFilter,
types::Rpc,
types::RpcIn,
IdentTopic as Topic,
};

Expand Down Expand Up @@ -313,7 +313,7 @@ where
}

// Converts a protobuf message into a gossipsub message for reading the Gossipsub event queue.
fn proto_to_message(rpc: &proto::RPC) -> Rpc {
fn proto_to_message(rpc: &proto::RPC) -> RpcIn {
// Store valid messages.
let mut messages = Vec::with_capacity(rpc.publish.len());
let rpc = rpc.clone();
Expand Down Expand Up @@ -403,7 +403,7 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc {
control_msgs.extend(prune_msgs);
}

Rpc {
RpcIn {
messages,
subscriptions: rpc
.subscriptions
Expand Down Expand Up @@ -1243,7 +1243,7 @@ fn test_handle_iwant_msg_but_already_sent_idontwant() {
gs.mcache.put(&msg_id, raw_message);

// Receive IDONTWANT from Peer 1.
let rpc = Rpc {
let rpc = RpcIn {
messages: vec![],
subscriptions: vec![],
control_msgs: vec![ControlAction::IDontWant(IDontWant {
Expand Down Expand Up @@ -3137,7 +3137,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
p1,
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![raw_message1],
subscriptions: vec![subscription.clone()],
control_msgs: vec![control_action],
Expand All @@ -3163,7 +3163,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
p2,
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![raw_message3],
subscriptions: vec![subscription],
control_msgs: vec![control_action],
Expand Down Expand Up @@ -3773,7 +3773,7 @@ fn test_scoring_p4_invalid_signature() {
peers[0],
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![],
subscriptions: vec![],
control_msgs: vec![],
Expand Down Expand Up @@ -5530,7 +5530,7 @@ fn parses_idontwant() {
.create_network();

let message_id = MessageId::new(&[0, 1, 2, 3]);
let rpc = Rpc {
let rpc = RpcIn {
messages: vec![],
subscriptions: vec![],
control_msgs: vec![ControlAction::IDontWant(IDontWant {
Expand Down Expand Up @@ -6619,7 +6619,7 @@ fn test_validation_error_message_size_too_large_topic_specific() {
peers[0],
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![raw_message],
subscriptions: vec![],
control_msgs: vec![],
Expand Down Expand Up @@ -6723,7 +6723,7 @@ fn test_validation_message_size_within_topic_specific() {
peers[0],
ConnectionId::new_unchecked(0),
HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages: vec![raw_message],
subscriptions: vec![],
control_msgs: vec![],
Expand Down
4 changes: 2 additions & 2 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
protocol::{GossipsubCodec, ProtocolConfig},
rpc::Receiver,
rpc_proto::proto,
types::{PeerKind, RawMessage, Rpc, RpcOut},
types::{PeerKind, RawMessage, RpcIn, RpcOut},
ValidationError,
};

Expand All @@ -51,7 +51,7 @@ pub enum HandlerEvent {
/// any) that were received.
Message {
/// The GossipsubRPC message excluding any invalid messages.
rpc: Rpc,
rpc: RpcIn,
/// Any invalid messages that were received in the RPC, along with the associated
/// validation error.
invalid_messages: Vec<(RawMessage, ValidationError)>,
Expand Down
3 changes: 0 additions & 3 deletions protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,5 @@ pub use self::{
types::{FailedMessages, Message, MessageAcceptance, MessageId, RawMessage},
};

#[deprecated(note = "Will be removed from the public API.")]
pub type Rpc = self::types::Rpc;

pub type IdentTopic = Topic<self::topic::IdentityHash>;
pub type Sha256Topic = Topic<self::topic::Sha256Hash>;
17 changes: 10 additions & 7 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
topic::TopicHash,
types::{
ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune,
RawMessage, Rpc, Subscription, SubscriptionAction,
RawMessage, RpcIn, Subscription, SubscriptionAction,
},
ValidationError,
};
Expand Down Expand Up @@ -564,7 +564,7 @@ impl Decoder for GossipsubCodec {
}

Ok(Some(HandlerEvent::Message {
rpc: Rpc {
rpc: RpcIn {
messages,
subscriptions: rpc
.subscriptions
Expand All @@ -587,12 +587,16 @@ impl Decoder for GossipsubCodec {

#[cfg(test)]
mod tests {
use std::time::Duration;

use futures_timer::Delay;
use libp2p_identity::Keypair;
use quickcheck::*;

use super::*;
use crate::{
config::Config, Behaviour, ConfigBuilder, IdentTopic as Topic, MessageAuthenticity, Version,
config::Config, types::RpcOut, Behaviour, ConfigBuilder, IdentTopic as Topic,
MessageAuthenticity, Version,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -652,10 +656,9 @@ mod tests {
fn prop(message: Message) {
let message = message.0;

let rpc = Rpc {
messages: vec![message.clone()],
subscriptions: vec![],
control_msgs: vec![],
let rpc = RpcOut::Publish {
message: message.clone(),
timeout: Delay::new(Duration::from_secs(1)),
};

let mut codec =
Expand Down
117 changes: 2 additions & 115 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ impl From<RpcOut> for proto::RPC {

/// An RPC received/sent.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Rpc {
pub struct RpcIn {
/// List of messages that were part of this RPC query.
pub messages: Vec<RawMessage>,
/// List of subscriptions.
Expand All @@ -483,120 +483,7 @@ pub struct Rpc {
pub control_msgs: Vec<ControlAction>,
}

impl Rpc {
/// Converts the GossipsubRPC into its protobuf format.
// A convenience function to avoid explicitly specifying types.
pub fn into_protobuf(self) -> proto::RPC {
self.into()
}
}

impl From<Rpc> for proto::RPC {
/// Converts the RPC into protobuf format.
fn from(rpc: Rpc) -> Self {
// Messages
let mut publish = Vec::new();

for message in rpc.messages.into_iter() {
let message = proto::Message {
from: message.source.map(|m| m.to_bytes()),
data: Some(message.data),
seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
topic: TopicHash::into_string(message.topic),
signature: message.signature,
key: message.key,
};

publish.push(message);
}

// subscriptions
let subscriptions = rpc
.subscriptions
.into_iter()
.map(|sub| proto::SubOpts {
subscribe: Some(sub.action == SubscriptionAction::Subscribe),
topic_id: Some(sub.topic_hash.into_string()),
})
.collect::<Vec<_>>();

// control messages
let mut control = proto::ControlMessage {
ihave: Vec::new(),
iwant: Vec::new(),
graft: Vec::new(),
prune: Vec::new(),
idontwant: Vec::new(),
};

let empty_control_msg = rpc.control_msgs.is_empty();

for action in rpc.control_msgs {
match action {
// collect all ihave messages
ControlAction::IHave(IHave {
topic_hash,
message_ids,
}) => {
let rpc_ihave = proto::ControlIHave {
topic_id: Some(topic_hash.into_string()),
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.ihave.push(rpc_ihave);
}
ControlAction::IWant(IWant { message_ids }) => {
let rpc_iwant = proto::ControlIWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.iwant.push(rpc_iwant);
}
ControlAction::Graft(Graft { topic_hash }) => {
let rpc_graft = proto::ControlGraft {
topic_id: Some(topic_hash.into_string()),
};
control.graft.push(rpc_graft);
}
ControlAction::Prune(Prune {
topic_hash,
peers,
backoff,
}) => {
let rpc_prune = proto::ControlPrune {
topic_id: Some(topic_hash.into_string()),
peers: peers
.into_iter()
.map(|info| proto::PeerInfo {
peer_id: info.peer_id.map(|id| id.to_bytes()),
// TODO, see https://github.com/libp2p/specs/pull/217
signed_peer_record: None,
})
.collect(),
backoff,
};
control.prune.push(rpc_prune);
}
ControlAction::IDontWant(IDontWant { message_ids }) => {
let rpc_idontwant = proto::ControlIDontWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.idontwant.push(rpc_idontwant);
}
}
}

proto::RPC {
subscriptions,
publish,
control: if empty_control_msg {
None
} else {
Some(control)
},
}
}
}

impl fmt::Debug for Rpc {
impl fmt::Debug for RpcIn {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut b = f.debug_struct("GossipsubRpc");
if !self.messages.is_empty() {
Expand Down