
How can I send messages larger than 65535 bytes? #272

Open
zkrising opened this issue Jul 16, 2023 · 3 comments
Labels
question Further information is requested

Comments

@zkrising

Full disclosure, I know little to nothing about networks at this layer.

I'm using matchbox_socket to send data between peers. One of the messages I'd like to send is fairly large (~200 KB to 1 MB). It fails to send with this error:

2023-07-16T00:24:53.516938Z ERROR matchbox_socket::webrtc_socket::native: error sending to data channel: Data(Sctp(ErrOutboundPacketTooLarge))    

Looking into this more shows that it happens in webrtc-data, and some Wikipedia sleuthing says that SCTP is fundamentally limited to 65535-byte messages.
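For scale, a quick check (illustrative arithmetic only, not matchbox API) of how many 65535-byte messages a 1 MB payload would have to be split into:

```rust
fn main() {
    // Illustrative arithmetic: number of 65535-byte SCTP messages
    // needed to carry a 1 MB payload.
    let limit: usize = 65_535;
    let payload: usize = 1_000_000;
    let chunks = payload.div_ceil(limit);
    println!("{chunks}"); // 16
}
```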

I would love to be able to send a message larger than that -- do I have to implement my own chunking and ordering? This seems like the sort of thing that someone's solved before, so am I missing some config option or something?

@simbleau
Collaborator

simbleau commented Jul 30, 2023

Yes, I suppose you'll need to implement your own chunking and ordering. To my knowledge there's no config option for this in matchbox; perhaps there is one in the underlying webrtc crate we use.

@simbleau simbleau added the question Further information is requested label Sep 18, 2023
@simbleau
Collaborator

Do we want to handle this or expect the user to do this themselves? @johanhelsing

@Craig-Macomber

Given that matchbox exposes the same API for reliable and unreliable channels, and automatic chunking of unreliable channels seems like a bad idea, I think it makes sense not to build this in as an automatic feature.

That said, I can see multiple people needing such a utility. I wrote one for my project. It's not quite general-purpose (it also handles serializing), but maybe it will be useful for people landing on this issue:

Feel free to use this under the MIT license.

//! Prefixes packets with a u32 (little endian) indicating how many additional packets (with no prefixes) should be merged onto the packet.

use std::marker::PhantomData;

use serde::{de::DeserializeOwned, Serialize};

pub trait ChunkyTransmit {
    fn max_message_size(&self) -> usize;
    fn transmit_packet(&mut self, packet: Box<[u8]>);
    fn chunked_transmit<T: Serialize>(&mut self, item: &T) {
        // Output vector, preloaded with 0 for extra packet count. May be overwritten if data needs extra packets.
        let serialized: Vec<u8> = vec![0, 0, 0, 0];
        let serialized = postcard::to_extend(item, serialized).unwrap();
        let max = self.max_message_size();
        if serialized.len() <= max {
            self.transmit_packet(serialized.into());
        } else {
            // Number of additional packets must fit in first packet.
            assert!(max >= 4);
            let mut chunks: Vec<Box<[u8]>> = serialized
                .chunks(max)
                .map(|chunk| {
                    let mut out = Vec::with_capacity(chunk.len());
                    out.extend_from_slice(chunk);
                    out.into_boxed_slice()
                })
                .collect();

            // Write the header into the first packet (over the zeros reserved initially).
            let extra_packets = u32::try_from(chunks.len() - 1).unwrap();
            chunks[0][..4].copy_from_slice(&extra_packets.to_le_bytes());
            for packet in chunks {
                self.transmit_packet(packet);
            }
        }
    }
}

pub struct ChunkyReceive<T> {
    phantom: PhantomData<T>,
    buffer: Vec<u8>,
    packets_needed: u32,
}

impl<T> Default for ChunkyReceive<T> {
    fn default() -> Self {
        Self {
            phantom: Default::default(),
            buffer: Default::default(),
            packets_needed: Default::default(),
        }
    }
}

impl<T: DeserializeOwned> ChunkyReceive<T> {
    pub fn process_packet(&mut self, packet: &[u8]) -> Option<T> {
        if self.packets_needed == 0 {
            // beginning of a new object case

            let prefix: u32 = u32::from_le_bytes(packet[0..4].try_into().unwrap());
            let data = &packet[4..];
            if prefix == 0 {
                // Fast path single packet case saving a copy
                Some(postcard::from_bytes(data).unwrap())
            } else {
                self.buffer.extend_from_slice(data);
                self.packets_needed = prefix;
                None
            }
        } else {
            // continuing existing object case

            self.buffer.extend_from_slice(packet);
            self.packets_needed -= 1;
            if self.packets_needed == 0 {
                let result = postcard::from_bytes(&self.buffer).unwrap();
                self.buffer.truncate(0);
                Some(result)
            } else {
                None
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, Debug)]
    struct TestData {
        data: String,
    }

    #[derive(Default)]
    struct Buffer<const CHUNK_SIZE: usize>(Vec<Box<[u8]>>);

    impl<const CHUNK_SIZE: usize> ChunkyTransmit for Buffer<CHUNK_SIZE> {
        fn max_message_size(&self) -> usize {
            CHUNK_SIZE
        }

        fn transmit_packet(&mut self, packet: Box<[u8]>) {
            self.0.push(packet)
        }
    }

    #[test]
    fn round_trip_single_packet() {
        let data = TestData {
            data: "Hello World".to_string(),
        };

        let mut big_buffer = Buffer::<50>::default();
        big_buffer.chunked_transmit(&data);
        assert!(big_buffer.0.len() == 1);

        let mut receive = ChunkyReceive::<TestData>::default();
        let result = receive.process_packet(&big_buffer.0[0]);
        assert_eq!(result.unwrap(), data);

        // make sure reading a second item works
        let result = receive.process_packet(&big_buffer.0[0]);
        assert_eq!(result.unwrap(), data);
    }

    #[test]
    fn round_trip_multi_packet() {
        let data = TestData {
            data: "Hello World".to_string(),
        };

        let mut big_buffer = Buffer::<8>::default();
        big_buffer.chunked_transmit(&data);
        assert!(big_buffer.0.len() == 2);

        let mut receive = ChunkyReceive::<TestData>::default();
        // make sure reading two items works (state is reset after the first one correctly)
        for _ in 0..2 {
            let result = receive.process_packet(&big_buffer.0[0]);
            assert_eq!(result, None);

            let result = receive.process_packet(&big_buffer.0[1]);
            assert_eq!(result.unwrap(), data);
        }
    }
}
impl ChunkyTransmit for (&mut WebRtcSocket, PeerId) {
    fn max_message_size(&self) -> usize {
        // From https://github.com/johanhelsing/matchbox/issues/272
        65535
    }

    fn transmit_packet(&mut self, packet: Box<[u8]>) {
        self.0.send(packet, self.1);
    }
}
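For anyone adapting this to another transport: the wire format is just a 4-byte little-endian `u32` prefix on the first packet. A standalone sketch of encoding and decoding that header (the payload bytes here are illustrative):

```rust
fn main() {
    // Standalone round-trip of the 4-byte little-endian "extra packets"
    // prefix used above. The payload content is just a stand-in.
    let extra_packets: u32 = 3;
    let mut packet = extra_packets.to_le_bytes().to_vec();
    packet.extend_from_slice(b"first chunk of serialized data");

    let decoded = u32::from_le_bytes(packet[0..4].try_into().unwrap());
    println!("{decoded}"); // 3
}
```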

That will panic if you give it more than about 281 terabytes, but that's enough for my use-case.
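That figure comes from the `u32::try_from` on the extra-packet count: at most `u32::MAX + 1` chunks of 65535 bytes each. A quick check of the math:

```rust
fn main() {
    // Sanity-check the ~281 TB figure: chunked_transmit panics once the
    // extra-packet count exceeds u32::MAX, i.e. beyond 2^32 chunks of
    // 65_535 bytes each.
    let max_chunks: u64 = u32::MAX as u64 + 1;
    let max_bytes = max_chunks * 65_535;
    println!("{max_bytes}"); // 281470681743360, roughly 281 TB
}
```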

If you are handling large data, you will likely want something like #453 so you can implement back pressure.
