Skip to content

Various fixes and updates for aws-smithy-http including errors and impl futures::Stream for SdkBody #1733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ aws-types = { path = "../../sdk/build/aws-sdk/sdk/aws-types" }
time = { version = "0.3.4", features = ["parsing"] }
tokio = { version = "1.8.4", features = ["sync"] }
tracing = { version = "0.1" }
hyper = { version = "0.14.12", default-features = false }
hyper = { version = "0.14.20", default-features = false }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the hyper upgrade necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't, I just saw that the server folks were using .20 so I figured I'd update our stuff too. I can remove this if you like.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


aws-http = { path = "../../sdk/build/aws-sdk/sdk/aws-http" }
aws-smithy-http = { path = "../../sdk/build/aws-sdk/sdk/aws-smithy-http" }
Expand Down
2 changes: 1 addition & 1 deletion aws/sdk/integration-tests/s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bytes = "1"
bytes-utils = "0.1.2"
http = "0.2.3"
http-body = "0.4.5"
hyper = "0.14.12"
hyper = "0.14.20"
serde_json = "1"
tempfile = "3"
smol = "1.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ data class CargoDependency(
val Hex: CargoDependency = CargoDependency("hex", CratesIo("0.4.3"))
val HttpBody: CargoDependency = CargoDependency("http-body", CratesIo("0.4.4"))
val Http: CargoDependency = CargoDependency("http", CratesIo("0.2.0"))
val Hyper: CargoDependency = CargoDependency("hyper", CratesIo("0.14.12"))
val Hyper: CargoDependency = CargoDependency("hyper", CratesIo("0.14.20"))
val HyperWithStream: CargoDependency = Hyper.withFeature("stream")
val LazyStatic: CargoDependency = CargoDependency("lazy_static", CratesIo("1.4.0"))
val Md5: CargoDependency = CargoDependency("md-5", CratesIo("0.10.0"), rustName = "md5")
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ bytes = "1"
fastrand = "1.4.0"
http = "0.2.3"
http-body = "0.4.4"
hyper = { version = "0.14.12", features = ["client", "http2", "http1"], optional = true }
hyper = { version = "0.14.20", features = ["client", "http2", "http1"], optional = true }
hyper-rustls = { version = "0.22.1", optional = true, features = ["rustls-native-certs"] }
hyper-tls = { version = "0.5.0", optional = true }
lazy_static = { version = "1", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-http-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ bytes = "1.1"
futures-util = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
hyper = { version = "0.14.12", features = ["server", "http1", "http2", "tcp", "stream"] }
hyper = { version = "0.14.20", features = ["server", "http1", "http2", "tcp", "stream"] }
lambda_http = "0.6.0"
mime = "0.3"
nom = "7"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ path = "src/lambda.rs"
[dependencies]
async-stream = "0.3"
clap = { version = "~3.2.1", features = ["derive"] }
hyper = {version = "0.14.12", features = ["server"] }
hyper = {version = "0.14.20", features = ["server"] }
lambda_http = "0.6.0"
rand = "0.8"
tokio = "1.20.1"
Expand Down
23 changes: 6 additions & 17 deletions rust-runtime/aws-smithy-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,25 @@ aws-smithy-eventstream = { path = "../aws-smithy-eventstream", optional = true }
aws-smithy-types = { path = "../aws-smithy-types" }
bytes = "1"
bytes-utils = "0.1"
futures = "0.3.24"
http = "0.2.3"
http-body = "0.4.4"
hyper = "0.14.20"
once_cell = "1.10"
percent-encoding = "2.1.0"
pin-project-lite = "0.2.9"
tracing = "0.1"

# We are using hyper for our streaming body implementation, but this is an internal detail.
hyper = "0.14.12"

# ByteStream internals
futures-core = "0.3.14"
tokio = { version = "1.8.4", optional = true }
tokio-util = { version = "0.7", optional = true }
tracing = "0.1"

[dev-dependencies]
async-stream = "0.3"
futures-util = "0.3"
hyper = { version = "0.14.12", features = ["stream"] }
hyper = { version = "0.14.20", features = ["stream"] }
pretty_assertions = "1.2"
proptest = "1"
tokio = { version = "1.8.4", features = [
"macros",
"rt",
"rt-multi-thread",
"fs",
"io-util",
] }
tokio-stream = "0.1.5"
tempfile = "3.2.0"
tokio = { version = "1.8.4", features = ["macros", "rt", "rt-multi-thread", "fs", "io-util", ] }
tokio-stream = "0.1.5"
tracing-test = "0.2.1"

[package.metadata.docs.rs]
Expand Down
145 changes: 76 additions & 69 deletions rust-runtime/aws-smithy-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
*/

use bytes::Bytes;
use futures::Stream;
use http::{HeaderMap, HeaderValue};
use http_body::{Body, SizeHint};
use pin_project_lite::pin_project;

use std::error::Error as StdError;
use std::fmt::{self, Debug, Formatter};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::callback::BodyCallback;
use crate::header::append_merge_header_maps;

pub type Error = Box<dyn StdError + Send + Sync>;

pin_project! {
Expand All @@ -35,9 +34,6 @@ pin_project! {
// In the event of retry, this function will be called to generate a new body. See
// [`try_clone()`](SdkBody::try_clone)
rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
// A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle
#[pin]
callbacks: Vec<Box<dyn BodyCallback>>,
}
}

Expand Down Expand Up @@ -96,7 +92,6 @@ impl SdkBody {
Self {
inner: Inner::Dyn { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
}

Expand All @@ -113,31 +108,28 @@ impl SdkBody {
SdkBody {
inner: initial.inner,
rebuild: Some(Arc::new(move || f().inner)),
callbacks: Vec::new(),
}
}

pub fn taken() -> Self {
Self {
inner: Inner::Taken,
rebuild: None,
callbacks: Vec::new(),
}
}

pub fn empty() -> Self {
Self {
inner: Inner::Once { inner: None },
rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
callbacks: Vec::new(),
}
}

fn poll_inner(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
let mut this = self.project();
let this = self.project();
let polling_result = match this.inner.project() {
InnerProj::Once { ref mut inner } => {
let data = inner.take();
Expand All @@ -154,26 +146,6 @@ impl SdkBody {
}
};

match &polling_result {
// When we get some bytes back from polling, pass those bytes to each callback in turn
Poll::Ready(Some(Ok(bytes))) => {
for callback in this.callbacks.iter_mut() {
// Callbacks can run into errors when reading bytes. They'll be surfaced here
callback.update(bytes)?;
}
}
// When we're done polling for bytes, run each callback's `trailers()` method. If any calls to
// `trailers()` return an error, propagate that error up. Otherwise, continue.
Poll::Ready(None) => {
for callback_result in this.callbacks.iter().map(BodyCallback::trailers) {
if let Err(e) = callback_result {
return Poll::Ready(Some(Err(e)));
}
}
}
_ => (),
}

polling_result
}

Expand All @@ -192,12 +164,9 @@ impl SdkBody {
pub fn try_clone(&self) -> Option<Self> {
self.rebuild.as_ref().map(|rebuild| {
let next = rebuild();
let callbacks = self.callbacks.iter().map(BodyCallback::make_new).collect();

Self {
inner: next,
rebuild: self.rebuild.clone(),
callbacks,
}
})
}
Expand All @@ -206,11 +175,6 @@ impl SdkBody {
http_body::Body::size_hint(self).exact()
}

pub fn with_callback(&mut self, callback: Box<dyn BodyCallback>) -> &mut Self {
self.callbacks.push(callback);
self
}

pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
if self.rebuild.is_some() {
SdkBody::retryable(move || f(self.try_clone().unwrap()))
Expand All @@ -235,7 +199,6 @@ impl From<Bytes> for SdkBody {
rebuild: Some(Arc::new(move || Inner::Once {
inner: Some(bytes.clone()),
})),
callbacks: Vec::new(),
}
}
}
Expand All @@ -245,7 +208,6 @@ impl From<hyper::Body> for SdkBody {
SdkBody {
inner: Inner::Streaming { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
}
}
Expand Down Expand Up @@ -283,38 +245,17 @@ impl http_body::Body for SdkBody {
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
let mut header_map = None;
// Iterate over all callbacks, checking each for any `HeaderMap`s
for callback in &self.callbacks {
match callback.trailers() {
// If this is the first `HeaderMap` we've encountered, save it
Ok(Some(right_header_map)) if header_map.is_none() => {
header_map = Some(right_header_map);
}
// If this is **not** the first `HeaderMap` we've encountered, merge it
Ok(Some(right_header_map)) if header_map.is_some() => {
header_map = Some(append_merge_header_maps(
header_map.unwrap(),
right_header_map,
));
}
// Early return if a callback encountered an error.
Err(e) => {
return Poll::Ready(Err(e));
}
// Otherwise, continue on to the next iteration of the loop.
_ => continue,
}
}
Poll::Ready(Ok(header_map))
// `SdkBody`s have no trailers. If returning trailers is necessary,
// use `SdkBody::map` to wrap this body with another body that does return trailers.
Poll::Ready(Ok(None))
}

fn is_end_stream(&self) -> bool {
match &self.inner {
Inner::Once { inner: None } => true,
Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
Inner::Streaming { inner: hyper_body } => hyper_body.is_end_stream(),
Inner::Dyn { inner: box_body } => box_body.is_end_stream(),
Inner::Streaming { inner } => inner.is_end_stream(),
Inner::Dyn { inner } => inner.is_end_stream(),
Inner::Taken => true,
}
}
Expand All @@ -323,13 +264,47 @@ impl http_body::Body for SdkBody {
match &self.inner {
Inner::Once { inner: None } => SizeHint::with_exact(0),
Inner::Once { inner: Some(bytes) } => SizeHint::with_exact(bytes.len() as u64),
Inner::Streaming { inner: hyper_body } => hyper_body.size_hint(),
Inner::Dyn { inner: box_body } => box_body.size_hint(),
Inner::Streaming { inner } => http_body::Body::size_hint(inner),
Inner::Dyn { inner } => http_body::Body::size_hint(inner),
Inner::Taken => SizeHint::new(),
}
}
}

const SIZE_HINT_32_BIT_PANIC_MESSAGE: &str = r#"
You're running a 32-bit system and this stream's length is too large to be represented with a usize.
Please limit stream length to less than 4.294Gb or run this program on a 64-bit computer architecture.
"#;

impl Stream for SdkBody {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning for not doing this up until now was that AsyncIterator was close to becoming stabilized in std, and this will be obsolete as soon as that happens. This is definitely worth discussing further since it looks like that will take longer due to discussion of generic keywords. Maybe we should create another runtime crate similar to aws-smithy-types-convert that adds in useful conversions to the current set of async traits that are widely used until they become std.

There are a couple of crates that need to get revisited as we get closer to a stable release:

type Item = Result<Bytes, std::io::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// By converting to IO errors here, we get access to a bunch of `futures` extension traits.
// We get access to extension traits from `tokio` too!
//
// Convert an `SdkBody` into a `tokio::io::AsyncRead` like so:
// let body = tokio_util::io::StreamReader::new(SdkBody::from(input));
//
// Convert an `SdkBody` into a `futures::io::AsyncRead` like so:
// let body = SdkBody::from(input).into_async_read();
self.poll_data(cx).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}

fn size_hint(&self) -> (usize, Option<usize>) {
let size_hint = http_body::Body::size_hint(self);
match (
size_hint.lower().try_into(),
size_hint.upper().map(TryInto::try_into).transpose(),
) {
(Ok(lower), Ok(upper)) => (lower, upper),
(Err(_), _) | (_, Err(_)) => {
panic!("{}", SIZE_HINT_32_BIT_PANIC_MESSAGE)
}
}
}
}

#[cfg(test)]
mod test {
use crate::body::{BoxBody, SdkBody};
Expand Down Expand Up @@ -395,4 +370,36 @@ mod test {
fn is_send<T: Send>() {}
is_send::<SdkBody>()
}


#[cfg(feature = "rt-tokio")]
#[tokio::test]
async fn sdk_body_can_be_tokio_async_read() {
use tokio::io::AsyncBufReadExt;

let body = SdkBody::from("data 1\ndata 2\ndata 3\n");
let body = tokio_util::io::StreamReader::new(body);
let async_buf_read = tokio::io::BufReader::new(body);

let mut lines = async_buf_read.lines();

assert_eq!(lines.next_line().await.unwrap(), Some("data 1".to_owned()));
assert_eq!(lines.next_line().await.unwrap(), Some("data 2".to_owned()));
assert_eq!(lines.next_line().await.unwrap(), Some("data 3".to_owned()));
assert_eq!(lines.next_line().await.unwrap(), None);
}

#[cfg(feature = "rt-tokio")]
#[tokio::test]
async fn sdk_body_can_be_futures_async_read() {
use futures::AsyncReadExt;
use futures::stream::TryStreamExt;

let input: &[u8] = b"This is a test body.";
let mut body = SdkBody::from(input).into_async_read();
let mut output = [0u8; 20];
let _ = body.read(&mut output).await.unwrap();

assert_eq!(&output, input);
}
}
Loading