Skip to content

Commit 59760b4

Browse files
committed
optimization: allow Finish message to be elided when not needed
upstream: capnproto/capnproto@d8efac8
1 parent d1e2156 commit 59760b4

File tree

2 files changed

+42
-28
lines changed

2 files changed

+42
-28
lines changed

capnp-rpc/src/rpc.rs

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ where
154154

155155
/// The local QuestionRef, set to None when it is destroyed.
156156
self_ref: Option<Weak<RefCell<QuestionRef<VatId>>>>,
157+
158+
/// If true, don't send a Finish message.
159+
skip_finish: bool,
157160
}
158161

159162
impl<VatId> Question<VatId> {
@@ -163,6 +166,7 @@ impl<VatId> Question<VatId> {
163166
param_exports: Vec::new(),
164167
is_tail_call: false,
165168
self_ref: None,
169+
skip_finish: false,
166170
}
167171
}
168172
}
@@ -210,20 +214,22 @@ impl<VatId> Drop for QuestionRef<VatId> {
210214
unreachable!()
211215
};
212216
if let Ok(ref mut c) = *self.connection_state.connection.borrow_mut() {
213-
let mut message = c.new_outgoing_message(5);
214-
{
215-
let root: message::Builder = message.get_body().unwrap().init_as();
216-
let mut builder = root.init_finish();
217-
builder.set_question_id(self.id);
218-
219-
// If we're still awaiting a return, then this request is being
220-
// canceled, and we're going to ignore any capabilities in the return
221-
// message, so set releaseResultCaps true. If we already received the
222-
// return, then we've already built local proxies for the caps and will
223-
// send Release messages when those are destroyed.
224-
builder.set_release_result_caps(q.is_awaiting_return);
217+
if !q.skip_finish {
218+
let mut message = c.new_outgoing_message(5);
219+
{
220+
let root: message::Builder = message.get_body().unwrap().init_as();
221+
let mut builder = root.init_finish();
222+
builder.set_question_id(self.id);
223+
224+
// If we're still awaiting a return, then this request is being
225+
// canceled, and we're going to ignore any capabilities in the return
226+
// message, so set releaseResultCaps true. If we already received the
227+
// return, then we've already built local proxies for the caps and will
228+
// send Release messages when those are destroyed.
229+
builder.set_release_result_caps(q.is_awaiting_return);
230+
}
231+
let _ = message.send();
225232
}
226-
let _ = message.send();
227233
}
228234

229235
if q.is_awaiting_return {
@@ -774,9 +780,10 @@ impl<VatId> ConnectionState<VatId> {
774780
let answers_slots = &mut connection_state.answers.borrow_mut().slots;
775781
match answers_slots.entry(answer_id) {
776782
hash_map::Entry::Vacant(_) => {
777-
return Err(Error::failed(format!(
778-
"Invalid question ID {answer_id} in Finish message."
779-
)));
783+
// The `Finish` message targets a question ID that isn't present in our answer table.
784+
// Probably, we sent a `Return` with `noFinishNeeded = true`, but the other side didn't
785+
// recognize this hint and sent a `Finish` anyway, or the `Finish` was already in-flight at
786+
// the time we sent the `Return`. We can silently ignore this.
780787
}
781788
hash_map::Entry::Occupied(mut entry) => {
782789
let answer = entry.get_mut();
@@ -1008,6 +1015,9 @@ impl<VatId> ConnectionState<VatId> {
10081015
match questions.slots[question_id as usize] {
10091016
Some(ref mut question) => {
10101017
question.is_awaiting_return = false;
1018+
if ret.get_no_finish_needed() {
1019+
question.skip_finish = true;
1020+
}
10111021
match question.self_ref {
10121022
Some(ref question_ref) => match ret.which()? {
10131023
return_::Results(results) => {
@@ -1215,23 +1225,23 @@ impl<VatId> ConnectionState<VatId> {
12151225
let promised_answer = promised_answer?;
12161226
let question_id = promised_answer.get_question_id();
12171227

1218-
match self.answers.borrow().slots.get(&question_id) {
1219-
None => Err(Error::failed(
1220-
"PromisedAnswer.questionId is not a current question.".to_string(),
1221-
)),
1228+
let pipeline = match self.answers.borrow().slots.get(&question_id) {
1229+
None => Box::new(broken::Pipeline::new(Error::failed(
1230+
"Pipeline call on a request that returned no capabilities or was already closed.".to_string(),
1231+
))) as Box<dyn PipelineHook>,
12221232
Some(base) => {
1223-
let pipeline = match base.pipeline {
1233+
match base.pipeline {
12241234
Some(ref pipeline) => pipeline.add_ref(),
12251235
None => Box::new(broken::Pipeline::new(Error::failed(
12261236
"Pipeline call on a request that returned not capabilities or was \
12271237
already closed."
12281238
.to_string(),
12291239
))) as Box<dyn PipelineHook>,
1230-
};
1231-
let ops = to_pipeline_ops(promised_answer.get_transform()?)?;
1232-
Ok(pipeline.get_pipelined_cap(&ops))
1240+
}
12331241
}
1234-
}
1242+
};
1243+
let ops = to_pipeline_ops(promised_answer.get_transform()?)?;
1244+
Ok(pipeline.get_pipelined_cap(&ops))
12351245
}
12361246
}
12371247
}
@@ -2356,11 +2366,15 @@ impl ResultsDone {
23562366
(false, Ok(())) => {
23572367
let exports = {
23582368
let root: message::Builder = message.get_body()?.get_as()?;
2359-
let message::Return(ret) = root.which()? else {
2369+
let message::Return(Ok(mut ret)) = root.which()? else {
23602370
unreachable!()
23612371
};
2372+
if cap_table.is_empty() {
2373+
ret.set_no_finish_needed(true);
2374+
finish_received.set(true);
2375+
}
23622376
let crate::rpc_capnp::return_::Results(Ok(payload)) =
2363-
ret?.which()?
2377+
ret.which()?
23642378
else {
23652379
unreachable!()
23662380
};

capnp-rpc/test/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ fn pipelining_return_null() {
384384
let cap = request.send().pipeline.get_cap();
385385
match cap.foo_request().send().promise.await {
386386
Err(ref e) => {
387-
if e.extra.contains("Message contains null capability pointer") {
387+
if e.extra.contains("Pipeline call on a request that returned no capabilities") {
388388
Ok(())
389389
} else {
390390
Err(Error::failed(format!(

0 commit comments

Comments
 (0)