Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement connection resets #31

Merged
merged 21 commits into from
Nov 22, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add test that a service shutting down sends reset
hawkw committed Nov 22, 2023
commit 9e6f47c3cf01a6bd9a559a6f6642d409b5d1cf75
114 changes: 90 additions & 24 deletions source/mgnp/src/tests/e2e.rs
Original file line number Diff line number Diff line change
@@ -15,9 +15,7 @@ async fn basically_works() {

let chan = connect(&mut connector, "hello-world", ()).await;
chan.tx()
.send(HelloWorldRequest {
hello: "hello".to_string(),
})
.send(svcs::hello_req("hello"))
.await
.expect("send request");
let rsp = chan.rx().recv().await;
@@ -52,9 +50,7 @@ async fn hellos_work() {
.await;

chan.tx()
.send(HelloWorldRequest {
hello: "hello".to_string(),
})
.send(svcs::hello_req("hello"))
.await
.expect("send request");
let rsp = chan.rx().recv().await;
@@ -102,9 +98,7 @@ async fn nak_bad_hello() {

// the good connection should stil lwork
chan.tx()
.send(HelloWorldRequest {
hello: "hello".to_string(),
})
.send(svcs::hello_req("hello"))
.await
.expect("send request");
let rsp = chan.rx().recv().await;
@@ -134,12 +128,8 @@ async fn mux_single_service() {
let chan2 = connect(&mut connector, "hello-world", ()).await;

tokio::try_join! {
chan1.tx().send(HelloWorldRequest {
hello: "hello".to_string(),
}),
chan2.tx().send(HelloWorldRequest {
hello: "hello".to_string(),
})
chan1.tx().send(svcs::hello_req("hello")),
chan2.tx().send(svcs::hello_req("hello"))
}
.expect("send should work");

@@ -218,9 +208,7 @@ async fn service_type_routing() {

helloworld_chan
.tx()
.send(HelloWorldRequest {
hello: "hello".to_string(),
})
.send(svcs::hello_req("hello"))
.await
.expect("send request");
let rsp = helloworld_chan.rx().recv().await;
@@ -245,17 +233,13 @@ async fn service_type_routing() {

hellohello_chan
.tx()
.send(HelloWorldRequest {
hello: "hello".to_string(),
})
.send(svcs::hello_req("hello"))
.await
.expect("send request");

helloworld_chan
.tx()
.send(HelloWorldRequest {
hello: "hello".to_string(),
})
.send(svcs::hello_req("hello"))
.await
.expect("send request");

@@ -355,3 +339,85 @@ async fn service_identity_routing() {
})
);
}

#[tokio::test]
async fn reset_closed() {
let remote_registry: TestRegistry = TestRegistry::default();
let conns = remote_registry.add_service(svcs::hello_world_id());
let shutdown = Arc::new(tokio::sync::Notify::new());

tokio::spawn(svcs::serve_hello_with_shutdown(
"hello",
"world",
conns,
shutdown.clone(),
));

let fixture = Fixture::new()
.spawn_local(Default::default())
.spawn_remote(remote_registry);

let mut connector = fixture.local_iface().connector::<svcs::HelloWorld>();

let chan1 = connect(&mut connector, "hello-world", ()).await;

let chan2 = connect(&mut connector, "hello-world", ()).await;

tokio::try_join! {
chan1.tx().send(svcs::hello_req("hello")),
chan2.tx().send(svcs::hello_req("hello"))
}
.expect("send should work");

let (rsp1, rsp2) = tokio::join! {
chan1.rx().recv(),
chan2.rx().recv(),
};

assert_eq!(
rsp1,
Ok(HelloWorldResponse {
world: "world".to_string()
})
);
assert_eq!(
rsp2,
Ok(HelloWorldResponse {
world: "world".to_string()
})
);

// now shut down the remote service
tracing::info!("");
tracing::info!("!!! shutting down remote service !!!");
tracing::info!("");
shutdown.notify_waiters();

let send2 = tokio::join! {
chan1.tx().send(svcs::hello_req("hello")),
chan2.tx().send(svcs::hello_req("hello"))
};

let _ = dbg!(send2);

let (rsp1, rsp2) = tokio::join! {
chan1.rx().recv(),
chan2.rx().recv(),
};

assert_eq!(
dbg!(rsp1),
Err(tricky_pipe::mpsc::error::RecvError::Error(
Reset::BecauseISaidSo
))
);

assert_eq!(
dbg!(rsp2),
Err(tricky_pipe::mpsc::error::RecvError::Error(
Reset::BecauseISaidSo
))
);

fixture.finish_test().await;
}
70 changes: 56 additions & 14 deletions source/mgnp/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ use crate::{
use std::{
collections::HashMap,
fmt,
future::Future,
sync::{Arc, RwLock},
};
use tokio::sync::{mpsc, oneshot, Notify};
@@ -66,42 +67,76 @@ pub(crate) mod svcs {
registry::Identity::from_name::<HelloWorld>("hello-world")
}

pub fn hello_req(hello: impl ToString) -> HelloWorldRequest {
HelloWorldRequest {
hello: hello.to_string(),
}
}

#[tracing::instrument(level = tracing::Level::INFO, skip(conns))]
pub async fn serve_hello(
req_msg: &'static str,
rsp_msg: &'static str,
conns: mpsc::Receiver<InboundConnect>,
) {
let shutdown = Arc::new(tokio::sync::Notify::new());
serve_hello_with_shutdown(req_msg, rsp_msg, conns, shutdown).await
}

#[tracing::instrument(level = tracing::Level::INFO, skip(conns, shutdown))]
pub async fn serve_hello_with_shutdown(
req_msg: &'static str,
rsp_msg: &'static str,
mut conns: mpsc::Receiver<InboundConnect>,
shutdown: Arc<tokio::sync::Notify>,
) {
let mut worker = 1;
while let Some(req) = conns.recv().await {
let InboundConnect { hello, rsp } = req;
tracing::info!(?hello, "hello world service received connection");
let (their_chan, my_chan) =
make_bidis::<svcs::HelloWorldRequest, svcs::HelloWorldResponse>(8);
tokio::spawn(hello_worker(worker, req_msg, rsp_msg, my_chan));
tokio::spawn(hello_worker(
worker,
req_msg,
rsp_msg,
my_chan,
shutdown.clone(),
));
worker += 1;
let sent = rsp.send(Ok(their_chan)).is_ok();
tracing::debug!(?sent);
}
}

#[tracing::instrument(level = tracing::Level::INFO, skip(chan))]
pub(super) async fn hello_worker(
#[tracing::instrument(level = tracing::Level::INFO, skip(chan, shutdown))]
pub async fn hello_worker(
worker: usize,
req_msg: &'static str,
rsp_msg: &'static str,
chan: BiDi<svcs::HelloWorldRequest, svcs::HelloWorldResponse, Reset>,
shutdown: Arc<tokio::sync::Notify>,
) {
tracing::debug!("hello world worker {worker} running...");
while let Ok(req) = chan.rx().recv().await {
tracing::info!(?req);
assert_eq!(req.hello, req_msg);
chan.tx()
.send(svcs::HelloWorldResponse {
world: rsp_msg.into(),
})
.await
.unwrap();
loop {
tokio::select! {
biased;
_ = shutdown.notified() => {
tracing::info!("worker shutting down!");
return;
}
req = chan.rx().recv() => {
tracing::info!(?req);
assert_eq!(req.expect("request should be Ok").hello, req_msg);
chan.tx()
.send(svcs::HelloWorldResponse {
world: rsp_msg.into(),
})
.await
.unwrap();
}

}
}
}
}
@@ -144,7 +179,7 @@ impl<T, E> Fixture<T, E> {
registry,
TrickyPipe::new(8),
);
let task = tokio::spawn(interface("name", machine, test_done.clone()));
let task = tokio::spawn(interface(name, machine, test_done.clone()));
(iface, task)
}
}
@@ -354,6 +389,7 @@ impl TestRegistry {
let mut chan = self.add_service(svcs::hello_with_hello_id());
tokio::spawn(
async move {
let shutdown = Arc::new(Notify::new());
let mut worker = 1;
while let Some(req) = chan.recv().await {
let InboundConnect { hello, rsp } = req;
@@ -364,7 +400,13 @@ impl TestRegistry {
tracing::info!(?hello, "hellohello service received hello");
let (their_chan, my_chan) =
make_bidis::<svcs::HelloWorldRequest, svcs::HelloWorldResponse>(8);
tokio::spawn(svcs::hello_worker(worker, "hello", "world", my_chan));
tokio::spawn(svcs::hello_worker(
worker,
"hello",
"world",
my_chan,
shutdown.clone(),
));
worker += 1;
Ok(their_chan)
} else {