Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ members = [
"benchmark",
"capnpc/test",
"capnpc/test/external-crate",
"capnpc/test-edition-2015",
"capnpc/test-edition-2018",
"capnpc/test-edition-2021",
"capnp-futures/test",
"capnp-rpc/examples/hello-world",
Expand Down
15 changes: 7 additions & 8 deletions capnp-rpc/examples/calculator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,27 @@
// THE SOFTWARE.

use crate::calculator_capnp::calculator;
use capnp::capability::Promise;
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};

use futures::AsyncReadExt;

#[derive(Clone, Copy)]
pub struct PowerFunction;

impl calculator::function::Server for PowerFunction {
fn call(
&mut self,
async fn call(
&self,
params: calculator::function::CallParams,
mut results: calculator::function::CallResults,
) -> Promise<(), ::capnp::Error> {
let params = pry!(pry!(params.get()).get_params());
) -> Result<(), ::capnp::Error> {
let params = params.get()?.get_params()?;
if params.len() != 2 {
Promise::err(::capnp::Error::failed(
Err(::capnp::Error::failed(
"Wrong number of parameters".to_string(),
))
} else {
results.get().set_value(params.get(0).powf(params.get(1)));
Promise::ok(())
Ok(())
}
}
}
Expand Down
109 changes: 58 additions & 51 deletions capnp-rpc/examples/calculator/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

use std::cell::RefCell;

use ::capnp::message::HeapAllocator;
use capnp::capability::Promise;
use capnp::primitive_list;
use capnp::Error;

use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use ::capnp_rpc::ImbuedMessageBuilder;
use capnp_rpc::pry;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};

use crate::calculator_capnp::calculator;
use capnp::capability::Promise;

use futures::future;
use futures::{AsyncReadExt, FutureExt, TryFutureExt};
Expand All @@ -41,19 +46,19 @@ impl ValueImpl {
}

impl calculator::value::Server for ValueImpl {
fn read(
&mut self,
async fn read(
&self,
_params: calculator::value::ReadParams,
mut results: calculator::value::ReadResults,
) -> Promise<(), Error> {
) -> Result<(), Error> {
results.get().set_value(self.value);
Promise::ok(())
Ok(())
}
}

fn evaluate_impl(
expression: calculator::expression::Reader,
params: Option<primitive_list::Reader<f64>>,
expression: calculator::expression::Reader<'_>,
params: Option<primitive_list::Reader<'_, f64>>,
) -> Promise<f64, Error> {
match pry!(expression.which()) {
calculator::expression::Literal(v) => Promise::ok(v),
Expand Down Expand Up @@ -92,43 +97,45 @@ fn evaluate_impl(

struct FunctionImpl {
param_count: u32,
body: ::capnp_rpc::ImbuedMessageBuilder<::capnp::message::HeapAllocator>,
body: RefCell<ImbuedMessageBuilder<HeapAllocator>>,
}

impl FunctionImpl {
fn new(param_count: u32, body: calculator::expression::Reader) -> ::capnp::Result<Self> {
let mut result = Self {
let result = Self {
param_count,
body: ::capnp_rpc::ImbuedMessageBuilder::new(::capnp::message::HeapAllocator::new()),
body: RefCell::new(ImbuedMessageBuilder::new(HeapAllocator::new())),
};
result.body.set_root(body)?;
result.body.borrow_mut().set_root(body)?;
Ok(result)
}
}

impl calculator::function::Server for FunctionImpl {
fn call(
&mut self,
async fn call(
&self,
params: calculator::function::CallParams,
mut results: calculator::function::CallResults,
) -> Promise<(), Error> {
let params = pry!(pry!(params.get()).get_params());
) -> Result<(), Error> {
let params = params.get()?.get_params()?;
if params.len() != self.param_count {
return Promise::err(Error::failed(format!(
return Err(Error::failed(format!(
"Expected {} parameters but got {}.",
self.param_count,
params.len()
)));
}

let eval = evaluate_impl(
pry!(self.body.get_root::<calculator::expression::Builder>()).into_reader(),
self.body
.borrow_mut()
.get_root::<calculator::expression::Builder>()?
.into_reader(),
Some(params),
);
Promise::from_future(async move {
results.get().set_value(eval.await?);
Ok(())
})

results.get().set_value(eval.await?);
Ok(())
}
}

Expand All @@ -138,14 +145,14 @@ pub struct OperatorImpl {
}

impl calculator::function::Server for OperatorImpl {
fn call(
&mut self,
async fn call(
&self,
params: calculator::function::CallParams,
mut results: calculator::function::CallResults,
) -> Promise<(), Error> {
let params = pry!(pry!(params.get()).get_params());
) -> Result<(), Error> {
let params = params.get()?.get_params()?;
if params.len() != 2 {
Promise::err(Error::failed("Wrong number of parameters.".to_string()))
Err(Error::failed("Wrong number of parameters.".to_string()))
} else {
let v = match self.op {
calculator::Operator::Add => params.get(0) + params.get(1),
Expand All @@ -154,50 +161,50 @@ impl calculator::function::Server for OperatorImpl {
calculator::Operator::Divide => params.get(0) / params.get(1),
};
results.get().set_value(v);
Promise::ok(())
Ok(())
}
}
}

struct CalculatorImpl;

impl calculator::Server for CalculatorImpl {
fn evaluate(
&mut self,
async fn evaluate(
&self,
params: calculator::EvaluateParams,
mut results: calculator::EvaluateResults,
) -> Promise<(), Error> {
Promise::from_future(async move {
let v = evaluate_impl(params.get()?.get_expression()?, None).await?;
results
.get()
.set_value(capnp_rpc::new_client(ValueImpl::new(v)));
Ok(())
})
) -> Result<(), Error> {
let v = evaluate_impl(params.get()?.get_expression()?, None).await?;
results
.get()
.set_value(capnp_rpc::new_client(ValueImpl::new(v)));
Ok(())
}
fn def_function(
&mut self,

async fn def_function(
&self,
params: calculator::DefFunctionParams,
mut results: calculator::DefFunctionResults,
) -> Promise<(), Error> {
) -> Result<(), Error> {
results
.get()
.set_func(capnp_rpc::new_client(pry!(FunctionImpl::new(
pry!(params.get()).get_param_count() as u32,
pry!(pry!(params.get()).get_body())
))));
Promise::ok(())
.set_func(capnp_rpc::new_client(FunctionImpl::new(
params.get()?.get_param_count() as u32,
params.get()?.get_body()?,
)?));
Ok(())
}
fn get_operator(
&mut self,

async fn get_operator(
&self,
params: calculator::GetOperatorParams,
mut results: calculator::GetOperatorResults,
) -> Promise<(), Error> {
let op = pry!(pry!(params.get()).get_op());
) -> Result<(), Error> {
let op = params.get()?.get_op()?;
results
.get()
.set_func(capnp_rpc::new_client(OperatorImpl { op }));
Promise::ok(())
Ok(())
}
}

Expand Down
15 changes: 7 additions & 8 deletions capnp-rpc/examples/hello-world/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

use capnp::capability::Promise;
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};

use crate::hello_world_capnp::hello_world;

Expand All @@ -30,17 +29,17 @@ use std::net::ToSocketAddrs;
struct HelloWorldImpl;

impl hello_world::Server for HelloWorldImpl {
fn say_hello(
&mut self,
async fn say_hello(
&self,
params: hello_world::SayHelloParams,
mut results: hello_world::SayHelloResults,
) -> Promise<(), ::capnp::Error> {
let request = pry!(pry!(params.get()).get_request());
let name = pry!(pry!(request.get_name()).to_str());
) -> Result<(), ::capnp::Error> {
let request = params.get()?.get_request()?;
let name = request.get_name()?.to_str()?;
let message = format!("Hello, {name}!");
results.get().init_reply().set_message(message);

Promise::ok(())
Ok(())
}
}

Expand Down
13 changes: 6 additions & 7 deletions capnp-rpc/examples/pubsub/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,23 @@
// THE SOFTWARE.

use crate::pubsub_capnp::{publisher, subscriber};
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};

use capnp::capability::Promise;
use futures::AsyncReadExt;

struct SubscriberImpl;

impl subscriber::Server<::capnp::text::Owned> for SubscriberImpl {
fn push_message(
&mut self,
async fn push_message(
&self,
params: subscriber::PushMessageParams<::capnp::text::Owned>,
_results: subscriber::PushMessageResults<::capnp::text::Owned>,
) -> Promise<(), ::capnp::Error> {
) -> Result<(), ::capnp::Error> {
println!(
"message from publisher: {}",
pry!(pry!(pry!(params.get()).get_message()).to_str())
params.get()?.get_message()?.to_str()?
);
Promise::ok(())
Ok(())
}
}

Expand Down
26 changes: 12 additions & 14 deletions capnp-rpc/examples/pubsub/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

use std::cell::RefCell;
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::rc::Rc;

use crate::pubsub_capnp::{publisher, subscriber, subscription};
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};

use capnp::capability::Promise;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};

use futures::{AsyncReadExt, FutureExt, StreamExt};

Expand Down Expand Up @@ -68,7 +66,7 @@ impl Drop for SubscriptionImpl {
impl subscription::Server for SubscriptionImpl {}

struct PublisherImpl {
next_id: u64,
next_id: Cell<u64>,
subscribers: Rc<RefCell<SubscriberMap>>,
}

Expand All @@ -77,7 +75,7 @@ impl PublisherImpl {
let subscribers = Rc::new(RefCell::new(SubscriberMap::new()));
(
Self {
next_id: 0,
next_id: Cell::new(0),
subscribers: subscribers.clone(),
},
subscribers,
Expand All @@ -86,29 +84,29 @@ impl PublisherImpl {
}

impl publisher::Server<::capnp::text::Owned> for PublisherImpl {
fn subscribe(
&mut self,
async fn subscribe(
&self,
params: publisher::SubscribeParams<::capnp::text::Owned>,
mut results: publisher::SubscribeResults<::capnp::text::Owned>,
) -> Promise<(), ::capnp::Error> {
) -> Result<(), ::capnp::Error> {
println!("subscribe");
self.subscribers.borrow_mut().subscribers.insert(
self.next_id,
self.next_id.get(),
SubscriberHandle {
client: pry!(pry!(params.get()).get_subscriber()),
client: params.get()?.get_subscriber()?,
requests_in_flight: 0,
},
);

results
.get()
.set_subscription(capnp_rpc::new_client(SubscriptionImpl::new(
self.next_id,
self.next_id.get(),
self.subscribers.clone(),
)));

self.next_id += 1;
Promise::ok(())
self.next_id.set(self.next_id.get() + 1);
Ok(())
}
}

Expand Down
Loading
Loading