Skip to content

Commit 38f7f58

Browse files
committed
Prototype support for async native functions
This experiments with enabling support for async NativeFunctions that are only async from the pov of the host, and appear as synchronous from within JavaScript. Instead of running the async functions as a Promise via enqueue_job, this works by allowing Operations to be executed over multiple VM cycles, so an Operation may start some async work in one step and then further steps can poll for completion of that work and finish the Operation. In particular this works by allowing Call Operations to return an `OpStatus::Pending`value that indicates that the same Call operation needs to be executed repeatedly, until it returns an `OpStatus::Finished` status. In the case of a `Pending` status, the program counter is reset and anything that was taken off the stack is pushed back so the same Operation can be re-executed. There is a new `NativeFunction::from_async_as_sync_with_captures()` that lets the host provide a (sync) closure that itself returns / spawns a boxed Future. This is tracked internally as an `Inner::AsyncFn`. Whenever the function is `__call__`ed then (assuming the operation isn't already in a pending / running state) a new Future is spawned via the application's closure and the Operation enters a "pending" state. When a NativeFunction is pending then each `__call__` will `poll()` the spawned `Future` to see if the `async` function has a result. This effectively stalls the VM at the same Opcode while still accounting for any cycle budget and periodically yielding to the application's async runtime while waiting for an async Call Operation to finish. Limitations / Issues ==================== == Busy Loop Polling == Even though the implementation does yield back to the application's async runtime when waiting for a NativeFunction to complete, the implementation isn't ideal because it uses a noop task Context + Waker when polling NativeFunction Futures. The effectively relies on the VM polling the future in a busy loop, wasting CPU time. A better solution could be to implement a shim Waker that would flag some state on the Boa engine Context, and then adapt the Future that's used to yield the VM to the executor so that it only becomes Ready once the async NativeFunction has signalled the waker. I.e. the Waker would act like a bridge/proxy between a spawned async NativeFunction and the the Future/Task associated with the VM's async `run_async_with_budget`. This way I think the VM could remain async runtime agnostic but would be able to actually sleep while waiting for async functions instead of entering a busy yield loop. == Requires PC rewind and reverting stack state == Ideally operations that may complete over multiple steps would maintain a state machine via private registers, whereby it would not be necessary to repeatedly rewind the program counter and re-push values to the stack so that the operation can be decoded and executed repeatedly from the beginning. == Only adapts Call Operation == Currently only the Call Operation handles async NativeFunctions but there are other Call[XYZ] Operations that could be adapted too. == Not compatible with composite Operations that `call()` == The ability to track pending async functions is implemented in terms of repeatedly executing an Opcode in the VM until it signals that it's not Pending. This currently relies on being able to reset and re-execute the Operation (such as reverting program counter and stack changes). There are lots of Operations that make use of JsObject::call() internally and they would currently trigger a panic if they called an async NativeFunction because they would not be able to "resolve()" the "Pending" status that would be returned by the `call()`. Ideally all Operations that use `__call__` or `__construct__` should be fully resumable in the same way that the Call Operation is now. This would presumably be easier to achieve with Rust Coroutines if they were stable because it would otherwise be necessary to adapt composite Operations into a state machine, similar to what the compiler does for an async Future, so they can yield for async function calls and be resumed by the VM.
1 parent e954495 commit 38f7f58

File tree

12 files changed

+574
-112
lines changed

12 files changed

+574
-112
lines changed

core/engine/src/native_function/mod.rs

Lines changed: 352 additions & 45 deletions
Large diffs are not rendered by default.

core/engine/src/object/internal_methods/mod.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,10 +406,23 @@ pub(crate) enum CallValue {
406406
argument_count: usize,
407407
},
408408

409+
/// Further processing is needed.
410+
///
411+
/// Unlike for `Pending`, the further processing should not block the VM and
412+
/// be completed synchronously, it should integrate with VM cycle budgeting
413+
/// and yielding.
414+
AsyncPending,
415+
409416
/// The value has been computed and is the first element on the stack.
410417
Complete,
411418
}
412419

