Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,19 +333,17 @@ fn _assert_impls() {
assert_sync::<Client>();
assert_clone::<Client>();

assert_send::<Request>();
assert_send::<RequestBuilder>();
assert_send::<Error>();
assert_sync::<Error>();

#[cfg(not(target_arch = "wasm32"))]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know what do you think about this change.

cc @seanmonstar

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a breaking change. And perhaps surprising that it only is true on a certain target?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that might be a breaking change for users relying on Send + Sync for Body on the wasm32 target, although I don’t think there are valid use cases for it. Also, Response has already been handled separately.

Do you have suggestions for this? Would it be a good idea to have a separate response Body type for the wasm32 target?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I just realized that Body isn’t exposed at all in the wasm target. My current plan is to keep Request unchanged but replace its internal Body with RequestBody. Then introduce a new public Body type that’s non-Send and currently used only by Response.

  • Request -> pub (crate) RequestBody (Send + Sync)
  • Response -> pub Body (non-Send)

What do you think? @seanmonstar

{
assert_send::<Request>();
assert_send::<RequestBuilder>();
assert_send::<Response>();
assert_send::<Body>();
assert_sync::<Body>();
}

assert_send::<Error>();
assert_sync::<Error>();

assert_send::<Body>();
assert_sync::<Body>();
}

if_hyper! {
Expand Down
137 changes: 137 additions & 0 deletions src/wasm/body.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
#[cfg(feature = "multipart")]
use super::multipart::Form;
use super::AbortGuard;
/// dox
use bytes::Bytes;
#[cfg(feature = "stream")]
use futures_core::Stream;
#[cfg(feature = "stream")]
use futures_util::stream::{self, StreamExt};
use js_sys::Uint8Array;
#[cfg(feature = "stream")]
use std::pin::Pin;
use std::{borrow::Cow, fmt};
#[cfg(feature = "stream")]
use wasm_bindgen::JsCast;
use wasm_bindgen::JsValue;
use web_sys::Response as WebResponse;

/// The body of a `Request`.
///
Expand All @@ -22,6 +32,7 @@ enum Inner {
/// MultipartForm holds a multipart/form-data body.
#[cfg(feature = "multipart")]
MultipartForm(Form),
Streaming(StreamingBody),
}

#[derive(Clone)]
Expand Down Expand Up @@ -58,6 +69,70 @@ impl Single {
}
}

struct StreamingBody {
response: WebResponse,
abort: AbortGuard,
}

impl StreamingBody {
#[cfg(feature = "stream")]
fn into_stream(self) -> Pin<Box<dyn Stream<Item = crate::Result<Bytes>>>> {
let StreamingBody { response, abort } = self;

if let Some(body) = response.body() {
let abort = abort;
let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into());
Box::pin(body.into_stream().map(move |buf_js| {
// Keep the abort guard alive while the stream is active.
let _abort = &abort;
let buf_js = buf_js
.map_err(crate::error::wasm)
.map_err(crate::error::decode)?;
let buffer = Uint8Array::new(&buf_js);
let mut bytes = vec![0; buffer.length() as usize];
buffer.copy_to(&mut bytes);
Ok(bytes.into())
}))
} else {
drop(abort);
Box::pin(stream::empty())
}
}

async fn into_bytes(self) -> crate::Result<Bytes> {
let StreamingBody { response, abort } = self;
let promise = response
.array_buffer()
.map_err(crate::error::wasm)
.map_err(crate::error::decode)?;
let js_value = super::promise::<wasm_bindgen::JsValue>(promise)
.await
.map_err(crate::error::decode)?;
drop(abort);
let buffer = Uint8Array::new(&js_value);
let mut bytes = vec![0; buffer.length() as usize];
buffer.copy_to(&mut bytes);
Ok(bytes.into())
}

