Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement connection resets #31

Merged
merged 21 commits into from
Nov 22, 2023
Merged
Prev Previous commit
Next Next commit
test cleanup
hawkw committed Nov 22, 2023
commit 0e766e641d32161172b5a833b0632f78c6671e09
152 changes: 69 additions & 83 deletions source/mgnp/src/tests/integration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::{
message::{self, InboundFrame, OutboundFrame},
Wire,
message::{self, DecodeError, DecodeErrorKind, Header, InboundFrame, OutboundFrame, Reset},
Id, Wire,
};
use tricky_pipe::serbox;

@@ -16,7 +16,7 @@ async fn reset_decode_error() {
let hello = hellobox.share(()).await;

wire.send(OutboundFrame::connect(
crate::Id::new(1),
Id::new(1),
svcs::hello_world_id(),
hello,
))
@@ -28,36 +28,33 @@ async fn reset_decode_error() {
assert_eq!(
msg,
Ok(InboundFrame {
header: message::Header::Ack {
local_id: crate::Id::new(1),
remote_id: crate::Id::new(1),
header: Header::Ack {
local_id: Id::new(1),
remote_id: Id::new(1),
},
body: &[]
})
);

let mut out_frame = postcard::to_allocvec(&message::Header::Data {
local_id: crate::Id::new(1),
remote_id: crate::Id::new(1),
let mut out_frame = postcard::to_allocvec(&Header::Data {
local_id: Id::new(1),
remote_id: Id::new(1),
})
.unwrap();
out_frame.extend(&[0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]);

wire.send_bytes(out_frame).await.unwrap();

let frame = wire.recv().await.unwrap();
let msg = InboundFrame::from_bytes(&frame[..]);
assert_eq!(
msg,
Ok(InboundFrame {
header: message::Header::Reset {
remote_id: crate::Id::new(1),
reason: message::Reset::YouDoneGoofed(message::DecodeError::Body(
message::DecodeErrorKind::UnexpectedEnd
))
expect_inbound_frame(
frame,
InboundFrame {
header: Header::Reset {
remote_id: Id::new(1),
reason: Reset::YouDoneGoofed(DecodeError::Body(DecodeErrorKind::UnexpectedEnd)),
},
body: &[]
})
body: &[],
},
);
}

@@ -72,7 +69,7 @@ async fn reset_no_such_conn() {
let hello = hellobox.share(()).await;

wire.send(OutboundFrame::connect(
crate::Id::new(1),
Id::new(1),
svcs::hello_world_id(),
hello,
))
@@ -84,9 +81,9 @@ async fn reset_no_such_conn() {
assert_eq!(
msg,
Ok(InboundFrame {
header: message::Header::Ack {
local_id: crate::Id::new(1),
remote_id: crate::Id::new(1),
header: Header::Ack {
local_id: Id::new(1),
remote_id: Id::new(1),
},
body: &[]
})
@@ -95,25 +92,27 @@ async fn reset_no_such_conn() {
let chan = tricky_pipe::mpsc::TrickyPipe::new(8);
let rx = chan.ser_receiver().unwrap();
let tx = chan.sender();
tx.try_send(svcs::HelloWorldRequest {
hello: "hello".into(),
})
.unwrap();

let body = rx.try_recv().unwrap();

let out_frame = {
let data_frame = |header: Header| {
tx.try_send(svcs::HelloWorldRequest {
hello: "hello".into(),
})
.expect("send should just work");
let body = rx.try_recv().expect("recv should just work");
let frame = OutboundFrame {
header: message::Header::Data {
local_id: crate::Id::new(1),
remote_id: crate::Id::new(1), // good conn ID
},
header,
body: message::OutboundData::Data(body),
};
frame.to_vec().unwrap()
tracing::info!(frame = %format_args!("{frame:#?}"), "OUTBOUND FRAME");
frame.to_vec().expect("frame must serialize")
};

wire.send_bytes(out_frame).await.unwrap();
wire.send_bytes(data_frame(Header::Data {
local_id: Id::new(1), // known good ID
remote_id: Id::new(1),
}))
.await
.unwrap();

let frame = wire.recv().await.unwrap();
let msg = InboundFrame::from_bytes(&frame[..]).unwrap();
@@ -125,62 +124,49 @@ async fn reset_no_such_conn() {
);

// another message, with a bad conn ID
tx.try_send(svcs::HelloWorldRequest {
hello: "hello".into(),
})
.unwrap();
let body = rx.try_recv().unwrap();
let out_frame = OutboundFrame {
header: message::Header::Data {
remote_id: crate::Id::new(666), // bad conn ID
local_id: crate::Id::new(1),
},
body: message::OutboundData::Data(body),
}
.to_vec()
wire.send_bytes(data_frame(Header::Data {
local_id: Id::new(1),
remote_id: Id::new(666), // bad conn ID
}))
.await
.unwrap();

wire.send_bytes(out_frame).await.unwrap();
let frame = wire.recv().await.unwrap();
let msg = dbg!(InboundFrame::from_bytes(&frame[..]));
assert_eq!(
msg,
Ok(InboundFrame {
header: message::Header::Reset {
remote_id: crate::Id::new(1),
reason: message::Reset::NoSuchConn,
expect_inbound_frame(
frame,
InboundFrame {
header: Header::Reset {
remote_id: Id::new(1),
reason: Reset::NoSuchConn,
},
body: &[]
})
body: &[],
},
);

// another message, with a differently conn ID
tx.try_send(svcs::HelloWorldRequest {
hello: "hello".into(),
})
.unwrap();
let body = rx.try_recv().unwrap();
let out_frame = OutboundFrame {
header: message::Header::Data {
remote_id: crate::Id::new(1),
local_id: crate::Id::new(666), // bad conn ID
},
body: message::OutboundData::Data(body),
}
.to_vec()
wire.send_bytes(data_frame(Header::Data {
local_id: Id::new(666), // bad conn ID
remote_id: Id::new(1),
}))
.await
.unwrap();

wire.send_bytes(out_frame).await.unwrap();
let frame = wire.recv().await.unwrap();
let msg = dbg!(InboundFrame::from_bytes(&frame[..]));
assert_eq!(
msg,
Ok(InboundFrame {
header: message::Header::Reset {
remote_id: crate::Id::new(666),
reason: message::Reset::NoSuchConn,
expect_inbound_frame(
frame,
InboundFrame {
header: Header::Reset {
remote_id: Id::new(666),
reason: Reset::NoSuchConn,
},
body: &[]
})
body: &[],
},
);
}

#[track_caller]
fn expect_inbound_frame(frame: impl AsRef<[u8]>, expected: InboundFrame<'_>) {
let decoded = InboundFrame::from_bytes(frame.as_ref());
tracing::info!(frame = %format_args!("{decoded:#?}"), "INBOUND FRAME");
assert_eq!(decoded, Ok(expected));
}