diff --git a/Cargo.toml b/Cargo.toml index 3f574e6e4..0411cf98b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,6 +109,7 @@ tower-service = "0.3" futures-core = { version = "0.3.28", default-features = false } futures-util = { version = "0.3.28", default-features = false, optional = true } sync_wrapper = { version = "1.0", features = ["futures"] } +pin-project-lite = "0.2.11" # Optional deps... @@ -129,7 +130,6 @@ percent-encoding = "2.3" tokio = { version = "1.0", default-features = false, features = ["net", "time"] } tower = { version = "0.5.2", default-features = false, features = ["timeout", "util"] } tower-http = { version = "0.6.5", default-features = false, features = ["follow-redirect"] } -pin-project-lite = "0.2.11" # Optional deps... rustls-pki-types = { version = "1.9.0", features = ["std"], optional = true } diff --git a/src/lib.rs b/src/lib.rs index 66e286a45..e658db9e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -333,8 +333,11 @@ fn _assert_impls() { assert_sync::(); assert_clone::(); - assert_send::(); - assert_send::(); + #[cfg(not(target_arch = "wasm32"))] + { + assert_send::(); + assert_send::(); + } #[cfg(not(target_arch = "wasm32"))] { @@ -344,8 +347,11 @@ fn _assert_impls() { assert_send::(); assert_sync::(); - assert_send::(); - assert_sync::(); + #[cfg(not(target_arch = "wasm32"))] + { + assert_send::(); + assert_sync::(); + } } if_hyper! { diff --git a/src/wasm/body.rs b/src/wasm/body.rs index 241aa8173..4e8ee10ec 100644 --- a/src/wasm/body.rs +++ b/src/wasm/body.rs @@ -22,6 +22,8 @@ enum Inner { /// MultipartForm holds a multipart/form-data body. #[cfg(feature = "multipart")] MultipartForm(Form), + #[cfg(feature = "stream")] + Streaming(Streaming), } #[derive(Clone)] @@ -58,6 +60,15 @@ impl Single { } } +pub(crate) type BodyFuture = + std::pin::Pin> + 'static>>; + +#[cfg(feature = "stream")] +pub(crate) struct Streaming { + write_fut: BodyFuture, + readable: web_sys::ReadableStream, +} + impl Body { /// Returns a reference to the internal data of the `Body`. /// @@ -68,6 +79,56 @@ impl Body { Inner::Single(single) => Some(single.as_bytes()), #[cfg(feature = "multipart")] Inner::MultipartForm(_) => None, + #[cfg(feature = "stream")] + Inner::Streaming(_) => None, + } + } + + /// Turn a futures `Stream` into a JS `ReadableStream`. + /// + /// # Example + /// + /// ``` + /// # use reqwest::Body; + /// # use futures_util; + /// # fn main() { + /// let chunks: Vec> = vec![ + /// Ok("hello"), + /// Ok(" "), + /// Ok("world"), + /// ]; + /// + /// let stream = futures_util::stream::iter(chunks); + /// + /// let body = Body::wrap_stream(stream); + /// # } + /// ``` + /// + /// # Optional + /// + /// This requires the `stream` feature to be enabled. + #[cfg(feature = "stream")] + #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] + pub fn wrap_stream(stream: S) -> Body + where + S: futures_core::stream::TryStream + 'static, + S::Error: Into>, + Bytes: From, + { + use futures_util::{FutureExt, StreamExt, TryStreamExt}; + use wasm_bindgen::{JsError, UnwrapThrowExt}; + + let transform_stream = + wasm_streams::TransformStream::from_raw(web_sys::TransformStream::new().unwrap_throw()); + Body { + inner: Inner::Streaming(Streaming { + write_fut: stream + .map_ok(|b| Single::Bytes(b.into()).to_js_value()) + .map_err(|err| JsValue::from(JsError::new(&err.into().to_string()))) + .forward(transform_stream.writable().into_sink()) + .boxed_local(), + readable: transform_stream.readable().into_raw(), + }), } } @@ -80,6 +141,18 @@ impl Body { let js_value: &JsValue = form_data.as_ref(); Ok(js_value.to_owned()) } + #[cfg(feature = "stream")] + Inner::Streaming(streaming) => Ok(streaming.readable.clone().into()), + } + } + + pub(crate) fn into_future(self) -> Option { + match self.inner { + Inner::Single(_) => None, + #[cfg(feature = "multipart")] + Inner::MultipartForm(_) => None, + #[cfg(feature = "stream")] + Inner::Streaming(streaming) => Some(streaming.write_fut), } } @@ -88,6 +161,8 @@ impl Body { match &self.inner { Inner::Single(single) => Some(single), Inner::MultipartForm(_) => None, + #[cfg(feature = "stream")] + Inner::Streaming(_) => None, } } @@ -109,6 +184,10 @@ impl Body { Inner::MultipartForm(form) => Self { inner: Inner::MultipartForm(form), }, + #[cfg(feature = "stream")] + Inner::Streaming(streaming) => Self { + inner: Inner::Streaming(streaming), + }, } } @@ -117,6 +196,8 @@ impl Body { Inner::Single(single) => single.is_empty(), #[cfg(feature = "multipart")] Inner::MultipartForm(form) => form.is_empty(), + #[cfg(feature = "stream")] + Inner::Streaming(_) => false, } } @@ -127,6 +208,8 @@ impl Body { }), #[cfg(feature = "multipart")] Inner::MultipartForm(_) => None, + #[cfg(feature = "stream")] + Inner::Streaming(_) => None, } } } diff --git a/src/wasm/client.rs b/src/wasm/client.rs index f6d303e77..9d7f59095 100644 --- a/src/wasm/client.rs +++ b/src/wasm/client.rs @@ -1,8 +1,14 @@ +use std::convert::TryInto; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{ready, Context, Poll}; + use http::header::USER_AGENT; use http::{HeaderMap, HeaderValue, Method}; use js_sys::{Promise, JSON}; -use std::convert::TryInto; -use std::{fmt, future::Future, sync::Arc}; +use pin_project_lite::pin_project; use url::Url; use wasm_bindgen::prelude::{wasm_bindgen, UnwrapThrowExt as _}; @@ -182,11 +188,46 @@ impl fmt::Debug for ClientBuilder { } } +pin_project! { + struct Pending { + #[pin] + body_fut: Option, + #[pin] + fetch: wasm_bindgen_futures::JsFuture, + } +} + +impl Future for Pending { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use wasm_bindgen::JsCast; + + let mut this = self.project(); + if let Some(body_fut) = this.body_fut.as_mut().as_pin_mut() { + if let Poll::Ready(res) = body_fut.poll(cx) { + this.body_fut.set(None); + if let Err(err) = res { + return Poll::Ready(Err(crate::error::wasm(err))); + } + } + } + Poll::Ready( + ready!(this.fetch.poll(cx)) + .map_err(crate::error::wasm) + .and_then(|js_resp| { + js_resp + .dyn_into::() + .map_err(|_js_val| "promise resolved to unexpected type".into()) + }), + ) + } +} + // Can use new methods in web-sys when requiring v0.2.93. // > `init.method(m)` to `init.set_method(m)` // For now, ignore their deprecation. #[allow(deprecated)] -async fn fetch(req: Request) -> crate::Result { +async fn fetch(mut req: Request) -> crate::Result { // Build the js Request let mut init = web_sys::RequestInit::new(); init.method(req.method().as_str()); @@ -216,11 +257,22 @@ async fn fetch(req: Request) -> crate::Result { init.credentials(creds); } - if let Some(body) = req.body() { + let body_fut = if let Some(body) = req.body_mut().take() { if !body.is_empty() { init.body(Some(body.to_js_value()?.as_ref())); + let fut = body.into_future(); + if fut.is_some() { + js_sys::Reflect::set(&init, &"duplex".into(), &"half".into()) + .map_err(crate::error::wasm) + .map_err(crate::error::builder)?; + } + fut + } else { + None } - } + } else { + None + }; let mut abort = AbortGuard::new()?; if let Some(timeout) = req.timeout() { @@ -233,8 +285,11 @@ async fn fetch(req: Request) -> crate::Result { .map_err(crate::error::builder)?; // Await the fetch() promise - let p = js_fetch(&js_req); - let js_resp = super::promise::(p) + let pending = Pending { + body_fut, + fetch: js_fetch(&js_req).into(), + }; + let js_resp = pending .await .map_err(|error| { if error.to_string() == "JsValue(\"reqwest::errors::TimedOut\")" {