420+
pub(crate) enum ResolvedCallValue {
421+
Ready { register_count: usize },
422+
Pending,
423+
Complete,
424+
}
425+
413426
impl CallValue {
414427
/// Resolves the [`CallValue`], and return if the value is complete.
415428
pub(crate) fn resolve(mut self, context: &mut Context) -> JsResult<Option<usize>> {
@@ -425,7 +438,25 @@ impl CallValue {
425438
match self {
426439
Self::Ready { register_count } => Ok(Some(register_count)),
427440
Self::Complete => Ok(None),
441+
Self::Pending { .. } | Self::AsyncPending { .. } => unreachable!(),
442+
}
443+
}
444+
445+
pub(crate) fn async_resolve(mut self, context: &mut Context) -> JsResult<ResolvedCallValue> {
446+
while let Self::Pending {
447+
func,
448+
object,
449+
argument_count,
450+
} = self
451+
{
452+
self = func(&object, argument_count, context)?;
453+
}
454+
455+
match self {
456+
Self::Ready { register_count } => Ok(ResolvedCallValue::Ready { register_count }),
457+
Self::Complete => Ok(ResolvedCallValue::Complete),
428458
Self::Pending { .. } => unreachable!(),
459+
Self::AsyncPending { .. } => Ok(ResolvedCallValue::Pending),
429460
}
430461
}
431462
}

core/engine/src/vm/completion_record.rs

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
#![allow(clippy::inline_always)]
44

5-
use super::Registers;
5+
use super::{OpStatus, Registers};
66
use crate::{Context, JsError, JsResult, JsValue};
77
use boa_gc::{custom_trace, Finalize, Trace};
88
use std::ops::ControlFlow;
@@ -56,7 +56,8 @@ pub(crate) trait IntoCompletionRecord {
5656
self,
5757
context: &mut Context,
5858
registers: &mut Registers,
59-
) -> ControlFlow<CompletionRecord>;
59+
saved_pc: u32,
60+
) -> ControlFlow<CompletionRecord, OpStatus>;
6061
}
6162

6263
impl IntoCompletionRecord for () {
@@ -65,8 +66,9 @@ impl IntoCompletionRecord for () {
6566
self,
6667
_: &mut Context,
6768
_: &mut Registers,
68-
) -> ControlFlow<CompletionRecord> {
69-
ControlFlow::Continue(())
69+
_: u32,
70+
) -> ControlFlow<CompletionRecord, OpStatus> {
71+
ControlFlow::Continue(OpStatus::Finished)
7072
}
7173
}
7274

