Skip to content

Commit 1323d6c

Browse files
griffdwrensha
authored andcommitted
Ported reconnect API
1 parent b444699 commit 1323d6c

File tree

4 files changed

+568
-0
lines changed

4 files changed

+568
-0
lines changed

capnp-rpc/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ use std::task::{Context, Poll};
7171
pub use crate::rpc::Disconnector;
7272
use crate::task_set::TaskSet;
7373

74+
pub use crate::reconnect::{lazy_auto_reconnect, auto_reconnect, SetTarget};
75+
7476
/// Code generated from
7577
/// [rpc.capnp](https://github.com/sandstorm-io/capnproto/blob/master/c%2B%2B/src/capnp/rpc.capnp).
7678
pub mod rpc_capnp;
@@ -103,6 +105,7 @@ mod rpc;
103105
mod sender_queue;
104106
mod split;
105107
mod task_set;
108+
mod reconnect;
106109
pub mod twoparty;
107110

108111
pub trait OutgoingMessage {

capnp-rpc/src/reconnect.rs

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
use std::cell::RefCell;
2+
use std::marker::PhantomData;
3+
use std::rc::Rc;
4+
5+
use capnp::capability::{Promise, FromClientHook};
6+
use capnp::private::capability::{ClientHook, RequestHook};
7+
use futures::TryFutureExt;
8+
9+
pub trait SetTarget<C> {
10+
fn add_ref(&self) -> Box<dyn SetTarget<C>>;
11+
fn set_target(&self, target: C);
12+
}
13+
14+
impl<C> Clone for Box<dyn SetTarget<C>> {
15+
fn clone(&self) -> Self {
16+
self.add_ref()
17+
}
18+
}
19+
20+
struct ClientInner<F, C> {
21+
connect: F,
22+
current: Option<Box<dyn ClientHook>>,
23+
generation: usize,
24+
marker: PhantomData<C>,
25+
}
26+
27+
impl<F, C> ClientInner<F, C>
28+
where F: FnMut() -> capnp::Result<C>,
29+
F: 'static,
30+
C: FromClientHook,
31+
{
32+
fn get_current(&mut self) -> Box<dyn ClientHook> {
33+
if let Some(hook) = self.current.as_ref() {
34+
hook.add_ref()
35+
} else {
36+
let hook = match (self.connect)() {
37+
Ok(hook) => hook.into_client_hook(),
38+
Err(err) => {
39+
crate::broken::new_cap(err)
40+
}
41+
};
42+
self.current = Some(hook.add_ref());
43+
hook
44+
}
45+
}
46+
}
47+
48+
struct Client<F, C> {
49+
inner: Rc<RefCell<ClientInner<F, C>>>,
50+
}
51+
52+
impl<F, C> Client<F, C>
53+
where F: FnMut() -> capnp::Result<C>,
54+
F: 'static,
55+
C: FromClientHook,
56+
C: 'static,
57+
{
58+
pub fn new(connect: F) -> Client<F, C> {
59+
Client {
60+
inner: Rc::new(RefCell::new(ClientInner {
61+
connect,
62+
generation: 0,
63+
current: None,
64+
marker: PhantomData,
65+
}))
66+
}
67+
}
68+
69+
pub fn get_current(&self) -> Box<dyn ClientHook> {
70+
self.inner.borrow_mut().get_current()
71+
}
72+
73+
fn wrap<T: 'static>(&self, promise: Promise<T, capnp::Error>) -> Promise<T, capnp::Error>
74+
{
75+
let c = self.clone();
76+
let generation = self.inner.borrow().generation;
77+
Promise::from_future(promise.map_err(move |err| {
78+
if err.kind == capnp::ErrorKind::Disconnected
79+
&& generation == c.inner.borrow().generation
80+
{
81+
let mut inner = c.inner.borrow_mut();
82+
inner.generation = generation + 1;
83+
match (inner.connect)() {
84+
Ok(hook) => {
85+
inner.current = Some(hook.into_client_hook())
86+
}
87+
Err(err) => inner.current = Some(crate::broken::new_cap(err)),
88+
}
89+
}
90+
err
91+
}))
92+
}
93+
}
94+
95+
impl<F: 'static, C> SetTarget<C> for Client<F, C>
96+
where F: 'static,
97+
C: FromClientHook,
98+
C: 'static,
99+
{
100+
fn add_ref(&self) -> Box<dyn SetTarget<C>> {
101+
Box::new(self.clone())
102+
}
103+
104+
fn set_target(&self, target: C) {
105+
self.inner.borrow_mut().current = Some(target.into_client_hook());
106+
}
107+
}
108+
109+
impl<F, C> Clone for Client<F, C> {
110+
fn clone(&self) -> Self {
111+
Self { inner: self.inner.clone() }
112+
}
113+
}
114+
115+
impl<F, C> ClientHook for Client<F, C>
116+
where F: FnMut() -> capnp::Result<C>,
117+
F: 'static,
118+
C: FromClientHook,
119+
C: 'static,
120+
{
121+
fn add_ref(&self) -> Box<dyn ClientHook> {
122+
Box::new(self.clone())
123+
}
124+
125+
fn new_call(&self,
126+
interface_id: u64,
127+
method_id: u16,
128+
size_hint: Option<capnp::MessageSize>)
129+
-> capnp::capability::Request<capnp::any_pointer::Owned, capnp::any_pointer::Owned>
130+
{
131+
let result = self.get_current().new_call(interface_id, method_id, size_hint);
132+
let hook = Request::new(self.clone(), result.hook);
133+
capnp::capability::Request::new(Box::new(hook))
134+
}
135+
136+
fn call(&self, interface_id: u64, method_id: u16,
137+
params: Box<dyn capnp::private::capability::ParamsHook>,
138+
results: Box<dyn capnp::private::capability::ResultsHook>)
139+
-> Promise<(), capnp::Error>
140+
{
141+
let result = self.get_current().call(interface_id, method_id, params, results);
142+
self.wrap(result)
143+
}
144+
145+
fn get_brand(&self) -> usize {
146+
0
147+
}
148+
149+
fn get_ptr(&self) -> usize {
150+
(self.inner.as_ref()) as * const _ as usize
151+
}
152+
153+
fn get_resolved(&self) -> Option<Box<dyn ClientHook>> {
154+
None
155+
}
156+
157+
fn when_more_resolved(&self) -> Option<Promise<Box<dyn ClientHook>, capnp::Error>> {
158+
None
159+
}
160+
161+
fn when_resolved(&self) -> Promise<(), capnp::Error> {
162+
Promise::ok(())
163+
}
164+
}
165+
166+
struct Request<F, C> {
167+
parent: Client<F, C>,
168+
inner: Box<dyn RequestHook>,
169+
}
170+
171+
impl<F, C> Request<F, C> {
172+
fn new(parent: Client<F, C>, inner: Box<dyn RequestHook>) -> Request<F, C> {
173+
Request { parent, inner }
174+
}
175+
}
176+
177+
impl<F, C> RequestHook for Request<F, C>
178+
where F: FnMut() -> capnp::Result<C>,
179+
F: 'static,
180+
C: FromClientHook,
181+
C: 'static,
182+
{
183+
fn get(&mut self) -> capnp::any_pointer::Builder<'_> {
184+
self.inner.get()
185+
}
186+
187+
fn get_brand(&self) -> usize {
188+
0
189+
}
190+
191+
fn send(self: Box<Self>) -> capnp::capability::RemotePromise<capnp::any_pointer::Owned> {
192+
let parent = self.parent;
193+
let mut result = self.inner.send();
194+
result.promise = parent.wrap(result.promise);
195+
result
196+
}
197+
198+
fn tail_send(self: Box<Self>)
199+
-> Option<(u32, Promise<(), capnp::Error>, Box<dyn capnp::private::capability::PipelineHook>)> {
200+
todo!()
201+
}
202+
}
203+
204+
pub fn auto_reconnect<F, C>(mut connect: F) -> capnp::Result<(C, Box<dyn SetTarget<C>>)>
205+
where F: FnMut() -> capnp::Result<C>,
206+
F: 'static,
207+
C: FromClientHook,
208+
C: 'static,
209+
{
210+
let current = connect()?;
211+
let c = Client::new(connect);
212+
c.set_target(current);
213+
let hook : Box<dyn ClientHook> = Box::new(c.clone());
214+
Ok((FromClientHook::new(hook), Box::new(c)))
215+
}
216+
217+
218+
pub fn lazy_auto_reconnect<F, C>(connect: F) -> (C, Box<dyn SetTarget<C>>)
219+
where F: FnMut() -> capnp::Result<C>,
220+
F: 'static,
221+
C: FromClientHook,
222+
C: 'static,
223+
{
224+
let c : Client<F, C> = Client::new(connect);
225+
let hook : Box<dyn ClientHook> = Box::new(c.clone());
226+
(FromClientHook::new(hook), Box::new(c))
227+
}

0 commit comments

Comments
 (0)