async fn into_text(self) -> crate::Result<String> {
let StreamingBody { response, abort } = self;
let promise = response
.text()
.map_err(crate::error::wasm)
.map_err(crate::error::decode)?;
let js_value = super::promise::<wasm_bindgen::JsValue>(promise)
.await
.map_err(crate::error::decode)?;
drop(abort);
if let Some(text) = js_value.as_string() {
Ok(text)
} else {
Err(crate::error::decode("response.text isn't string"))
}
}
}

impl Body {
/// Returns a reference to the internal data of the `Body`.
///
Expand All @@ -68,6 +143,7 @@ impl Body {
Inner::Single(single) => Some(single.as_bytes()),
#[cfg(feature = "multipart")]
Inner::MultipartForm(_) => None,
Inner::Streaming(_) => None,
}
}

Expand All @@ -80,6 +156,9 @@ impl Body {
let js_value: &JsValue = form_data.as_ref();
Ok(js_value.to_owned())
}
Inner::Streaming(_) => Err(crate::error::decode(
"streaming body cannot be converted to JsValue",
)),
}
}

Expand Down Expand Up @@ -117,6 +196,7 @@ impl Body {
Inner::Single(single) => single.is_empty(),
#[cfg(feature = "multipart")]
Inner::MultipartForm(form) => form.is_empty(),
Inner::Streaming(_) => false,
}
}

Expand All @@ -127,6 +207,63 @@ impl Body {
}),
#[cfg(feature = "multipart")]
Inner::MultipartForm(_) => None,
Inner::Streaming(_) => None,
}
}

pub(super) fn from_response(response: WebResponse, abort: AbortGuard) -> Body {
if response.body().is_some() {
Body {
inner: Inner::Streaming(StreamingBody { response, abort }),
}
} else {
// Even without a body, ensure the guard lives until completion.
drop(abort);
Body::default()
}
}

/// Consume the body into bytes.
pub async fn bytes(self) -> crate::Result<Bytes> {
match self.inner {
Inner::Single(Single::Bytes(bytes)) => Ok(bytes),
Inner::Single(Single::Text(text)) => Ok(Bytes::copy_from_slice(text.as_bytes())),
#[cfg(feature = "multipart")]
Inner::MultipartForm(_) => Err(crate::error::decode(
"multipart body cannot be converted into bytes",
)),
Inner::Streaming(streaming) => streaming.into_bytes().await,
}
}

/// Consume the body into a UTF-8 string.
pub async fn text(self) -> crate::Result<String> {
match self.inner {
Inner::Single(Single::Bytes(bytes)) => String::from_utf8(bytes.to_vec())
.map_err(|_| crate::error::decode("body is not valid UTF-8")),
Inner::Single(Single::Text(text)) => Ok(text.into_owned()),
#[cfg(feature = "multipart")]
Inner::MultipartForm(_) => Err(crate::error::decode(
"multipart body cannot be converted into text",
)),
Inner::Streaming(streaming) => streaming.into_text().await,
}
}

/// Convert the body into a stream of `Bytes`.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn bytes_stream(self) -> Pin<Box<dyn Stream<Item = crate::Result<Bytes>>>> {
match self.inner {
Inner::Single(single) => {
let bytes = Bytes::copy_from_slice(single.as_bytes());
Box::pin(stream::once(async move { Ok(bytes) }))
}
#[cfg(feature = "multipart")]
Inner::MultipartForm(_) => Box::pin(stream::once(async {
Err(crate::error::decode("multipart body cannot be streamed"))
})),
Inner::Streaming(streaming) => streaming.into_stream(),
}
}
}
Expand Down
74 changes: 17 additions & 57 deletions src/wasm/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@ use std::fmt;

use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use js_sys::Uint8Array;
use url::Url;

use crate::wasm::AbortGuard;
use crate::{response::ResponseUrl, wasm::AbortGuard};

#[cfg(feature = "stream")]
use wasm_bindgen::JsCast;

#[cfg(feature = "stream")]
use futures_util::stream::{self, StreamExt};
use super::Body;

