Skip to content

Commit feaa304

Browse files
committed
Added wasm worker dispatcher and single threaded support, compilation works, runtime fails at handshake
Signed-off-by: Aditya <[email protected]>
1 parent e231a5f commit feaa304

File tree

12 files changed

+422
-140
lines changed

12 files changed

+422
-140
lines changed

Cargo.lock

Lines changed: 19 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqlx-core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ async-std = { workspace = true, optional = true }
5252
smol = { workspace = true, optional = true }
5353
tokio = { workspace = true, optional = true, features = ["sync","rt"]}
5454
tokio-util = { workspace = true }
55+
async-lock = "2.5.0"
56+
once_cell = "1.21.3"
5557

5658
# TLS
5759
native-tls = { version = "0.2.10", optional = true }

sqlx-core/src/net/socket/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,6 @@ pub async fn connect_tcp<Ws: WithSocket>(
199199
.await);
200200
}
201201

202-
#[cfg(target_arch = "wasm32")]
203-
{
204-
todo!("outer socket impl")
205-
}
206-
207202
cfg_if! {
208203
if #[cfg(feature = "_rt-async-io")] {
209204
Ok(with_socket.with_socket(connect_tcp_async_io(host, port).await?).await)

sqlx-core/src/pool/inner.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,23 @@ impl<DB: Database> PoolInner<DB> {
347347

348348
// result here is `Result<Result<C, Error>, TimeoutError>`
349349
// if this block does not return, sleep for the backoff timeout and try again
350+
eprintln!(
351+
"pool: attempting connect (deadline in {}ms, current size={})",
352+
timeout.as_millis(),
353+
self.size()
354+
);
355+
350356
let res = crate::rt::timeout(timeout, connect_options.connect()).await;
357+
if let Ok(Ok(_)) = &res {
358+
eprintln!("pool: connect attempt succeeded");
359+
} else if let Ok(Err(e)) = &res {
360+
eprintln!("pool: connect attempt returned error: {:?}", e);
361+
} else if res.is_err() {
362+
eprintln!(
363+
"pool: connect attempt timed out after {}ms",
364+
timeout.as_millis()
365+
);
366+
}
351367
match res {
352368
// successfully established connection
353369
Ok(Ok(mut raw)) => {

sqlx-core/src/rt/mod.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ pub mod rt_tokio;
1515
#[cfg(target_arch = "wasm32")]
1616
pub mod rt_wasip3;
1717

18+
#[cfg(target_arch = "wasm32")]
19+
pub mod wasm_worker;
20+
1821
#[derive(Debug, thiserror::Error)]
1922
#[error("operation timed out")]
2023
pub struct TimeoutError;
@@ -38,6 +41,30 @@ pub async fn timeout<F: Future>(duration: Duration, f: F) -> Result<F::Output, T
3841
#[cfg(debug_assertions)]
3942
let f = Box::pin(f);
4043

44+
// wasm: avoid requiring a Tokio runtime handle. Race the future against
45+
// a wasip3 monotonic sleep using futures::select so the function works
46+
// under the wasip3 executor which doesn't expose a Tokio Handle.
47+
#[cfg(target_arch = "wasm32")]
48+
{
49+
use futures_util::{future::FutureExt, pin_mut, select};
50+
// `sleep` is the runtime-agnostic sleep in this same `rt` module:
51+
use crate::rt::sleep;
52+
53+
// fuse so select! can safely poll them
54+
let mut fut = f.fuse();
55+
let mut timer = sleep(duration).fuse();
56+
57+
// pin them on the stack (avoids requiring F: Unpin)
58+
pin_mut!(fut, timer);
59+
60+
// select! is an expression — return it
61+
return select! {
62+
res = fut => Ok(res),
63+
_ = timer => Err(TimeoutError),
64+
};
65+
}
66+
67+
// Native: if Tokio is enabled and a handle is available, delegate to it.
4168
#[cfg(feature = "_rt-tokio")]
4269
if rt_tokio::available() {
4370
return tokio::time::timeout(duration, f)
@@ -171,11 +198,20 @@ pub fn test_block_on<F: Future>(f: F) -> F::Output {
171198

172199
#[cfg(any(feature = "_rt-tokio", target_arch = "wasm32"))]
173200
{
174-
return tokio::runtime::Builder::new_current_thread()
201+
let rt = tokio::runtime::Builder::new_current_thread()
175202
.enable_all()
176203
.build()
177-
.expect("failed to start Tokio runtime")
178-
.block_on(f);
204+
.expect("failed to start Tokio runtime");
205+
206+
#[cfg(target_arch = "wasm32")]
207+
{
208+
return rt.block_on(async { tokio::task::LocalSet::new().run_until(f).await });
209+
}
210+
211+
#[cfg(not(target_arch = "wasm32"))]
212+
{
213+
return rt.block_on(f);
214+
}
179215
}
180216

181217
#[cfg(all(

0 commit comments

Comments
 (0)