Skip to content

Commit 7e13aa3

Browse files
matheus23flub
andauthored
chore(iroh): Add echo-no-router.rs example (#3267)
## Description Adds an example showing how to implement a basic protocol without using a `Router`. As a bonus, one can compare what `echo.rs` and `echo-no-router.rs` are doing to see the difference. ## Notes & open questions I think this is slightly useful, but YMMV. I don't think it's a lot of work to maintain though. ## Change checklist <!-- Remove any that are not relevant. --> - [x] Self-review. --------- Co-authored-by: Floris Bruynooghe <[email protected]>
1 parent 792e6c4 commit 7e13aa3

File tree

3 files changed

+139
-4
lines changed

3 files changed

+139
-4
lines changed

iroh/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,10 @@ required-features = ["examples"]
209209
name = "echo"
210210
required-features = ["examples"]
211211

212+
[[example]]
213+
name = "echo-no-router"
214+
required-features = ["examples"]
215+
212216
[[example]]
213217
name = "transfer"
214218
required-features = ["examples"]

iroh/examples/echo-no-router.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
//! Very basic example showing how to implement a basic echo protocol,
2+
//! without using the `Router` API. (For the router version, check out the echo.rs example.)
3+
//!
4+
//! The echo protocol echos any data sent to it in the first stream.
5+
//!
6+
//! ## Running the Example
7+
//!
8+
//! cargo run --example echo-no-router --features=examples
9+
10+
use anyhow::Result;
11+
use iroh::{Endpoint, NodeAddr};
12+
13+
/// Each protocol is identified by its ALPN string.
14+
///
15+
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
16+
/// and the connection is aborted unless both nodes pass the same bytestring.
17+
const ALPN: &[u8] = b"iroh-example/echo/0";
18+
19+
#[tokio::main]
20+
async fn main() -> Result<()> {
21+
let endpoint = start_accept_side().await?;
22+
let node_addr = endpoint.node_addr().await?;
23+
24+
connect_side(node_addr).await?;
25+
26+
// This makes sure the endpoint is closed properly and connections close gracefully
27+
// and will indirectly close the tasks spawned by `start_accept_side`.
28+
endpoint.close().await;
29+
30+
Ok(())
31+
}
32+
33+
async fn connect_side(addr: NodeAddr) -> Result<()> {
34+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
35+
36+
// Open a connection to the accepting node
37+
let conn = endpoint.connect(addr, ALPN).await?;
38+
39+
// Open a bidirectional QUIC stream
40+
let (mut send, mut recv) = conn.open_bi().await?;
41+
42+
// Send some data to be echoed
43+
send.write_all(b"Hello, world!").await?;
44+
45+
// Signal the end of data for this particular stream
46+
send.finish()?;
47+
48+
// Receive the echo, but limit reading up to maximum 1000 bytes
49+
let response = recv.read_to_end(1000).await?;
50+
assert_eq!(&response, b"Hello, world!");
51+
52+
// Explicitly close the whole connection.
53+
conn.close(0u32.into(), b"bye!");
54+
55+
// The above call only queues a close message to be sent (see how it's not async!).
56+
// We need to actually call this to make sure this message is sent out.
57+
endpoint.close().await;
58+
// If we don't call this, but continue using the endpoint, then the queued
59+
// close call will eventually be picked up and sent.
60+
// But always try to wait for endpoint.close().await to go through before dropping
61+
// the endpoint to ensure any queued messages are sent through and connections are
62+
// closed gracefully.
63+
64+
Ok(())
65+
}
66+
67+
async fn start_accept_side() -> Result<Endpoint> {
68+
let endpoint = Endpoint::builder()
69+
.discovery_n0()
70+
// The accept side needs to opt-in to the protocols it accepts,
71+
// as any connection attempts that can't be found with a matching ALPN
72+
// will be rejected.
73+
.alpns(vec![ALPN.to_vec()])
74+
.bind()
75+
.await?;
76+
77+
// spawn a task so that `start_accept_side` returns immediately and we can continue in main().
78+
tokio::spawn({
79+
let endpoint = endpoint.clone();
80+
async move {
81+
// This task won't leak, because we call `endpoint.close()` in `main()`,
82+
// which causes `endpoint.accept().await` to return `None`.
83+
// In a more serious environment, we recommend avoiding `tokio::spawn` and use either a `TaskTracker` or
84+
// `JoinSet` instead to make sure you're not accidentally leaking tasks.
85+
while let Some(incoming) = endpoint.accept().await {
86+
// spawn a task for each incoming connection, so we can serve multiple connections asynchronously
87+
tokio::spawn(async move {
88+
let connection = incoming.await?;
89+
90+
// We can get the remote's node id from the connection.
91+
let node_id = connection.remote_node_id()?;
92+
println!("accepted connection from {node_id}");
93+
94+
// Our protocol is a simple request-response protocol, so we expect the
95+
// connecting peer to open a single bi-directional stream.
96+
let (mut send, mut recv) = connection.accept_bi().await?;
97+
98+
// Echo any bytes received back directly.
99+
// This will keep copying until the sender signals the end of data on the stream.
100+
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
101+
println!("Copied over {bytes_sent} byte(s)");
102+
103+
// By calling `finish` on the send stream we signal that we will not send anything
104+
// further, which makes the receive stream on the other end terminate.
105+
send.finish()?;
106+
107+
// Wait until the remote closes the connection, which it does once it
108+
// received the response.
109+
connection.closed().await;
110+
111+
anyhow::Ok(())
112+
});
113+
}
114+
115+
anyhow::Ok(())
116+
}
117+
});
118+
119+
Ok(endpoint)
120+
}

iroh/examples/echo.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ const ALPN: &[u8] = b"iroh-example/echo/0";
2222

2323
#[tokio::main]
2424
async fn main() -> Result<()> {
25-
let router = accept_side().await?;
25+
let router = start_accept_side().await?;
2626
let node_addr = router.endpoint().node_addr().await?;
2727

2828
connect_side(node_addr).await?;
2929

30+
// This makes sure the endpoint in the router is closed properly and connections close gracefully
3031
router.shutdown().await?;
3132

3233
Ok(())
@@ -44,19 +45,28 @@ async fn connect_side(addr: NodeAddr) -> Result<()> {
4445
// Send some data to be echoed
4546
send.write_all(b"Hello, world!").await?;
4647

47-
// Signal the end of transfer.
48+
// Signal the end of data for this particular stream
4849
send.finish()?;
4950

50-
// Receive the echo
51+
// Receive the echo, but limit reading up to maximum 1000 bytes
5152
let response = recv.read_to_end(1000).await?;
5253
assert_eq!(&response, b"Hello, world!");
5354

55+
// Explicitly close the whole connection.
5456
conn.close(0u32.into(), b"bye!");
5557

58+
// The above call only queues a close message to be sent (see how it's not async!).
59+
// We need to actually call this to make sure this message is sent out.
60+
endpoint.close().await;
61+
// If we don't call this, but continue using the endpoint, we then the queued
62+
// close call will eventually be picked up and sent.
63+
// But always try to wait for endpoint.close().await to go through before dropping
64+
// the endpoint to ensure any queued messages are sent through and connections are
65+
// closed gracefully.
5666
Ok(())
5767
}
5868

59-
async fn accept_side() -> Result<Router> {
69+
async fn start_accept_side() -> Result<Router> {
6070
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
6171

6272
// Build our protocol handler and add our protocol, identified by its ALPN, and spawn the node.
@@ -85,6 +95,7 @@ impl ProtocolHandler for Echo {
8595
let (mut send, mut recv) = connection.accept_bi().await?;
8696

8797
// Echo any bytes received back directly.
98+
// This will keep copying until the sender signals the end of data on the stream.
8899
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
89100
println!("Copied over {bytes_sent} byte(s)");
90101

0 commit comments

Comments
 (0)