#[cfg(feature = "json")]
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -97,68 +92,23 @@ impl Response {

/// Get the response text.
pub async fn text(self) -> crate::Result<String> {
let p = self
.http
.body()
Body::from_response(self.http.into_body(), self._abort)
.text()
.map_err(crate::error::wasm)
.map_err(crate::error::decode)?;
let js_val = super::promise::<wasm_bindgen::JsValue>(p)
.await
.map_err(crate::error::decode)?;
if let Some(s) = js_val.as_string() {
Ok(s)
} else {
Err(crate::error::decode("response.text isn't string"))
}
}

/// Get the response as bytes
pub async fn bytes(self) -> crate::Result<Bytes> {
let p = self
.http
.body()
.array_buffer()
.map_err(crate::error::wasm)
.map_err(crate::error::decode)?;

let buf_js = super::promise::<wasm_bindgen::JsValue>(p)
Body::from_response(self.http.into_body(), self._abort)
.bytes()
.await
.map_err(crate::error::decode)?;

let buffer = Uint8Array::new(&buf_js);
let mut bytes = vec![0; buffer.length() as usize];
buffer.copy_to(&mut bytes);
Ok(bytes.into())
}

/// Convert the response into a `Stream` of `Bytes` from the body.
#[cfg(feature = "stream")]
pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
use futures_core::Stream;
use std::pin::Pin;

let web_response = self.http.into_body();
let abort = self._abort;

if let Some(body) = web_response.body() {
let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into());
Box::pin(body.into_stream().map(move |buf_js| {
// Keep the abort guard alive as long as this stream is.
let _abort = &abort;
let buffer = Uint8Array::new(
&buf_js
.map_err(crate::error::wasm)
.map_err(crate::error::decode)?,
);
let mut bytes = vec![0; buffer.length() as usize];
buffer.copy_to(&mut bytes);
Ok(bytes.into())
})) as Pin<Box<dyn Stream<Item = crate::Result<Bytes>>>>
} else {
// If there's no body, return an empty stream.
Box::pin(stream::empty()) as Pin<Box<dyn Stream<Item = crate::Result<Bytes>>>>
}
let body = Body::from_response(self.http.into_body(), self._abort);
body.bytes_stream()
}

// util methods
Expand Down Expand Up @@ -193,3 +143,13 @@ impl fmt::Debug for Response {
.finish()
}
}

impl From<Response> for http::Response<Body> {
fn from(response: Response) -> http::Response<Body> {
let Response { http, _abort, url } = response;
let (mut parts, body) = http.into_parts();
parts.extensions.insert(ResponseUrl(*url));
let body = Body::from_response(body, _abort);
http::Response::from_parts(parts, body)
}
}
31 changes: 31 additions & 0 deletions tests/wasm_simple.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#![cfg(target_arch = "wasm32")]
use std::time::Duration;

#[cfg(feature = "stream")]
use futures_util::StreamExt;
use wasm_bindgen::prelude::*;
use wasm_bindgen_test::*;
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
Expand Down Expand Up @@ -37,3 +39,32 @@ async fn request_with_timeout() {
assert!(err.is_request());
assert!(err.is_timeout());
}

#[wasm_bindgen_test]
async fn convert_response_into_http_response_body_bytes() {
let res = reqwest::get("https://hyper.rs").await.expect("fetch body");

let http_res: http::Response<reqwest::Body> = res.into();
let body = http_res.into_body();
let bytes = body.bytes().await.expect("read body bytes");

assert!(!bytes.is_empty());
}

#[cfg(feature = "stream")]
#[wasm_bindgen_test]
async fn convert_response_into_http_response_body_stream() {
let res = reqwest::get("https://hyper.rs")
.await
.expect("fetch streaming bytes");

let http_res: http::Response<reqwest::Body> = res.into();
let mut stream = http_res.into_body().bytes_stream();
let mut total = 0usize;

while let Some(chunk) = stream.next().await {
total += chunk.expect("stream chunk").len();
}

assert!(total > 0);
}