Skip to content

Commit

Permalink
[#52] refactored Packet trait, replaced with DecodablePacket
Browse files Browse the repository at this point in the history
- splits Packet into EncodablePacket and DecodablePacket
- removes default payload() and payload_ref() methods
- fixes encoded_length() implementations for all packets
- add PublishPacketRef for performance optimization
  • Loading branch information
zonyitoo committed Feb 20, 2021
1 parent 1579d72 commit 58a3b00
Show file tree
Hide file tree
Showing 18 changed files with 211 additions and 228 deletions.
2 changes: 1 addition & 1 deletion examples/sub-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ fn main() {
info!("Receiving PINGRESP from broker ..");
}
VariablePacket::PublishPacket(ref publ) => {
let msg = match str::from_utf8(&publ.payload_ref()[..]) {
let msg = match str::from_utf8(publ.payload()) {
Ok(msg) => msg,
Err(err) => {
error!("Failed to decode publish message {:?}", err);
Expand Down
16 changes: 8 additions & 8 deletions src/encodable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ pub trait Encodable {
fn encoded_length(&self) -> u32;
}

impl<T: Encodable> Encodable for &T {
fn encode<W: Write>(&self, writer: &mut W) -> io::Result<()> {
(**self).encode(writer)
}
fn encoded_length(&self) -> u32 {
(**self).encoded_length()
}
}
// impl<T: Encodable> Encodable for &T {
// fn encode<W: Write>(&self, writer: &mut W) -> io::Result<()> {
// (**self).encode(writer)
// }
// fn encoded_length(&self) -> u32 {
// (**self).encoded_length()
// }
// }

impl<T: Encodable> Encodable for Option<T> {
fn encode<W: Write>(&self, writer: &mut W) -> io::Result<()> {
Expand Down
15 changes: 2 additions & 13 deletions src/packet/connack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io::Read;

use crate::control::variable_header::{ConnackFlags, ConnectReturnCode};
use crate::control::{ControlType, FixedHeader, PacketType};
use crate::packet::{Packet, PacketError};
use crate::packet::{DecodablePacket, PacketError};
use crate::Decodable;

/// `CONNACK` packet
Expand All @@ -13,7 +13,6 @@ pub struct ConnackPacket {
fixed_header: FixedHeader,
flags: ConnackFlags,
ret_code: ConnectReturnCode,
payload: (),
}

encodable_packet!(ConnackPacket(flags, ret_code));
Expand All @@ -24,7 +23,6 @@ impl ConnackPacket {
fixed_header: FixedHeader::new(PacketType::with_default(ControlType::ConnectAcknowledgement), 2),
flags: ConnackFlags { session_present },
ret_code,
payload: (),
}
}

Expand All @@ -37,17 +35,9 @@ impl ConnackPacket {
}
}

impl Packet for ConnackPacket {
impl DecodablePacket for ConnackPacket {
type Payload = ();

fn payload(self) -> Self::Payload {
self.payload
}

fn payload_ref(&self) -> &Self::Payload {
&self.payload
}

fn decode_packet<R: Read>(reader: &mut R, fixed_header: FixedHeader) -> Result<Self, PacketError<Self>> {
let flags: ConnackFlags = Decodable::decode(reader)?;
let code: ConnectReturnCode = Decodable::decode(reader)?;
Expand All @@ -56,7 +46,6 @@ impl Packet for ConnackPacket {
fixed_header,
flags,
ret_code: code,
payload: (),
})
}
}
Expand Down
14 changes: 3 additions & 11 deletions src/packet/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::control::variable_header::protocol_level::SPEC_3_1_1;
use crate::control::variable_header::{ConnectFlags, KeepAlive, ProtocolLevel, ProtocolName, VariableHeaderError};
use crate::control::{ControlType, FixedHeader, PacketType};
use crate::encodable::VarBytes;
use crate::packet::{Packet, PacketError};
use crate::packet::{DecodablePacket, PacketError};
use crate::topic_name::{TopicName, TopicNameDecodeError, TopicNameError};
use crate::{Decodable, Encodable};

Expand All @@ -23,7 +23,7 @@ pub struct ConnectPacket {
payload: ConnectPacketPayload,
}

encodable_packet!(ConnectPacket(protocol_name, protocol_level, flags, keep_alive));
encodable_packet!(ConnectPacket(protocol_name, protocol_level, flags, keep_alive, payload));

impl ConnectPacket {
pub fn new<C>(client_identifier: C) -> ConnectPacket
Expand Down Expand Up @@ -151,17 +151,9 @@ impl ConnectPacket {
}
}

impl Packet for ConnectPacket {
impl DecodablePacket for ConnectPacket {
type Payload = ConnectPacketPayload;

fn payload(self) -> ConnectPacketPayload {
self.payload
}

fn payload_ref(&self) -> &ConnectPacketPayload {
&self.payload
}

fn decode_packet<R: Read>(reader: &mut R, fixed_header: FixedHeader) -> Result<Self, PacketError<Self>> {
let protoname: ProtocolName = Decodable::decode(reader)?;
let protocol_level: ProtocolLevel = Decodable::decode(reader)?;
Expand Down
19 changes: 3 additions & 16 deletions src/packet/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
use std::io::Read;

use crate::control::{ControlType, FixedHeader, PacketType};
use crate::packet::{Packet, PacketError};
use crate::packet::{DecodablePacket, PacketError};

/// `DISCONNECT` packet
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct DisconnectPacket {
fixed_header: FixedHeader,
payload: (),
}

encodable_packet!(DisconnectPacket());
Expand All @@ -18,7 +17,6 @@ impl DisconnectPacket {
pub fn new() -> DisconnectPacket {
DisconnectPacket {
fixed_header: FixedHeader::new(PacketType::with_default(ControlType::Disconnect), 0),
payload: (),
}
}
}
Expand All @@ -29,21 +27,10 @@ impl Default for DisconnectPacket {
}
}

impl Packet for DisconnectPacket {
impl DecodablePacket for DisconnectPacket {
type Payload = ();

fn payload(self) -> Self::Payload {
self.payload
}

fn payload_ref(&self) -> &Self::Payload {
&self.payload
}

fn decode_packet<R: Read>(_reader: &mut R, fixed_header: FixedHeader) -> Result<Self, PacketError<Self>> {
Ok(DisconnectPacket {
fixed_header,
payload: (),
})
Ok(DisconnectPacket { fixed_header })
}
}
132 changes: 95 additions & 37 deletions src/packet/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Specific packets
use std::fmt;
use std::fmt::{self, Debug};
use std::io::{self, Read, Write};

#[cfg(feature = "tokio")]
Expand All @@ -15,29 +15,28 @@ use crate::{Decodable, Encodable};

macro_rules! encodable_packet {
($typ:ident($($field:ident),* $(,)?)) => {
impl $crate::encodable::Encodable for $typ {
fn encode<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
$crate::encodable::Encodable::encode(&self.fixed_header, writer)?;
impl $crate::packet::EncodablePacket for $typ {
fn fixed_header(&self) -> &$crate::control::fixed_header::FixedHeader {
&self.fixed_header
}

#[allow(unused)]
fn encode_packet<W: ::std::io::Write>(&self, writer: &mut W) -> ::std::io::Result<()> {
$($crate::encodable::Encodable::encode(&self.$field, writer)?;)*
$crate::encodable::Encodable::encode(&self.payload, writer)?;
Ok(())
}

fn encoded_length(&self) -> u32 {
$crate::encodable::Encodable::encoded_length(&self.fixed_header)
fn encoded_packet_length(&self) -> u32 {
$($crate::encodable::Encodable::encoded_length(&self.$field) +)*
0
}
}

impl $crate::packet::EncodablePacket for $typ {}

impl $typ {
fn encoded_length_noheader(&self) -> u32 {
$($crate::encodable::Encodable::encoded_length(&self.$field) +)*
$crate::encodable::Encodable::encoded_length(&self.payload)
}
#[allow(unused)]
#[inline(always)]
fn fix_header_remaining_len(&mut self) {
self.fixed_header.remaining_length = self.encoded_length_noheader()
self.fixed_header.remaining_length = $crate::packet::EncodablePacket::encoded_packet_length(self);
}
}
};
Expand Down Expand Up @@ -79,24 +78,40 @@ pub mod unsubscribe;
/// `&FooPacket`. Different from [`Encodable`] in that it prevents you from accidentally passing
/// a type intended to be encoded only as a part of a packet and doesn't have a header, e.g.
/// `Vec<u8>`.
pub trait EncodablePacket: Encodable {}
pub trait EncodablePacket {
/// Get a reference to `FixedHeader`. All MQTT packet must have a fixed header.
fn fixed_header(&self) -> &FixedHeader;

impl<T: EncodablePacket> EncodablePacket for &T {}
/// Encodes packet data after fixed header, including variable headers and payload
fn encode_packet<W: Write>(&self, _writer: &mut W) -> io::Result<()> {
Ok(())
}

/// Length in bytes for data after fixed header, including variable headers and payload
fn encoded_packet_length(&self) -> u32 {
0
}
}

impl<T: EncodablePacket> Encodable for T {
fn encode<W: Write>(&self, writer: &mut W) -> io::Result<()> {
self.fixed_header().encode(writer)?;
self.encode_packet(writer)
}

/// Methods for encoding and decoding a packet
pub trait Packet: Encodable + fmt::Debug + Sized + 'static {
type Payload: Encodable + Decodable;
fn encoded_length(&self) -> u32 {
self.fixed_header().encoded_length() + self.encoded_packet_length()
}
}

/// Get the payload
fn payload(self) -> Self::Payload;
/// Get a borrow of payload
fn payload_ref(&self) -> &Self::Payload;
pub trait DecodablePacket: EncodablePacket + Sized {
type Payload: Decodable + 'static;

/// Decode packet given a `FixedHeader`
fn decode_packet<R: Read>(reader: &mut R, fixed_header: FixedHeader) -> Result<Self, PacketError<Self>>;
}

impl<T: Packet> Decodable for T {
impl<T: DecodablePacket> Decodable for T {
type Error = PacketError<T>;
type Cond = Option<FixedHeader>;

Expand All @@ -107,22 +122,41 @@ impl<T: Packet> Decodable for T {
Decodable::decode(reader)?
};

<Self as Packet>::decode_packet(reader, fixed_header)
<Self as DecodablePacket>::decode_packet(reader, fixed_header)
}
}

/// Parsing errors for packet
#[derive(Debug, thiserror::Error)]
#[derive(thiserror::Error)]
#[error(transparent)]
pub enum PacketError<P: Packet> {
pub enum PacketError<P>
where
P: DecodablePacket,
{
FixedHeaderError(#[from] FixedHeaderError),
VariableHeaderError(#[from] VariableHeaderError),
PayloadError(<<P as Packet>::Payload as Decodable>::Error),
PayloadError(<<P as DecodablePacket>::Payload as Decodable>::Error),
IoError(#[from] io::Error),
TopicNameError(#[from] TopicNameError),
}

impl<P: Packet> From<TopicNameDecodeError> for PacketError<P> {
impl<P> Debug for PacketError<P>
where
P: DecodablePacket,
<P::Payload as Decodable>::Error: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PacketError::FixedHeaderError(ref e) => f.debug_tuple("FixedHeaderError").field(e).finish(),
PacketError::VariableHeaderError(ref e) => f.debug_tuple("VariableHeaderError").field(e).finish(),
PacketError::PayloadError(ref e) => f.debug_tuple("PayloadError").field(e).finish(),
PacketError::IoError(ref e) => f.debug_tuple("IoError").field(e).finish(),
PacketError::TopicNameError(ref e) => f.debug_tuple("TopicNameError").field(e).finish(),
}
}
}

impl<P: DecodablePacket> From<TopicNameDecodeError> for PacketError<P> {
fn from(e: TopicNameDecodeError) -> Self {
match e {
TopicNameDecodeError::IoError(e) => e.into(),
Expand Down Expand Up @@ -162,7 +196,7 @@ macro_rules! impl_variable_packet {
match fixed_header.packet_type.control_type {
$(
ControlType::$hdr => {
let pk = <$name as Packet>::decode_packet(rdr, fixed_header)?;
let pk = <$name as DecodablePacket>::decode_packet(rdr, fixed_header)?;
Ok(VariablePacket::$name(pk))
}
)+
Expand All @@ -177,25 +211,49 @@ macro_rules! impl_variable_packet {
}
)+

impl Encodable for VariablePacket {
fn encode<W: Write>(&self, writer: &mut W) -> Result<(), io::Error> {
// impl Encodable for VariablePacket {
// fn encode<W: Write>(&self, writer: &mut W) -> Result<(), io::Error> {
// match *self {
// $(
// VariablePacket::$name(ref pk) => pk.encode(writer),
// )+
// }
// }

// fn encoded_length(&self) -> u32 {
// match *self {
// $(
// VariablePacket::$name(ref pk) => pk.encoded_length(),
// )+
// }
// }
// }

impl EncodablePacket for VariablePacket {
fn fixed_header(&self) -> &FixedHeader {
match *self {
$(
VariablePacket::$name(ref pk) => pk.encode(writer),
VariablePacket::$name(ref pk) => pk.fixed_header(),
)+
}
}

fn encoded_length(&self) -> u32 {
fn encode_packet<W: Write>(&self, writer: &mut W) -> io::Result<()> {
match *self {
$(
VariablePacket::$name(ref pk) => pk.encoded_length(),
VariablePacket::$name(ref pk) => pk.encode_packet(writer),
)+
}
}
}

impl EncodablePacket for VariablePacket {}
fn encoded_packet_length(&self) -> u32 {
match *self {
$(
VariablePacket::$name(ref pk) => pk.encoded_packet_length(),
)+
}
}
}

impl Decodable for VariablePacket {
type Error = VariablePacketError;
Expand Down
Loading

0 comments on commit 58a3b00

Please sign in to comment.