@@ -76,7 +78,8 @@ impl IntoCompletionRecord for JsError {
7678
self,
7779
context: &mut Context,
7880
registers: &mut Registers,
79-
) -> ControlFlow<CompletionRecord> {
81+
_: u32,
82+
) -> ControlFlow<CompletionRecord, OpStatus> {
8083
context.handle_error(registers, self)
8184
}
8285
}
@@ -87,9 +90,29 @@ impl IntoCompletionRecord for JsResult<()> {
8790
self,
8891
context: &mut Context,
8992
registers: &mut Registers,
90-
) -> ControlFlow<CompletionRecord> {
93+
_: u32,
94+
) -> ControlFlow<CompletionRecord, OpStatus> {
9195
match self {
92-
Ok(()) => ControlFlow::Continue(()),
96+
Ok(()) => ControlFlow::Continue(OpStatus::Finished),
97+
Err(err) => context.handle_error(registers, err),
98+
}
99+
}
100+
}
101+
102+
impl IntoCompletionRecord for JsResult<OpStatus> {
103+
#[inline(always)]
104+
fn into_completion_record(
105+
self,
106+
context: &mut Context,
107+
registers: &mut Registers,
108+
saved_pc: u32,
109+
) -> ControlFlow<CompletionRecord, OpStatus> {
110+
match self {
111+
Ok(OpStatus::Finished) => ControlFlow::Continue(OpStatus::Finished),
112+
Ok(OpStatus::Pending) => {
113+
context.vm.frame_mut().pc = saved_pc;
114+
ControlFlow::Continue(OpStatus::Pending)
115+
}
93116
Err(err) => context.handle_error(registers, err),
94117
}
95118
}
@@ -101,7 +124,30 @@ impl IntoCompletionRecord for ControlFlow<CompletionRecord> {
101124
self,
102125
_: &mut Context,
103126
_: &mut Registers,
104-
) -> ControlFlow<CompletionRecord> {
105-
self
127+
_: u32,
128+
) -> ControlFlow<CompletionRecord, OpStatus> {
129+
match self {
130+
ControlFlow::Continue(()) => ControlFlow::Continue(OpStatus::Finished),
131+
ControlFlow::Break(completion_record) => ControlFlow::Break(completion_record),
132+
}
133+
}
134+
}
135+
136+
impl IntoCompletionRecord for ControlFlow<CompletionRecord, OpStatus> {
137+
#[inline(always)]
138+
fn into_completion_record(
139+
self,
140+
context: &mut Context,
141+
_: &mut Registers,
142+
saved_pc: u32,
143+
) -> ControlFlow<CompletionRecord, OpStatus> {
144+
match self {
145+
ControlFlow::Continue(OpStatus::Finished) => ControlFlow::Continue(OpStatus::Finished),
146+
ControlFlow::Continue(OpStatus::Pending) => {
147+
context.vm.frame_mut().pc = saved_pc;
148+
ControlFlow::Continue(OpStatus::Pending)
149+
}
150+
ControlFlow::Break(completion_record) => ControlFlow::Break(completion_record),
151+
}
106152
}
107153
}

