diff --git a/aws/rust-runtime/aws-config/Cargo.toml b/aws/rust-runtime/aws-config/Cargo.toml index 2cafa33a75..3abc3361bb 100644 --- a/aws/rust-runtime/aws-config/Cargo.toml +++ b/aws/rust-runtime/aws-config/Cargo.toml @@ -26,7 +26,7 @@ aws-types = { path = "../../sdk/build/aws-sdk/sdk/aws-types" } time = { version = "0.3.4", features = ["parsing"] } tokio = { version = "1.8.4", features = ["sync"] } tracing = { version = "0.1" } -hyper = { version = "0.14.12", default-features = false } +hyper = { version = "0.14.20", default-features = false } aws-http = { path = "../../sdk/build/aws-sdk/sdk/aws-http" } aws-smithy-http = { path = "../../sdk/build/aws-sdk/sdk/aws-smithy-http" } diff --git a/aws/sdk/integration-tests/s3/Cargo.toml b/aws/sdk/integration-tests/s3/Cargo.toml index f9064dcaf7..2611955f4b 100644 --- a/aws/sdk/integration-tests/s3/Cargo.toml +++ b/aws/sdk/integration-tests/s3/Cargo.toml @@ -23,7 +23,7 @@ bytes = "1" bytes-utils = "0.1.2" http = "0.2.3" http-body = "0.4.5" -hyper = "0.14.12" +hyper = "0.14.20" serde_json = "1" tempfile = "3" smol = "1.2" diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/rustlang/CargoDependency.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/rustlang/CargoDependency.kt index 192393792d..c74f1bf2b7 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/rustlang/CargoDependency.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/rustlang/CargoDependency.kt @@ -196,7 +196,7 @@ data class CargoDependency( val Hex: CargoDependency = CargoDependency("hex", CratesIo("0.4.3")) val HttpBody: CargoDependency = CargoDependency("http-body", CratesIo("0.4.4")) val Http: CargoDependency = CargoDependency("http", CratesIo("0.2.0")) - val Hyper: CargoDependency = CargoDependency("hyper", CratesIo("0.14.12")) + val Hyper: CargoDependency = CargoDependency("hyper", CratesIo("0.14.20")) val HyperWithStream: CargoDependency = Hyper.withFeature("stream") val LazyStatic: CargoDependency = CargoDependency("lazy_static", CratesIo("1.4.0")) val Md5: CargoDependency = CargoDependency("md-5", CratesIo("0.10.0"), rustName = "md5") diff --git a/rust-runtime/aws-smithy-client/Cargo.toml b/rust-runtime/aws-smithy-client/Cargo.toml index 30337e9db5..45a8da6245 100644 --- a/rust-runtime/aws-smithy-client/Cargo.toml +++ b/rust-runtime/aws-smithy-client/Cargo.toml @@ -24,7 +24,7 @@ bytes = "1" fastrand = "1.4.0" http = "0.2.3" http-body = "0.4.4" -hyper = { version = "0.14.12", features = ["client", "http2", "http1"], optional = true } +hyper = { version = "0.14.20", features = ["client", "http2", "http1"], optional = true } hyper-rustls = { version = "0.22.1", optional = true, features = ["rustls-native-certs"] } hyper-tls = { version = "0.5.0", optional = true } lazy_static = { version = "1", optional = true } diff --git a/rust-runtime/aws-smithy-http-server/Cargo.toml b/rust-runtime/aws-smithy-http-server/Cargo.toml index 055b7aab8c..2cf916ee02 100644 --- a/rust-runtime/aws-smithy-http-server/Cargo.toml +++ b/rust-runtime/aws-smithy-http-server/Cargo.toml @@ -25,7 +25,7 @@ bytes = "1.1" futures-util = { version = "0.3", default-features = false } http = "0.2" http-body = "0.4" -hyper = { version = "0.14.12", features = ["server", "http1", "http2", "tcp", "stream"] } +hyper = { version = "0.14.20", features = ["server", "http1", "http2", "tcp", "stream"] } lambda_http = "0.6.0" mime = "0.3" nom = "7" diff --git a/rust-runtime/aws-smithy-http-server/examples/pokemon-service/Cargo.toml b/rust-runtime/aws-smithy-http-server/examples/pokemon-service/Cargo.toml index 81821cd46d..ea382bbf41 100644 --- a/rust-runtime/aws-smithy-http-server/examples/pokemon-service/Cargo.toml +++ b/rust-runtime/aws-smithy-http-server/examples/pokemon-service/Cargo.toml @@ -22,7 +22,7 @@ path = "src/lambda.rs" [dependencies] async-stream = "0.3" clap = { version = "~3.2.1", features = ["derive"] } -hyper = {version = "0.14.12", features = ["server"] } +hyper = {version = "0.14.20", features = ["server"] } lambda_http = "0.6.0" rand = "0.8" tokio = "1.20.1" diff --git a/rust-runtime/aws-smithy-http/Cargo.toml b/rust-runtime/aws-smithy-http/Cargo.toml index 3593989fd6..becfe0b3e4 100644 --- a/rust-runtime/aws-smithy-http/Cargo.toml +++ b/rust-runtime/aws-smithy-http/Cargo.toml @@ -19,36 +19,25 @@ aws-smithy-eventstream = { path = "../aws-smithy-eventstream", optional = true } aws-smithy-types = { path = "../aws-smithy-types" } bytes = "1" bytes-utils = "0.1" +futures = "0.3.24" http = "0.2.3" http-body = "0.4.4" +hyper = "0.14.20" once_cell = "1.10" percent-encoding = "2.1.0" pin-project-lite = "0.2.9" -tracing = "0.1" - -# We are using hyper for our streaming body implementation, but this is an internal detail. -hyper = "0.14.12" - -# ByteStream internals -futures-core = "0.3.14" tokio = { version = "1.8.4", optional = true } tokio-util = { version = "0.7", optional = true } +tracing = "0.1" [dev-dependencies] async-stream = "0.3" -futures-util = "0.3" -hyper = { version = "0.14.12", features = ["stream"] } +hyper = { version = "0.14.20", features = ["stream"] } pretty_assertions = "1.2" proptest = "1" -tokio = { version = "1.8.4", features = [ - "macros", - "rt", - "rt-multi-thread", - "fs", - "io-util", -] } -tokio-stream = "0.1.5" tempfile = "3.2.0" +tokio = { version = "1.8.4", features = ["macros", "rt", "rt-multi-thread", "fs", "io-util", ] } +tokio-stream = "0.1.5" tracing-test = "0.2.1" [package.metadata.docs.rs] diff --git a/rust-runtime/aws-smithy-http/src/body.rs b/rust-runtime/aws-smithy-http/src/body.rs index f9cc9f3635..b450bd6a89 100644 --- a/rust-runtime/aws-smithy-http/src/body.rs +++ b/rust-runtime/aws-smithy-http/src/body.rs @@ -4,18 +4,17 @@ */ use bytes::Bytes; +use futures::Stream; use http::{HeaderMap, HeaderValue}; use http_body::{Body, SizeHint}; use pin_project_lite::pin_project; + use std::error::Error as StdError; use std::fmt::{self, Debug, Formatter}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::callback::BodyCallback; -use crate::header::append_merge_header_maps; - pub type Error = Box; pin_project! { @@ -35,9 +34,6 @@ pin_project! { // In the event of retry, this function will be called to generate a new body. See // [`try_clone()`](SdkBody::try_clone) rebuild: Option Inner) + Send + Sync>>, - // A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle - #[pin] - callbacks: Vec>, } } @@ -96,7 +92,6 @@ impl SdkBody { Self { inner: Inner::Dyn { inner: body }, rebuild: None, - callbacks: Vec::new(), } } @@ -113,7 +108,6 @@ impl SdkBody { SdkBody { inner: initial.inner, rebuild: Some(Arc::new(move || f().inner)), - callbacks: Vec::new(), } } @@ -121,7 +115,6 @@ impl SdkBody { Self { inner: Inner::Taken, rebuild: None, - callbacks: Vec::new(), } } @@ -129,7 +122,6 @@ impl SdkBody { Self { inner: Inner::Once { inner: None }, rebuild: Some(Arc::new(|| Inner::Once { inner: None })), - callbacks: Vec::new(), } } @@ -137,7 +129,7 @@ impl SdkBody { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let mut this = self.project(); + let this = self.project(); let polling_result = match this.inner.project() { InnerProj::Once { ref mut inner } => { let data = inner.take(); @@ -154,26 +146,6 @@ impl SdkBody { } }; - match &polling_result { - // When we get some bytes back from polling, pass those bytes to each callback in turn - Poll::Ready(Some(Ok(bytes))) => { - for callback in this.callbacks.iter_mut() { - // Callbacks can run into errors when reading bytes. They'll be surfaced here - callback.update(bytes)?; - } - } - // When we're done polling for bytes, run each callback's `trailers()` method. If any calls to - // `trailers()` return an error, propagate that error up. Otherwise, continue. - Poll::Ready(None) => { - for callback_result in this.callbacks.iter().map(BodyCallback::trailers) { - if let Err(e) = callback_result { - return Poll::Ready(Some(Err(e))); - } - } - } - _ => (), - } - polling_result } @@ -192,12 +164,9 @@ impl SdkBody { pub fn try_clone(&self) -> Option { self.rebuild.as_ref().map(|rebuild| { let next = rebuild(); - let callbacks = self.callbacks.iter().map(BodyCallback::make_new).collect(); - Self { inner: next, rebuild: self.rebuild.clone(), - callbacks, } }) } @@ -206,11 +175,6 @@ impl SdkBody { http_body::Body::size_hint(self).exact() } - pub fn with_callback(&mut self, callback: Box) -> &mut Self { - self.callbacks.push(callback); - self - } - pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody { if self.rebuild.is_some() { SdkBody::retryable(move || f(self.try_clone().unwrap())) @@ -235,7 +199,6 @@ impl From for SdkBody { rebuild: Some(Arc::new(move || Inner::Once { inner: Some(bytes.clone()), })), - callbacks: Vec::new(), } } } @@ -245,7 +208,6 @@ impl From for SdkBody { SdkBody { inner: Inner::Streaming { inner: body }, rebuild: None, - callbacks: Vec::new(), } } } @@ -283,38 +245,17 @@ impl http_body::Body for SdkBody { self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>, Self::Error>> { - let mut header_map = None; - // Iterate over all callbacks, checking each for any `HeaderMap`s - for callback in &self.callbacks { - match callback.trailers() { - // If this is the first `HeaderMap` we've encountered, save it - Ok(Some(right_header_map)) if header_map.is_none() => { - header_map = Some(right_header_map); - } - // If this is **not** the first `HeaderMap` we've encountered, merge it - Ok(Some(right_header_map)) if header_map.is_some() => { - header_map = Some(append_merge_header_maps( - header_map.unwrap(), - right_header_map, - )); - } - // Early return if a callback encountered an error. - Err(e) => { - return Poll::Ready(Err(e)); - } - // Otherwise, continue on to the next iteration of the loop. - _ => continue, - } - } - Poll::Ready(Ok(header_map)) + // `SdkBody`s have no trailers. If returning trailers is necessary, + // use `SdkBody::map` to wrap this body with another body that does return trailers. + Poll::Ready(Ok(None)) } fn is_end_stream(&self) -> bool { match &self.inner { Inner::Once { inner: None } => true, Inner::Once { inner: Some(bytes) } => bytes.is_empty(), - Inner::Streaming { inner: hyper_body } => hyper_body.is_end_stream(), - Inner::Dyn { inner: box_body } => box_body.is_end_stream(), + Inner::Streaming { inner } => inner.is_end_stream(), + Inner::Dyn { inner } => inner.is_end_stream(), Inner::Taken => true, } } @@ -323,13 +264,47 @@ impl http_body::Body for SdkBody { match &self.inner { Inner::Once { inner: None } => SizeHint::with_exact(0), Inner::Once { inner: Some(bytes) } => SizeHint::with_exact(bytes.len() as u64), - Inner::Streaming { inner: hyper_body } => hyper_body.size_hint(), - Inner::Dyn { inner: box_body } => box_body.size_hint(), + Inner::Streaming { inner } => http_body::Body::size_hint(inner), + Inner::Dyn { inner } => http_body::Body::size_hint(inner), Inner::Taken => SizeHint::new(), } } } +const SIZE_HINT_32_BIT_PANIC_MESSAGE: &str = r#" +You're running a 32-bit system and this stream's length is too large to be represented with a usize. +Please limit stream length to less than 4.294Gb or run this program on a 64-bit computer architecture. +"#; + +impl Stream for SdkBody { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // By converting to IO errors here, we get access to a bunch of `futures` extension traits. + // We get access to extension traits from `tokio` too! + // + // Convert an `SdkBody` into a `tokio::io::AsyncRead` like so: + // let body = tokio_util::io::StreamReader::new(SdkBody::from(input)); + // + // Convert an `SdkBody` into a `futures::io::AsyncRead` like so: + // let body = SdkBody::from(input).into_async_read(); + self.poll_data(cx).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + } + + fn size_hint(&self) -> (usize, Option) { + let size_hint = http_body::Body::size_hint(self); + match ( + size_hint.lower().try_into(), + size_hint.upper().map(TryInto::try_into).transpose(), + ) { + (Ok(lower), Ok(upper)) => (lower, upper), + (Err(_), _) | (_, Err(_)) => { + panic!("{}", SIZE_HINT_32_BIT_PANIC_MESSAGE) + } + } + } +} + #[cfg(test)] mod test { use crate::body::{BoxBody, SdkBody}; @@ -395,4 +370,36 @@ mod test { fn is_send() {} is_send::() } + + + #[cfg(feature = "rt-tokio")] + #[tokio::test] + async fn sdk_body_can_be_tokio_async_read() { + use tokio::io::AsyncBufReadExt; + + let body = SdkBody::from("data 1\ndata 2\ndata 3\n"); + let body = tokio_util::io::StreamReader::new(body); + let async_buf_read = tokio::io::BufReader::new(body); + + let mut lines = async_buf_read.lines(); + + assert_eq!(lines.next_line().await.unwrap(), Some("data 1".to_owned())); + assert_eq!(lines.next_line().await.unwrap(), Some("data 2".to_owned())); + assert_eq!(lines.next_line().await.unwrap(), Some("data 3".to_owned())); + assert_eq!(lines.next_line().await.unwrap(), None); + } + + #[cfg(feature = "rt-tokio")] + #[tokio::test] + async fn sdk_body_can_be_futures_async_read() { + use futures::AsyncReadExt; + use futures::stream::TryStreamExt; + + let input: &[u8] = b"This is a test body."; + let mut body = SdkBody::from(input).into_async_read(); + let mut output = [0u8; 20]; + let _ = body.read(&mut output).await.unwrap(); + + assert_eq!(&output, input); + } } diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs index 6c58a59119..e79b8e9b0e 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs @@ -121,12 +121,13 @@ //! ``` use crate::body::SdkBody; -use crate::callback::BodyCallback; + use bytes::Buf; use bytes::Bytes; use bytes_utils::SegmentedBuf; -use http_body::Body; +use futures::Stream; use pin_project_lite::pin_project; + use std::error::Error as StdError; use std::fmt::{Debug, Formatter}; use std::io::IoSlice; @@ -372,16 +373,8 @@ impl ByteStream { FsBuilder::new().file(file).build().await } - /// Set a callback on this `ByteStream`. The callback's methods will be called at various points - /// throughout this `ByteStream`'s life cycle. See the [`BodyCallback`](BodyCallback) trait for - /// more information. - pub fn with_body_callback(&mut self, body_callback: Box) -> &mut Self { - self.inner.with_body_callback(body_callback); - self - } - #[cfg(feature = "rt-tokio")] - /// Convert this `ByteStream` into a struct that implements [`AsyncRead`](tokio::io::AsyncRead). + /// Convert this `ByteStream` into a struct that implements [`tokio::io::AsyncRead`](tokio::io::AsyncRead). /// /// # Example /// @@ -454,11 +447,7 @@ impl std::fmt::Display for Error { } } -impl StdError for Error { - fn source(&self) -> Option<&(dyn StdError + 'static)> { - Some(self.0.as_ref() as _) - } -} +impl StdError for Error {} impl From for std::io::Error { fn from(err: Error) -> Self { @@ -466,7 +455,7 @@ impl From for std::io::Error { } } -impl futures_core::stream::Stream for ByteStream { +impl Stream for ByteStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -542,6 +531,8 @@ impl Inner { where B: http_body::Body, { + use http_body::Body; + let mut output = SegmentedBuf::new(); let body = self.body; crate::pin_mut!(body); @@ -552,19 +543,12 @@ impl Inner { } } -impl Inner { - fn with_body_callback(&mut self, body_callback: Box) -> &mut Self { - self.body.with_callback(body_callback); - self - } -} - const SIZE_HINT_32_BIT_PANIC_MESSAGE: &str = r#" You're running a 32-bit system and this stream's length is too large to be represented with a usize. Please limit stream length to less than 4.294Gb or run this program on a 64-bit computer architecture. "#; -impl futures_core::stream::Stream for Inner +impl Stream for Inner where B: http_body::Body, { @@ -576,10 +560,10 @@ where fn size_hint(&self) -> (usize, Option) { let size_hint = http_body::Body::size_hint(&self.body); - let lower = size_hint.lower().try_into(); - let upper = size_hint.upper().map(|u| u.try_into()).transpose(); - - match (lower, upper) { + match ( + size_hint.lower().try_into(), + size_hint.upper().map(TryInto::try_into).transpose(), + ) { (Ok(lower), Ok(upper)) => (lower, upper), (Err(_), _) | (_, Err(_)) => { panic!("{}", SIZE_HINT_32_BIT_PANIC_MESSAGE) diff --git a/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs b/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs index 00b445c571..b25df4fd0b 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs @@ -4,7 +4,7 @@ */ use bytes::Bytes; -use futures_core::ready; +use futures::ready; use http::HeaderMap; use http_body::{Body, SizeHint}; use std::future::Future; @@ -285,7 +285,7 @@ impl Body for PathBody { }; } State::Loaded(ref mut stream) => { - use futures_core::Stream; + use futures::Stream; return match ready!(Pin::new(stream).poll_next(cx)) { Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), None => Poll::Ready(None), diff --git a/rust-runtime/aws-smithy-http/src/callback.rs b/rust-runtime/aws-smithy-http/src/callback.rs deleted file mode 100644 index 253e731c33..0000000000 --- a/rust-runtime/aws-smithy-http/src/callback.rs +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -//! A module for traits that define callbacks that will be called at specific points in an HTTP request's lifecycle. - -use http::{HeaderMap, HeaderValue}; - -type BoxError = Box; - -/// A callback that, when inserted into a request body, will be called for corresponding lifecycle events. -pub trait BodyCallback: Send + Sync { - /// This lifecycle function is called for each chunk **successfully** read. If an error occurs while reading a chunk, - /// this method will not be called. This method takes `&mut self` so that implementors may modify an implementing - /// struct/enum's internal state. Implementors may return an error. - fn update(&mut self, bytes: &[u8]) -> Result<(), BoxError> { - // "Use" bytes so that the compiler won't complain. - let _ = bytes; - Ok(()) - } - - /// This callback is called once all chunks have been read. If the callback encountered one or more errors - /// while running `update`s, this is how those errors are raised. Implementors may return a [`HeaderMap`][HeaderMap] - /// that will be appended to the HTTP body as a trailer. This is only useful to do for streaming requests. - fn trailers(&self) -> Result>, BoxError> { - Ok(None) - } - - /// Create a new `BodyCallback` from an existing one. This is called when a `BodyCallback` needs to be - /// re-initialized with default state. For example: when a request has a body that needs to be - /// rebuilt, all callbacks for that body need to be run again but with a fresh internal state. - fn make_new(&self) -> Box; -} - -impl BodyCallback for Box { - fn update(&mut self, bytes: &[u8]) -> Result<(), BoxError> { - self.as_mut().update(bytes) - } - fn trailers(&self) -> Result>, BoxError> { - self.as_ref().trailers() - } - fn make_new(&self) -> Box { - self.as_ref().make_new() - } -} - -#[cfg(test)] -mod tests { - use super::{BodyCallback, BoxError}; - use crate::body::SdkBody; - use crate::byte_stream::ByteStream; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; - - #[tracing_test::traced_test] - #[tokio::test] - async fn callbacks_are_called_for_update() { - struct CallbackA; - struct CallbackB; - - impl BodyCallback for CallbackA { - fn update(&mut self, _bytes: &[u8]) -> Result<(), BoxError> { - tracing::debug!("callback A was called"); - - Ok(()) - } - - fn make_new(&self) -> Box { - Box::new(Self) - } - } - - impl BodyCallback for CallbackB { - fn update(&mut self, _bytes: &[u8]) -> Result<(), BoxError> { - tracing::debug!("callback B was called"); - - Ok(()) - } - - fn make_new(&self) -> Box { - Box::new(Self) - } - } - - let mut body = SdkBody::from("test"); - body.with_callback(Box::new(CallbackA)) - .with_callback(Box::new(CallbackB)); - - let body = ByteStream::from(body).collect().await.unwrap().into_bytes(); - let body = std::str::from_utf8(&body).unwrap(); - - // Assert that the body that went in is the same as the body coming out. - assert_eq!(body, "test"); - - // Test that all callbacks were called. - assert!(logs_contain("callback A was called")); - assert!(logs_contain("callback B was called")); - } - - struct TestCallback { - times_called: Arc, - } - - impl BodyCallback for TestCallback { - fn update(&mut self, _bytes: &[u8]) -> Result<(), BoxError> { - self.times_called.fetch_add(1, Ordering::SeqCst); - - Ok(()) - } - - fn make_new(&self) -> Box { - Box::new(Self { - times_called: Arc::new(AtomicUsize::new(0)), - }) - } - } - - #[tokio::test] - async fn callback_for_buffered_body_is_called_once() { - let times_called = Arc::new(AtomicUsize::new(0)); - let test_text: String = (0..=1000) - .into_iter() - .map(|n| format!("line {}\n", n)) - .collect(); - - { - let mut body = SdkBody::from(test_text); - let callback = TestCallback { - times_called: times_called.clone(), - }; - body.with_callback(Box::new(callback)); - let _body = ByteStream::new(body).collect().await.unwrap().into_bytes(); - } - - let times_called = Arc::try_unwrap(times_called).unwrap(); - let times_called = times_called.into_inner(); - - // Callback only gets called once because it's not a streaming body - assert_eq!(times_called, 1); - } - - #[tracing_test::traced_test] - #[tokio::test] - async fn callback_for_streaming_body_is_called_per_chunk() { - // Include a large body of text for testing - let times_called = Arc::new(AtomicUsize::new(0)); - - { - let test_stream = tokio_stream::iter( - (1..=1000) - .into_iter() - .map(|n| -> Result { Ok(format!("line {}\n", n)) }), - ); - let mut body = SdkBody::from(hyper::body::Body::wrap_stream(test_stream)); - tracing::trace!("{:?}", body); - assert!(logs_contain("Streaming(Body(Streaming))")); - - let callback = TestCallback { - times_called: times_called.clone(), - }; - body.with_callback(Box::new(callback)); - let _body = ByteStream::new(body).collect().await.unwrap().into_bytes(); - } - - let times_called = Arc::try_unwrap(times_called).unwrap(); - let times_called = times_called.into_inner(); - - // Callback is called once per chunk - assert_eq!(times_called, 1000); - } -} diff --git a/rust-runtime/aws-smithy-http/src/event_stream.rs b/rust-runtime/aws-smithy-http/src/event_stream.rs index ae6176cbd2..843bf60924 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream.rs @@ -5,12 +5,10 @@ //! Provides Sender/Receiver implementations for Event Stream codegen. -use std::error::Error as StdError; - mod receiver; mod sender; -pub type BoxError = Box; +pub use crate::BoxError; #[doc(inline)] pub use sender::{EventStreamSender, MessageStreamAdapter, MessageStreamError}; diff --git a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs index ee728d4b14..be1941560f 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs @@ -333,7 +333,7 @@ mod tests { async fn receive_success() { let chunks: Vec> = vec![Ok(encode_message("one")), Ok(encode_message("two"))]; - let chunk_stream = futures_util::stream::iter(chunks); + let chunk_stream = futures::stream::iter(chunks); let body = SdkBody::from(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( @@ -354,7 +354,7 @@ mod tests { Ok(encode_message("two")), Ok(Bytes::from_static(&[])), ]; - let chunk_stream = futures_util::stream::iter(chunks); + let chunk_stream = futures::stream::iter(chunks); let body = SdkBody::from(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( @@ -375,7 +375,7 @@ mod tests { Ok(encode_message("two")), Ok(encode_message("three").split_to(10)), ]; - let chunk_stream = futures_util::stream::iter(chunks); + let chunk_stream = futures::stream::iter(chunks); let body = SdkBody::from(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( @@ -401,7 +401,7 @@ mod tests { [encode_message("three"), encode_message("four")].concat(), )), ]; - let chunk_stream = futures_util::stream::iter(chunks); + let chunk_stream = futures::stream::iter(chunks); let body = SdkBody::from(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( @@ -454,7 +454,7 @@ mod tests { Ok(Bytes::copy_from_slice(&combined[boundary2..end])), ]; - let chunk_stream = futures_util::stream::iter(chunks); + let chunk_stream = futures::stream::iter(chunks); let body = SdkBody::from(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); for payload in &["one", "two", "three", "four", "five", "six", "seven", "eight"] { @@ -474,7 +474,7 @@ mod tests { Ok(encode_message("one")), Err(IOError::new(ErrorKind::ConnectionReset, FakeError)), ]; - let chunk_stream = futures_util::stream::iter(chunks); + let chunk_stream = futures::stream::iter(chunks); let body = SdkBody::from(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( @@ -495,7 +495,7 @@ mod tests { // for the MessageFrameDecoder to actually start parsing it. Ok(Bytes::from_static(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ]; - let chunk_stream = futures_util::stream::iter(chunks); + let chunk_stream = futures::stream::iter(chunks); let body = SdkBody::from(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( @@ -512,7 +512,7 @@ mod tests { async fn receive_initial_response() { let chunks: Vec> = vec![Ok(encode_initial_response()), Ok(encode_message("one"))]; - let chunk_stream = futures_util::stream::iter(chunks); + let chunk_stream = futures::stream::iter(chunks); let body = SdkBody::from(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert!(receiver.try_recv_initial().await.unwrap().is_some()); @@ -526,7 +526,7 @@ mod tests { async fn receive_no_initial_response() { let chunks: Vec> = vec![Ok(encode_message("one")), Ok(encode_message("two"))]; - let chunk_stream = futures_util::stream::iter(chunks); + let chunk_stream = futures::stream::iter(chunks); let body = SdkBody::from(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert!(receiver.try_recv_initial().await.unwrap().is_none()); diff --git a/rust-runtime/aws-smithy-http/src/event_stream/sender.rs b/rust-runtime/aws-smithy-http/src/event_stream/sender.rs index e1b3ce9d4f..87079868a2 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/sender.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/sender.rs @@ -6,7 +6,7 @@ use crate::result::SdkError; use aws_smithy_eventstream::frame::{MarshallMessage, SignMessage}; use bytes::Bytes; -use futures_core::Stream; +use futures::Stream; use std::error::Error as StdError; use std::fmt; use std::fmt::Debug; @@ -184,8 +184,7 @@ mod tests { Header, HeaderValue, Message, NoOpSigner, SignMessage, SignMessageError, }; use bytes::Bytes; - use futures_core::Stream; - use futures_util::stream::StreamExt; + use futures::{Stream, StreamExt}; use std::error::Error as StdError; #[derive(Debug)] diff --git a/rust-runtime/aws-smithy-http/src/lib.rs b/rust-runtime/aws-smithy-http/src/lib.rs index 2b7e6b9c0e..9601c0d217 100644 --- a/rust-runtime/aws-smithy-http/src/lib.rs +++ b/rust-runtime/aws-smithy-http/src/lib.rs @@ -18,7 +18,6 @@ #![cfg_attr(docsrs, feature(doc_cfg))] pub mod body; -pub mod callback; pub mod endpoint; pub mod header; pub mod http_versions; @@ -38,3 +37,5 @@ pub mod byte_stream; mod pin_util; mod urlencode; + +pub type BoxError = Box; diff --git a/rust-runtime/aws-smithy-http/src/operation.rs b/rust-runtime/aws-smithy-http/src/operation.rs index 7fc7ce5174..58a114bdc9 100644 --- a/rust-runtime/aws-smithy-http/src/operation.rs +++ b/rust-runtime/aws-smithy-http/src/operation.rs @@ -104,10 +104,10 @@ impl Display for BuildError { BuildError::MissingField { field, details } => { write!(f, "{} was missing. {}", field, details) } - BuildError::SerializationError(inner) => { - write!(f, "failed to serialize input: {}", inner) + BuildError::SerializationError(_) => { + write!(f, "failed to serialize input") } - BuildError::Other(inner) => write!(f, "error during request construction: {}", inner), + BuildError::Other(_) => write!(f, "unexpected error during request construction"), BuildError::InvalidUri { uri, err, message } => { write!( f, @@ -122,7 +122,7 @@ impl Display for BuildError { impl Error for BuildError { fn source(&self) -> Option<&(dyn Error + 'static)> { match self { - BuildError::SerializationError(inner) => Some(inner as _), + BuildError::SerializationError(inner) => Some(inner), BuildError::Other(inner) => Some(inner.as_ref()), _ => None, } diff --git a/rust-runtime/aws-smithy-http/src/result.rs b/rust-runtime/aws-smithy-http/src/result.rs index 04a03769a5..b695f06e17 100644 --- a/rust-runtime/aws-smithy-http/src/result.rs +++ b/rust-runtime/aws-smithy-http/src/result.rs @@ -43,7 +43,7 @@ pub enum SdkError { /// have been sent. DispatchFailure(ConnectorError), - /// A response was received but it was not parseable according the the protocol (for example + /// A response was received but it was not parseable according to the protocol (for example /// the server hung up while the body was being read) ResponseError { /// Error encountered while parsing the response @@ -79,11 +79,7 @@ impl Display for ConnectorError { } } -impl Error for ConnectorError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - Some(self.err.as_ref()) - } -} +impl Error for ConnectorError {} impl ConnectorError { /// Construct a [`ConnectorError`] from an error caused by a timeout @@ -167,8 +163,7 @@ impl Display for ConnectorErrorKind { ConnectorErrorKind::Timeout => write!(f, "timeout"), ConnectorErrorKind::User => write!(f, "user error"), ConnectorErrorKind::Io => write!(f, "io error"), - ConnectorErrorKind::Other(Some(kind)) => write!(f, "{:?}", kind), - ConnectorErrorKind::Other(None) => write!(f, "other"), + ConnectorErrorKind::Other(_) => write!(f, "unclassified error"), } } } @@ -178,12 +173,16 @@ where E: Error, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + use SdkError::*; match self { - SdkError::ConstructionFailure(err) => write!(f, "failed to construct request: {}", err), - SdkError::TimeoutError(err) => write!(f, "request has timed out: {}", err), - SdkError::DispatchFailure(err) => Display::fmt(&err, f), - SdkError::ResponseError { err, .. } => Display::fmt(&err, f), - SdkError::ServiceError { err, .. } => Display::fmt(&err, f), + ConstructionFailure(_) => write!(f, "failed to construct request"), + TimeoutError(_) => write!(f, "request has timed out"), + DispatchFailure(_) => write!(f, "failed to dispatch the request"), + ResponseError { .. } => write!( + f, + "received response could not be parsed according to the expected protocol" + ), + ServiceError { .. } => write!(f, "service returned an error"), } } } diff --git a/tools/echo-server/Cargo.toml b/tools/echo-server/Cargo.toml index f11225b64c..7504b991f2 100644 --- a/tools/echo-server/Cargo.toml +++ b/tools/echo-server/Cargo.toml @@ -11,4 +11,4 @@ tokio = { version = "1.20.1", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } tower = { version = "0.4", features = ["util", "filter"] } -hyper = { version = "0.14.12", features = ["full"] } +hyper = { version = "0.14.20", features = ["full"] }