Skip to content

feat(iroh)!: make ProtocolHandler use async functions #3320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,16 @@ let router = Router::builder(endpoint)
struct Echo;

impl ProtocolHandler for Echo {
fn accept(&self, connection: Connection) -> BoxedFuture<Result<()>> {
Box::pin(async move {
let (mut send, mut recv) = connection.accept_bi().await?;
async fn accept(&self, connection: Connection) -> Result<()> {
let (mut send, mut recv) = connection.accept_bi().await?;

// Echo any bytes received back directly.
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
// Echo any bytes received back directly.
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;

send.finish()?;
connection.closed().await;
send.finish()?;
connection.closed().await;

Ok(())
})
Ok(())
}
}
```
Expand Down
50 changes: 23 additions & 27 deletions iroh/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use iroh::{
watcher::Watcher as _,
Endpoint, NodeAddr,
};
use n0_future::boxed::BoxFuture;

/// Each protocol is identified by its ALPN string.
///
Expand Down Expand Up @@ -84,31 +83,28 @@ impl ProtocolHandler for Echo {
///
/// The returned future runs on a newly spawned tokio task, so it can run as long as
/// the connection lasts.
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
// We have to return a boxed future from the handler.
Box::pin(async move {
// We can get the remote's node id from the connection.
let node_id = connection.remote_node_id()?;
println!("accepted connection from {node_id}");

// Our protocol is a simple request-response protocol, so we expect the
// connecting peer to open a single bi-directional stream.
let (mut send, mut recv) = connection.accept_bi().await?;

// Echo any bytes received back directly.
// This will keep copying until the sender signals the end of data on the stream.
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
println!("Copied over {bytes_sent} byte(s)");

// By calling `finish` on the send stream we signal that we will not send anything
// further, which makes the receive stream on the other end terminate.
send.finish()?;

// Wait until the remote closes the connection, which it does once it
// received the response.
connection.closed().await;

Ok(())
})
async fn accept(&self, connection: Connection) -> Result<()> {
// We can get the remote's node id from the connection.
let node_id = connection.remote_node_id()?;
println!("accepted connection from {node_id}");

// Our protocol is a simple request-response protocol, so we expect the
// connecting peer to open a single bi-directional stream.
let (mut send, mut recv) = connection.accept_bi().await?;

// Echo any bytes received back directly.
// This will keep copying until the sender signals the end of data on the stream.
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
println!("Copied over {bytes_sent} byte(s)");

// By calling `finish` on the send stream we signal that we will not send anything
// further, which makes the receive stream on the other end terminate.
send.finish()?;

// Wait until the remote closes the connection, which it does once it
// received the response.
connection.closed().await;

Ok(())
}
}
65 changes: 30 additions & 35 deletions iroh/examples/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use iroh::{
protocol::{ProtocolHandler, Router},
Endpoint, NodeId,
};
use n0_future::boxed::BoxFuture;
use tokio::sync::Mutex;
use tracing_subscriber::{prelude::*, EnvFilter};

Expand Down Expand Up @@ -127,40 +126,36 @@ impl ProtocolHandler for BlobSearch {
///
/// The returned future runs on a newly spawned tokio task, so it can run as long as
/// the connection lasts.
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
let this = self.clone();
// We have to return a boxed future from the handler.
Box::pin(async move {
// We can get the remote's node id from the connection.
let node_id = connection.remote_node_id()?;
println!("accepted connection from {node_id}");

// Our protocol is a simple request-response protocol, so we expect the
// connecting peer to open a single bi-directional stream.
let (mut send, mut recv) = connection.accept_bi().await?;

// We read the query from the receive stream, while enforcing a max query length.
let query_bytes = recv.read_to_end(64).await?;

// Now, we can perform the actual query on our local database.
let query = String::from_utf8(query_bytes)?;
let num_matches = this.query_local(&query).await;

// We want to return a list of hashes. We do the simplest thing possible, and just send
// one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
// very easy to parse on the other end.
send.write_all(&num_matches.to_le_bytes()).await?;

// By calling `finish` on the send stream we signal that we will not send anything
// further, which makes the receive stream on the other end terminate.
send.finish()?;

// Wait until the remote closes the connection, which it does once it
// received the response.
connection.closed().await;

Ok(())
})
async fn accept(&self, connection: Connection) -> Result<()> {
// We can get the remote's node id from the connection.
let node_id = connection.remote_node_id()?;
println!("accepted connection from {node_id}");

// Our protocol is a simple request-response protocol, so we expect the
// connecting peer to open a single bi-directional stream.
let (mut send, mut recv) = connection.accept_bi().await?;

// We read the query from the receive stream, while enforcing a max query length.
let query_bytes = recv.read_to_end(64).await?;

// Now, we can perform the actual query on our local database.
let query = String::from_utf8(query_bytes)?;
let num_matches = self.query_local(&query).await;

// We want to return a list of hashes. We do the simplest thing possible, and just send
// one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
// very easy to parse on the other end.
send.write_all(&num_matches.to_le_bytes()).await?;

// By calling `finish` on the send stream we signal that we will not send anything
// further, which makes the receive stream on the other end terminate.
send.finish()?;

// Wait until the remote closes the connection, which it does once it
// received the response.
connection.closed().await;

Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1991,7 +1991,7 @@ impl Connection {
/// [`Connecting::handshake_data()`] succeeds. See that method's documentations for
/// details on the returned value.
///
/// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
/// [`Connection::handshake_data()`]: crate::endpoint::Connecting::handshake_data
#[inline]
pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
self.inner.handshake_data()
Expand Down
Loading
Loading