core/engine/src/vm/mod.rs

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ impl Vm {
136136
where
137137
T: Into<JsValue>,
138138
{
139+
//println!("Pushing value");
139140
self.stack.push(value.into());
140141
}
141142

@@ -146,6 +147,7 @@ impl Vm {
146147
/// If there is nothing to pop, then this will panic.
147148
#[track_caller]
148149
pub(crate) fn pop(&mut self) -> JsValue {
150+
//println!("Popping value");
149151
self.stack.pop().expect("stack was empty")
150152
}
151153

@@ -244,11 +246,13 @@ impl Vm {
244246
}
245247

246248
pub(crate) fn pop_n_values(&mut self, n: usize) -> Vec<JsValue> {
249+
//println!("Popping {n} values");
247250
let at = self.stack.len() - n;
248251
self.stack.split_off(at)
249252
}
250253

251254
pub(crate) fn push_values(&mut self, values: &[JsValue]) {
255+
//println!("Pushing {} values", values.len());
252256
self.stack.extend_from_slice(values);
253257
}
254258

@@ -298,9 +302,9 @@ impl Context {
298302
f: F,
299303
registers: &mut Registers,
300304
opcode: Opcode,
301-
) -> ControlFlow<CompletionRecord>
305+
) -> ControlFlow<CompletionRecord, OpStatus>
302306
where
303-
F: FnOnce(&mut Context, &mut Registers, Opcode) -> ControlFlow<CompletionRecord>,
307+
F: FnOnce(&mut Context, &mut Registers, Opcode) -> ControlFlow<CompletionRecord, OpStatus>,
304308
{
305309
let frame = self.vm.frame();
306310
let (instruction, _) = frame
@@ -375,15 +379,21 @@ impl Context {
375379
}
376380
}
377381

382+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
383+
pub(crate) enum OpStatus {
384+
Finished,
385+
Pending,
386+
}
387+
378388
impl Context {
379389
fn execute_instruction<F>(
380390
&mut self,
381391
f: F,
382392
registers: &mut Registers,
383393
opcode: Opcode,
384-
) -> ControlFlow<CompletionRecord>
394+
) -> ControlFlow<CompletionRecord, OpStatus>
385395
where
386-
F: FnOnce(&mut Context, &mut Registers, Opcode) -> ControlFlow<CompletionRecord>,
396+
F: FnOnce(&mut Context, &mut Registers, Opcode) -> ControlFlow<CompletionRecord, OpStatus>,
387397
{
388398
f(self, registers, opcode)
389399
}
@@ -393,9 +403,9 @@ impl Context {
393403
f: F,
394404
registers: &mut Registers,
395405
opcode: Opcode,
396-
) -> ControlFlow<CompletionRecord>
406+
) -> ControlFlow<CompletionRecord, OpStatus>
397407
where
398-
F: FnOnce(&mut Context, &mut Registers, Opcode) -> ControlFlow<CompletionRecord>,
408+
F: FnOnce(&mut Context, &mut Registers, Opcode) -> ControlFlow<CompletionRecord, OpStatus>,
399409
{
400410
#[cfg(feature = "fuzz")]
401411
{
@@ -422,7 +432,7 @@ impl Context {
422432
&mut self,
423433
registers: &mut Registers,
424434
err: JsError,
425-
) -> ControlFlow<CompletionRecord> {
435+
) -> ControlFlow<CompletionRecord, OpStatus> {
426436
// If we hit the execution step limit, bubble up the error to the
427437
// (Rust) caller instead of trying to handle as an exception.
428438
if !err.is_catchable() {
@@ -451,7 +461,7 @@ impl Context {
451461
let pc = self.vm.frame().pc.saturating_sub(1);
452462
if self.vm.handle_exception_at(pc) {
453463
self.vm.pending_exception = Some(err);
454-
return ControlFlow::Continue(());
464+
return ControlFlow::Continue(OpStatus::Finished);
455465
}
456466

457467
// Inject realm before crossing the function boundry
@@ -461,7 +471,10 @@ impl Context {
461471
self.handle_thow(registers)
462472
}
463473

464-
fn handle_return(&mut self, registers: &mut Registers) -> ControlFlow<CompletionRecord> {
474+
fn handle_return(
475+
&mut self,
476+
registers: &mut Registers,
477+
) -> ControlFlow<CompletionRecord, OpStatus> {
465478
let frame = self.vm.frame();
466479
let fp = frame.fp() as usize;
467480
let exit_early = frame.exit_early();
@@ -475,10 +488,13 @@ impl Context {
475488
self.vm.push(result);
476489
self.vm.pop_frame().expect("frame must exist");
477490
registers.pop_function(self.vm.frame().code_block().register_count as usize);
478-
ControlFlow::Continue(())
491+
ControlFlow::Continue(OpStatus::Finished)
479492
}
480493

481-
fn handle_yield(&mut self, registers: &mut Registers) -> ControlFlow<CompletionRecord> {
494+
fn handle_yield(
495+
&mut self,
496+
registers: &mut Registers,
497+
) -> ControlFlow<CompletionRecord, OpStatus> {
482498
let result = self.vm.take_return_value();
483499
if self.vm.frame().exit_early() {
484500
return ControlFlow::Break(CompletionRecord::Return(result));
@@ -487,10 +503,13 @@ impl Context {
487503
self.vm.push(result);
488504
self.vm.pop_frame().expect("frame must exist");
489505
registers.pop_function(self.vm.frame().code_block().register_count as usize);
490-
ControlFlow::Continue(())
506+
ControlFlow::Continue(OpStatus::Finished)
491507
}
492508

493-
fn handle_thow(&mut self, registers: &mut Registers) -> ControlFlow<CompletionRecord> {
509+
fn handle_thow(
510+
&mut self,
511+
registers: &mut Registers,
512+
) -> ControlFlow<CompletionRecord, OpStatus> {
494513
let frame = self.vm.frame();
495514
let mut fp = frame.fp();
496515
let mut env_fp = frame.env_fp;
@@ -515,7 +534,7 @@ impl Context {
515534
let exit_early = self.vm.frame.exit_early();
516535

517536
if self.vm.handle_exception_at(pc) {
518-
return ControlFlow::Continue(());
537+
return ControlFlow::Continue(OpStatus::Finished);
519538
}
520539

521540
if exit_early {
@@ -535,7 +554,7 @@ impl Context {
535554
}
536555
self.vm.environments.truncate(env_fp as usize);
537556
self.vm.stack.truncate(fp as usize);
538-
ControlFlow::Continue(())
557+
ControlFlow::Continue(OpStatus::Finished)
539558
}
540559

541560
/// Runs the current frame to completion, yielding to the caller each time `budget`
@@ -576,12 +595,16 @@ impl Context {
576595
registers,
577596
opcode,
578597
) {
579-
ControlFlow::Continue(()) => {}
598+
ControlFlow::Continue(OpStatus::Finished) => {}
599+
ControlFlow::Continue(OpStatus::Pending) => {
600+
runtime_budget = 0;
601+
}
580602
ControlFlow::Break(value) => return value,
581603
}
582604

583605
if runtime_budget == 0 {
584606
runtime_budget = budget;
607+
//println!("Yielding to executor");
585608
yield_now().await;
586609
}
587610
}
@@ -608,7 +631,7 @@ impl Context {
608631
let opcode = Opcode::decode(*byte);
609632

610633
match self.execute_one(Self::execute_bytecode_instruction, registers, opcode) {
611-
ControlFlow::Continue(()) => {}
634+
ControlFlow::Continue(_) => {}
612635
ControlFlow::Break(value) => return value,
613636
}
614637
}

core/engine/src/vm/opcode/await/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
js_string,
88
native_function::NativeFunction,
99
object::FunctionObjectBuilder,
10-
vm::{opcode::Operation, CompletionRecord, GeneratorResumeKind, Registers},
10+
vm::{opcode::Operation, CompletionRecord, GeneratorResumeKind, OpStatus, Registers},
1111
Context, JsArgs, JsValue,
1212
};
1313
use boa_gc::Gc;
@@ -26,7 +26,7 @@ impl Await {
2626
value: VaryingOperand,
2727
registers: &mut Registers,
2828
context: &mut Context,
29-
) -> ControlFlow<CompletionRecord> {
29+
) -> ControlFlow<CompletionRecord, OpStatus> {
3030
let value = registers.get(value.into());
3131

3232
// 2. Let promise be ? PromiseResolve(%Promise%, value).
@@ -197,14 +197,14 @@ impl CompletePromiseCapability {
197197
(): (),
198198
registers: &mut Registers,
199199
context: &mut Context,
200-
) -> ControlFlow<CompletionRecord> {
200+
) -> ControlFlow<CompletionRecord, OpStatus> {
201201
// If the current executing function is an async function we have to resolve/reject it's promise at the end.
202202
// The relevant spec section is 3. in [AsyncBlockStart](https://tc39.es/ecma262/#sec-asyncblockstart).
203203
let Some(promise_capability) = context.vm.frame().promise_capability(registers) else {
204204
return if context.vm.pending_exception.is_some() {
205205
context.handle_thow(registers)
206206
} else {
207-
ControlFlow::Continue(())
207+
ControlFlow::Continue(OpStatus::Finished)
208208
};
209209
};
210210

@@ -225,7 +225,7 @@ impl CompletePromiseCapability {
225225
.vm
226226
.set_return_value(promise_capability.promise().clone().into());
227227

228-
ControlFlow::Continue(())
228+
ControlFlow::Continue(OpStatus::Finished)
229229
}
230230
}
231231

0 commit comments

Comments
 (0)