diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 992ae6662cdb..7e5124583446 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -45,24 +45,28 @@ walkdir = { version = "2", optional = true } # Cloud storage support base64 = { version = "0.22", default-features = false, features = ["std"], optional = true } +form_urlencoded = { version = "1.2", optional = true } +http = { version = "1.2.0", optional = true } +http-body-util = { version = "0.1", optional = true } +httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } hyper = { version = "1.2", default-features = false, optional = true } +md-5 = { version = "0.10.6", default-features = false, optional = true } quick-xml = { version = "0.37.0", features = ["serialize", "overlapped-lists"], optional = true } -serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } -serde_json = { version = "1.0", default-features = false, optional = true } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true } ring = { version = "0.17", default-features = false, features = ["std"], optional = true } rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true } +serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } +serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true } +serde_urlencoded = { version = "0.7", optional = true } tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"] } -md-5 = { version = "0.10.6", default-features = false, optional = true } -httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } [target.'cfg(target_family="unix")'.dev-dependencies] nix = { 
version = "0.29.0", features = ["fs"] } [features] default = ["fs"] -cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] +cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "dep:http", "http-body-util", "form_urlencoded", "serde_urlencoded"] azure = ["cloud", "httparse"] fs = ["walkdir"] gcp = ["cloud", "rustls-pemfile"] @@ -72,16 +76,13 @@ tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"] integration = [] [dev-dependencies] # In alphabetical order -futures-test = "0.3" hyper = { version = "1.2", features = ["server"] } hyper-util = "0.1" -http-body-util = "0.1" rand = "0.8" tempfile = "3.1.0" regex = "1.11.1" # The "gzip" feature for reqwest is enabled for an integration test. reqwest = { version = "0.12", features = ["gzip"] } -http = "1.1.0" [[test]] name = "get_range_file" diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs index 29b2eefa8021..5e3d32e8bc38 100644 --- a/object_store/src/aws/builder.rs +++ b/object_store/src/aws/builder.rs @@ -23,7 +23,7 @@ use crate::aws::{ AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3ConditionalPut, S3CopyIfNotExists, STORE, }; -use crate::client::TokenCredentialProvider; +use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider}; use crate::config::ConfigValue; use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider}; use base64::prelude::BASE64_STANDARD; @@ -171,6 +171,8 @@ pub struct AmazonS3Builder { encryption_customer_key_base64: Option, /// When set to true, charge requester for bucket operations request_payer: ConfigValue, + /// The [`HttpConnector`] to use + http_connector: Option>, } /// Configuration keys for [`AmazonS3Builder`] @@ -882,6 +884,12 @@ impl AmazonS3Builder { self } + /// Overrides the [`HttpConnector`], by default uses [`ReqwestConnector`] + 
pub fn with_http_connector(mut self, connector: C) -> Self { + self.http_connector = Some(Arc::new(connector)); + self + } + /// Create a [`AmazonS3`] instance from the provided values, /// consuming `self`. pub fn build(mut self) -> Result { @@ -889,6 +897,10 @@ impl AmazonS3Builder { self.parse_url(&url)?; } + let http = self + .http_connector + .unwrap_or_else(|| Arc::new(ReqwestConnector::default())); + let bucket = self.bucket_name.ok_or(Error::MissingBucketName)?; let region = self.region.unwrap_or_else(|| "us-east-1".to_string()); let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?; @@ -925,11 +937,7 @@ impl AmazonS3Builder { let endpoint = format!("https://sts.{region}.amazonaws.com"); // Disallow non-HTTPs requests - let client = self - .client_options - .clone() - .with_allow_http(false) - .client()?; + let options = self.client_options.clone().with_allow_http(false); let token = WebIdentityProvider { token_path, @@ -940,16 +948,19 @@ impl AmazonS3Builder { Arc::new(TokenCredentialProvider::new( token, - client, + http.connect(&options)?, self.retry_config.clone(), )) as _ } else if let Some(uri) = self.container_credentials_relative_uri { info!("Using Task credential provider"); + + let options = self.client_options.clone().with_allow_http(true); + Arc::new(TaskCredentialProvider { url: format!("http://169.254.170.2{uri}"), retry: self.retry_config.clone(), // The instance metadata endpoint is access over HTTP - client: self.client_options.clone().with_allow_http(true).client()?, + client: http.connect(&options)?, cache: Default::default(), }) as _ } else { @@ -964,7 +975,7 @@ impl AmazonS3Builder { Arc::new(TokenCredentialProvider::new( token, - self.client_options.metadata_client()?, + http.connect(&self.client_options.metadata_options())?, self.retry_config.clone(), )) as _ }; @@ -986,7 +997,7 @@ impl AmazonS3Builder { region: region.clone(), credentials: Arc::clone(&credentials), }, - self.client_options.client()?, + 
http.connect(&self.client_options)?, self.retry_config.clone(), ) .with_min_ttl(Duration::from_secs(60)), // Credentials only valid for 5 minutes @@ -1039,7 +1050,8 @@ impl AmazonS3Builder { request_payer: self.request_payer.get()?, }; - let client = Arc::new(S3Client::new(config)?); + let http_client = http.connect(&config.client_options)?; + let client = Arc::new(S3Client::new(config, http_client)); Ok(AmazonS3 { client }) } diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 246f2779dd07..2cf808a9d49b 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -22,6 +22,7 @@ use crate::aws::{ AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, COPY_SOURCE_HEADER, STORE, STRICT_PATH_ENCODE_SET, TAGS_HEADER, }; +use crate::client::builder::{HttpRequestBuilder, RequestBuilderError}; use crate::client::get::GetClient; use crate::client::header::{get_etag, HeaderConfig}; use crate::client::header::{get_put_result, get_version}; @@ -31,7 +32,7 @@ use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult, InitiateMultipartUploadResult, ListResponse, PartMetadata, }; -use crate::client::GetOptionsExt; +use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpResponse}; use crate::multipart::PartId; use crate::path::DELIMITER; use crate::{ @@ -42,17 +43,15 @@ use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::{Buf, Bytes}; -use hyper::header::{ +use http::header::{ CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_TYPE, }; -use hyper::http::HeaderName; -use hyper::{http, HeaderMap}; +use http::{HeaderMap, HeaderName, Method}; use itertools::Itertools; use md5::{Digest, Md5}; use percent_encoding::{utf8_percent_encode, PercentEncode}; use quick_xml::events::{self as xml_events}; -use reqwest::{Client as ReqwestClient, Method, RequestBuilder, Response}; use 
ring::digest; use ring::digest::Context; use serde::{Deserialize, Serialize}; @@ -67,7 +66,9 @@ const ALGORITHM: &str = "x-amz-checksum-algorithm"; #[derive(Debug, thiserror::Error)] pub(crate) enum Error { #[error("Error performing DeleteObjects request: {}", source)] - DeleteObjectsRequest { source: crate::client::retry::Error }, + DeleteObjectsRequest { + source: crate::client::retry::RetryError, + }, #[error( "DeleteObjects request failed for key {}: {} (code: {})", @@ -82,7 +83,7 @@ pub(crate) enum Error { }, #[error("Error getting DeleteObjects response body: {}", source)] - DeleteObjectsResponse { source: reqwest::Error }, + DeleteObjectsResponse { source: HttpError }, #[error("Got invalid DeleteObjects response: {}", source)] InvalidDeleteObjectsResponse { @@ -90,22 +91,24 @@ pub(crate) enum Error { }, #[error("Error performing list request: {}", source)] - ListRequest { source: crate::client::retry::Error }, + ListRequest { + source: crate::client::retry::RetryError, + }, #[error("Error getting list response body: {}", source)] - ListResponseBody { source: reqwest::Error }, + ListResponseBody { source: HttpError }, #[error("Error getting create multipart response body: {}", source)] - CreateMultipartResponseBody { source: reqwest::Error }, + CreateMultipartResponseBody { source: HttpError }, #[error("Error performing complete multipart request: {}: {}", path, source)] CompleteMultipartRequest { - source: crate::client::retry::Error, + source: crate::client::retry::RetryError, path: String, }, #[error("Error getting complete multipart response body: {}", source)] - CompleteMultipartResponseBody { source: reqwest::Error }, + CompleteMultipartResponseBody { source: HttpError }, #[error("Got invalid list response: {}", source)] InvalidListResponse { source: quick_xml::de::DeError }, @@ -272,7 +275,7 @@ pub enum RequestError { #[error("Retry")] Retry { - source: crate::client::retry::Error, + source: crate::client::retry::RetryError, path: String, }, } @@ 
-290,7 +293,7 @@ impl From for crate::Error { pub(crate) struct Request<'a> { path: &'a Path, config: &'a S3Config, - builder: RequestBuilder, + builder: HttpRequestBuilder, payload_sha256: Option, payload: Option, use_session_creds: bool, @@ -307,8 +310,8 @@ impl Request<'_> { pub(crate) fn header(self, k: K, v: &str) -> Self where - HeaderName: TryFrom, - >::Error: Into, + K: TryInto, + K::Error: Into, { let builder = self.builder.header(k, v); Self { builder, ..self } @@ -408,7 +411,7 @@ impl Request<'_> { self } - pub(crate) async fn send(self) -> Result { + pub(crate) async fn send(self) -> Result { let credential = match self.use_session_creds { true => self.config.get_session_credential().await?, false => SessionCredential { @@ -446,13 +449,12 @@ impl Request<'_> { #[derive(Debug)] pub(crate) struct S3Client { pub config: S3Config, - pub client: ReqwestClient, + pub client: HttpClient, } impl S3Client { - pub(crate) fn new(config: S3Config) -> Result { - let client = config.client_options.client()?; - Ok(Self { config, client }) + pub(crate) fn new(config: S3Config, client: HttpClient) -> Self { + Self { config, client } } pub(crate) fn request<'a>(&'a self, method: Method, path: &'a Path) -> Request<'a> { @@ -544,6 +546,7 @@ impl S3Client { .send_retry(&self.config.retry_config) .await .map_err(|source| Error::DeleteObjectsRequest { source })? + .into_body() .bytes() .await .map_err(|source| Error::DeleteObjectsResponse { source })?; @@ -641,6 +644,7 @@ impl S3Client { .idempotent(true) .send() .await? + .into_body() .bytes() .await .map_err(|source| Error::CreateMultipartResponseBody { source })?; @@ -683,17 +687,17 @@ impl S3Client { // If SSE-C is used, we must include the encryption headers in every upload request. 
request = request.with_encryption_headers(); } - let response = request.send().await?; - let checksum_sha256 = response - .headers() + let (parts, body) = request.send().await?.into_parts(); + let checksum_sha256 = parts + .headers .get(SHA256_CHECKSUM) .and_then(|v| v.to_str().ok()) .map(|v| v.to_string()); let e_tag = match is_copy { - false => get_etag(response.headers()).map_err(|source| Error::Metadata { source })?, + false => get_etag(&parts.headers).map_err(|source| Error::Metadata { source })?, true => { - let response = response + let response = body .bytes() .await .map_err(|source| Error::CreateMultipartResponseBody { source })?; @@ -756,7 +760,7 @@ impl S3Client { let request = self .client - .request(Method::POST, url) + .post(url) .query(&[("uploadId", upload_id)]) .body(body) .with_aws_sigv4(credential.authorizer(), None); @@ -781,6 +785,7 @@ impl S3Client { .map_err(|source| Error::Metadata { source })?; let data = response + .into_body() .bytes() .await .map_err(|source| Error::CompleteMultipartResponseBody { source })?; @@ -795,7 +800,7 @@ impl S3Client { } #[cfg(test)] - pub(crate) async fn get_object_tagging(&self, path: &Path) -> Result { + pub(crate) async fn get_object_tagging(&self, path: &Path) -> Result { let credential = self.config.get_session_credential().await?; let url = format!("{}?tagging", self.config.path_url(path)); let response = self @@ -821,7 +826,7 @@ impl GetClient for S3Client { }; /// Make an S3 GET request - async fn get_request(&self, path: &Path, options: GetOptions) -> Result { + async fn get_request(&self, path: &Path, options: GetOptions) -> Result { let credential = self.config.get_session_credential().await?; let url = self.config.path_url(path); let method = match options.head { @@ -895,6 +900,7 @@ impl ListClient for Arc { .send_retry(&self.config.retry_config) .await .map_err(|source| Error::ListRequest { source })? 
+ .into_body() .bytes() .await .map_err(|source| Error::ListResponseBody { source })?; diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index 9c74e1c6526a..1b628429e797 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -16,18 +16,18 @@ // under the License. use crate::aws::{AwsCredentialProvider, STORE, STRICT_ENCODE_SET, STRICT_PATH_ENCODE_SET}; +use crate::client::builder::HttpRequestBuilder; use crate::client::retry::RetryExt; use crate::client::token::{TemporaryToken, TokenCache}; -use crate::client::TokenProvider; +use crate::client::{HttpClient, HttpError, HttpRequest, TokenProvider}; use crate::util::{hex_digest, hex_encode, hmac_sha256}; use crate::{CredentialProvider, Result, RetryConfig}; use async_trait::async_trait; use bytes::Buf; use chrono::{DateTime, Utc}; -use hyper::header::HeaderName; +use http::header::{HeaderMap, HeaderName, HeaderValue, AUTHORIZATION}; +use http::{Method, StatusCode}; use percent_encoding::utf8_percent_encode; -use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION}; -use reqwest::{Client, Method, Request, RequestBuilder, StatusCode}; use serde::Deserialize; use std::collections::BTreeMap; use std::sync::Arc; @@ -39,10 +39,12 @@ use url::Url; #[allow(clippy::enum_variant_names)] enum Error { #[error("Error performing CreateSession request: {source}")] - CreateSessionRequest { source: crate::client::retry::Error }, + CreateSessionRequest { + source: crate::client::retry::RetryError, + }, #[error("Error getting CreateSession response: {source}")] - CreateSessionResponse { source: reqwest::Error }, + CreateSessionResponse { source: HttpError }, #[error("Invalid CreateSessionOutput response: {source}")] CreateSessionOutput { source: quick_xml::DeError }, @@ -89,7 +91,7 @@ impl AwsCredential { } } -/// Authorize a [`Request`] with an [`AwsCredential`] using [AWS SigV4] +/// Authorize a [`HttpRequest`] with an [`AwsCredential`] using [AWS SigV4] /// 
/// [AWS SigV4]: https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html #[derive(Debug)] @@ -158,14 +160,16 @@ impl<'a> AwsAuthorizer<'a> { /// * Otherwise it is set to the hex encoded SHA256 of the request body /// /// [AWS SigV4]: https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html - pub fn authorize(&self, request: &mut Request, pre_calculated_digest: Option<&[u8]>) { + pub fn authorize(&self, request: &mut HttpRequest, pre_calculated_digest: Option<&[u8]>) { + let url = Url::parse(&request.uri().to_string()).unwrap(); + if let Some(ref token) = self.credential.token { let token_val = HeaderValue::from_str(token).unwrap(); let header = self.token_header.as_ref().unwrap_or(&TOKEN_HEADER); request.headers_mut().insert(header, token_val); } - let host = &request.url()[url::Position::BeforeHost..url::Position::AfterPort]; + let host = &url[url::Position::BeforeHost..url::Position::AfterPort]; let host_val = HeaderValue::from_str(host).unwrap(); request.headers_mut().insert("host", host_val); @@ -178,9 +182,9 @@ impl<'a> AwsAuthorizer<'a> { false => UNSIGNED_PAYLOAD.to_string(), true => match pre_calculated_digest { Some(digest) => hex_encode(digest), - None => match request.body() { - None => EMPTY_SHA256_HASH.to_string(), - Some(body) => match body.as_bytes() { + None => match request.body().is_empty() { + true => EMPTY_SHA256_HASH.to_string(), + false => match request.body().as_bytes() { Some(bytes) => hex_digest(bytes), None => STREAMING_PAYLOAD.to_string(), }, @@ -208,7 +212,7 @@ impl<'a> AwsAuthorizer<'a> { date, &scope, request.method(), - request.url(), + &url, &canonical_headers, &signed_headers, &digest, @@ -350,7 +354,7 @@ pub(crate) trait CredentialExt { ) -> Self; } -impl CredentialExt for RequestBuilder { +impl CredentialExt for HttpRequestBuilder { fn with_aws_sigv4( self, authorizer: Option>, @@ -358,7 +362,7 @@ impl CredentialExt for RequestBuilder { ) -> Self { match authorizer { Some(authorizer) => { - 
let (client, request) = self.build_split(); + let (client, request) = self.into_parts(); let mut request = request.expect("request valid"); authorizer.authorize(&mut request, payload_sha256); @@ -461,7 +465,7 @@ impl TokenProvider for InstanceCredentialProvider { async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> Result>> { instance_creds(client, retry, &self.metadata_endpoint, self.imdsv1_fallback) @@ -490,7 +494,7 @@ impl TokenProvider for WebIdentityProvider { async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> Result>> { web_identity( @@ -530,7 +534,7 @@ impl From for AwsCredential { /// async fn instance_creds( - client: &Client, + client: &HttpClient, retry_config: &RetryConfig, endpoint: &str, imdsv1_fallback: bool, @@ -549,7 +553,7 @@ async fn instance_creds( .await; let token = match token_result { - Ok(t) => Some(t.text().await?), + Ok(t) => Some(t.into_body().text().await?), Err(e) if imdsv1_fallback && matches!(e.status(), Some(StatusCode::FORBIDDEN)) => { warn!("received 403 from metadata endpoint, falling back to IMDSv1"); None @@ -564,7 +568,12 @@ async fn instance_creds( role_request = role_request.header(AWS_EC2_METADATA_TOKEN_HEADER, token); } - let role = role_request.send_retry(retry_config).await?.text().await?; + let role = role_request + .send_retry(retry_config) + .await? + .into_body() + .text() + .await?; let creds_url = format!("{endpoint}/{CREDENTIALS_PATH}/{role}"); let mut creds_request = client.request(Method::GET, creds_url); @@ -572,7 +581,12 @@ async fn instance_creds( creds_request = creds_request.header(AWS_EC2_METADATA_TOKEN_HEADER, token); } - let creds: InstanceCredentials = creds_request.send_retry(retry_config).await?.json().await?; + let creds: InstanceCredentials = creds_request + .send_retry(retry_config) + .await? 
+ .into_body() + .json() + .await?; let now = Utc::now(); let ttl = (creds.expiration - now).to_std().unwrap_or_default(); @@ -615,7 +629,7 @@ impl From for AwsCredential { /// async fn web_identity( - client: &Client, + client: &HttpClient, retry_config: &RetryConfig, token_path: &str, role_arn: &str, @@ -626,7 +640,7 @@ async fn web_identity( .map_err(|e| format!("Failed to read token file '{token_path}': {e}"))?; let bytes = client - .request(Method::POST, endpoint) + .post(endpoint) .query(&[ ("Action", "AssumeRoleWithWebIdentity"), ("DurationSeconds", "3600"), @@ -640,6 +654,7 @@ async fn web_identity( .sensitive(true) .send() .await? + .into_body() .bytes() .await?; @@ -663,7 +678,7 @@ async fn web_identity( pub(crate) struct TaskCredentialProvider { pub url: String, pub retry: RetryConfig, - pub client: Client, + pub client: HttpClient, pub cache: TokenCache>, } @@ -684,11 +699,17 @@ impl CredentialProvider for TaskCredentialProvider { /// async fn task_credential( - client: &Client, + client: &HttpClient, retry: &RetryConfig, url: &str, ) -> Result>, StdError> { - let creds: InstanceCredentials = client.get(url).send_retry(retry).await?.json().await?; + let creds: InstanceCredentials = client + .get(url) + .send_retry(retry) + .await? + .into_body() + .json() + .await?; let now = Utc::now(); let ttl = (creds.expiration - now).to_std().unwrap_or_default(); @@ -714,7 +735,7 @@ impl TokenProvider for SessionProvider { async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> Result>> { let creds = self.credentials.get_credential().await?; @@ -726,6 +747,7 @@ impl TokenProvider for SessionProvider { .send_retry(retry) .await .map_err(|source| Error::CreateSessionRequest { source })? 
+ .into_body() .bytes() .await .map_err(|source| Error::CreateSessionResponse { source })?; @@ -752,14 +774,15 @@ struct CreateSessionOutput { mod tests { use super::*; use crate::client::mock_server::MockServer; - use hyper::Response; + use crate::client::HttpClient; + use http::Response; use reqwest::{Client, Method}; use std::env; // Test generated using https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html #[test] fn test_sign_with_signed_payload() { - let client = Client::new(); + let client = HttpClient::new(Client::new()); // Test credentials from https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html let credential = AwsCredential { @@ -780,7 +803,8 @@ mod tests { let mut request = client .request(Method::GET, "https://ec2.amazon.com/") - .build() + .into_parts() + .1 .unwrap(); let signer = AwsAuthorizer { @@ -799,7 +823,7 @@ mod tests { #[test] fn test_sign_with_signed_payload_request_payer() { - let client = Client::new(); + let client = HttpClient::new(Client::new()); // Test credentials from https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html let credential = AwsCredential { @@ -820,7 +844,8 @@ mod tests { let mut request = client .request(Method::GET, "https://ec2.amazon.com/") - .build() + .into_parts() + .1 .unwrap(); let signer = AwsAuthorizer { @@ -839,7 +864,7 @@ mod tests { #[test] fn test_sign_with_unsigned_payload() { - let client = Client::new(); + let client = HttpClient::new(Client::new()); // Test credentials from https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html let credential = AwsCredential { @@ -860,7 +885,8 @@ mod tests { let mut request = client .request(Method::GET, "https://ec2.amazon.com/") - .build() + .into_parts() + .1 .unwrap(); let authorizer = AwsAuthorizer { @@ -962,7 +988,7 @@ mod tests { #[test] fn test_sign_port() { - let client = Client::new(); + let client = HttpClient::new(Client::new()); let credential = 
AwsCredential { key_id: "H20ABqCkLZID4rLe".to_string(), @@ -982,7 +1008,8 @@ mod tests { ("list-type", "2"), ("prefix", ""), ]) - .build() + .into_parts() + .1 .unwrap(); let authorizer = AwsAuthorizer { @@ -1008,15 +1035,15 @@ mod tests { // For example https://github.com/aws/amazon-ec2-metadata-mock let endpoint = env::var("EC2_METADATA_ENDPOINT").unwrap(); - let client = Client::new(); + let client = HttpClient::new(Client::new()); let retry_config = RetryConfig::default(); // Verify only allows IMDSv2 - let resp = client + let (client, req) = client .request(Method::GET, format!("{endpoint}/latest/meta-data/ami-id")) - .send() - .await - .unwrap(); + .into_parts(); + + let resp = client.execute(req.unwrap()).await.unwrap(); assert_eq!( resp.status(), @@ -1048,7 +1075,7 @@ mod tests { let token = "TOKEN"; let endpoint = server.url(); - let client = Client::new(); + let client = HttpClient::new(Client::new()); let retry_config = RetryConfig::default(); // Test IMDSv2 diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index 6283e76c1f87..73380aa653b5 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -23,7 +23,7 @@ use std::future::Future; use std::time::{Duration, Instant}; use chrono::Utc; -use reqwest::{Response, StatusCode}; +use http::{Method, StatusCode}; use serde::ser::SerializeMap; use serde::{Deserialize, Serialize, Serializer}; @@ -31,8 +31,8 @@ use crate::aws::client::S3Client; use crate::aws::credential::CredentialExt; use crate::aws::{AwsAuthorizer, AwsCredential}; use crate::client::get::GetClientExt; -use crate::client::retry::Error as RetryError; use crate::client::retry::RetryExt; +use crate::client::retry::{RequestError, RetryError}; use crate::path::Path; use crate::{Error, GetOptions, Result}; @@ -317,20 +317,20 @@ impl DynamoCommit { cred: Option<&AwsCredential>, target: &str, req: R, - ) -> Result { + ) -> Result { let region = &s3.config.region; let authorizer = cred.map(|x| 
AwsAuthorizer::new(x, "dynamodb", region)); let builder = match &s3.config.endpoint { - Some(e) => s3.client.post(e), + Some(e) => s3.client.request(Method::POST, e), None => { let url = format!("https://dynamodb.{region}.amazonaws.com"); - s3.client.post(url) + s3.client.request(Method::POST, url) } }; + // TODO: Timeout builder - .timeout(Duration::from_millis(self.timeout)) .json(&req) .header("X-Amz-Target", target) .with_aws_sigv4(authorizer, None) @@ -383,8 +383,8 @@ async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) /// Parses the error response if any fn parse_error_response(e: &RetryError) -> Option> { - match e { - RetryError::Client { + match e.inner() { + RequestError::Status { status: StatusCode::BAD_REQUEST, body: Some(b), } => serde_json::from_str(b).ok(), @@ -518,6 +518,7 @@ mod number { } } +use crate::client::HttpResponse; /// Re-export integration_test to be called by s3_test #[cfg(test)] pub(crate) use tests::integration_test; diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index b065927b6486..0625ae140b73 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -140,7 +140,7 @@ impl Signer for AmazonS3 { .with_request_payer(self.client.config.request_payer); let path_url = self.path_url(path); - let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic { + let mut url = path_url.parse().map_err(|e| Error::Generic { store: STORE, source: format!("Unable to parse url {path_url}: {e}").into(), })?; @@ -489,7 +489,7 @@ mod tests { use crate::ClientOptions; use base64::prelude::BASE64_STANDARD; use base64::Engine; - use hyper::HeaderMap; + use http::HeaderMap; const NON_EXISTENT_NAME: &str = "nonexistentname"; diff --git a/object_store/src/azure/builder.rs b/object_store/src/azure/builder.rs index f0572ebe6358..ab0a484f5463 100644 --- a/object_store/src/azure/builder.rs +++ b/object_store/src/azure/builder.rs @@ -21,7 +21,7 @@ use crate::azure::credential::{ 
ImdsManagedIdentityProvider, WorkloadIdentityOAuthProvider, }; use crate::azure::{AzureCredential, AzureCredentialProvider, MicrosoftAzure, STORE}; -use crate::client::TokenCredentialProvider; +use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider}; use crate::config::ConfigValue; use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider}; use percent_encoding::percent_decode_str; @@ -178,6 +178,8 @@ pub struct MicrosoftAzureBuilder { fabric_session_token: Option, /// Fabric cluster identifier fabric_cluster_identifier: Option, + /// The [`HttpConnector`] to use + http_connector: Option>, } /// Configuration keys for [`MicrosoftAzureBuilder`] @@ -887,6 +889,12 @@ impl MicrosoftAzureBuilder { self } + /// Overrides the [`HttpConnector`], by default uses [`ReqwestConnector`] + pub fn with_http_connector(mut self, connector: C) -> Self { + self.http_connector = Some(Arc::new(connector)); + self + } + /// Configure a connection to container with given name on Microsoft Azure Blob store. pub fn build(mut self) -> Result { if let Some(url) = self.url.take() { @@ -899,6 +907,10 @@ impl MicrosoftAzureBuilder { Arc::new(StaticCredentialProvider::new(credential)) }; + let http = self + .http_connector + .unwrap_or_else(|| Arc::new(ReqwestConnector::default())); + let (is_emulator, storage_url, auth, account) = if self.use_emulator.get()? 
{ let account_name = self .account_name @@ -960,7 +972,7 @@ impl MicrosoftAzureBuilder { ); Arc::new(TokenCredentialProvider::new( fabric_credential, - self.client_options.client()?, + http.connect(&self.client_options)?, self.retry_config.clone(), )) as _ } else if let Some(bearer_token) = self.bearer_token { @@ -979,7 +991,7 @@ impl MicrosoftAzureBuilder { ); Arc::new(TokenCredentialProvider::new( client_credential, - self.client_options.client()?, + http.connect(&self.client_options)?, self.retry_config.clone(), )) as _ } else if let (Some(client_id), Some(client_secret), Some(tenant_id)) = @@ -993,7 +1005,7 @@ impl MicrosoftAzureBuilder { ); Arc::new(TokenCredentialProvider::new( client_credential, - self.client_options.client()?, + http.connect(&self.client_options)?, self.retry_config.clone(), )) as _ } else if let Some(query_pairs) = self.sas_query_pairs { @@ -1011,7 +1023,7 @@ impl MicrosoftAzureBuilder { ); Arc::new(TokenCredentialProvider::new( msi_credential, - self.client_options.metadata_client()?, + http.connect(&self.client_options.metadata_options())?, self.retry_config.clone(), )) as _ }; @@ -1030,7 +1042,8 @@ impl MicrosoftAzureBuilder { credentials: auth, }; - let client = Arc::new(AzureClient::new(config)?); + let http_client = http.connect(&config.client_options)?; + let client = Arc::new(AzureClient::new(config, http_client)); Ok(MicrosoftAzure { client }) } diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 2c2e27ea4179..13e40bbf9688 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -18,11 +18,12 @@ use super::credential::AzureCredential; use crate::azure::credential::*; use crate::azure::{AzureCredentialProvider, STORE}; +use crate::client::builder::HttpRequestBuilder; use crate::client::get::GetClient; use crate::client::header::{get_put_result, HeaderConfig}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; -use crate::client::GetOptionsExt; 
+use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpRequest, HttpResponse}; use crate::multipart::PartId; use crate::path::DELIMITER; use crate::util::{deserialize_rfc1123, GetRange}; @@ -35,12 +36,11 @@ use base64::prelude::{BASE64_STANDARD, BASE64_STANDARD_NO_PAD}; use base64::Engine; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; -use hyper::http::HeaderName; -use rand::Rng as _; -use reqwest::{ +use http::{ header::{HeaderMap, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH}, - Client as ReqwestClient, Method, RequestBuilder, Response, + HeaderName, Method, }; +use rand::Rng as _; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; @@ -63,27 +63,29 @@ static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags"); pub(crate) enum Error { #[error("Error performing get request {}: {}", path, source)] GetRequest { - source: crate::client::retry::Error, + source: crate::client::retry::RetryError, path: String, }, #[error("Error performing put request {}: {}", path, source)] PutRequest { - source: crate::client::retry::Error, + source: crate::client::retry::RetryError, path: String, }, #[error("Error performing delete request {}: {}", path, source)] DeleteRequest { - source: crate::client::retry::Error, + source: crate::client::retry::RetryError, path: String, }, #[error("Error performing bulk delete request: {}", source)] - BulkDeleteRequest { source: crate::client::retry::Error }, + BulkDeleteRequest { + source: crate::client::retry::RetryError, + }, #[error("Error receiving bulk delete request body: {}", source)] - BulkDeleteRequestBody { source: reqwest::Error }, + BulkDeleteRequestBody { source: HttpError }, #[error( "Bulk delete request failed due to invalid input: {} (code: {})", @@ -108,10 +110,12 @@ pub(crate) enum Error { }, #[error("Error performing list request: {}", source)] - ListRequest { source: crate::client::retry::Error }, + ListRequest { + source: 
crate::client::retry::RetryError, + }, #[error("Error getting list response body: {}", source)] - ListResponseBody { source: reqwest::Error }, + ListResponseBody { source: HttpError }, #[error("Got invalid list response: {}", source)] InvalidListResponse { source: quick_xml::de::DeError }, @@ -125,10 +129,12 @@ pub(crate) enum Error { MissingETag, #[error("Error requesting user delegation key: {}", source)] - DelegationKeyRequest { source: crate::client::retry::Error }, + DelegationKeyRequest { + source: crate::client::retry::RetryError, + }, #[error("Error getting user delegation key response body: {}", source)] - DelegationKeyResponseBody { source: reqwest::Error }, + DelegationKeyResponseBody { source: HttpError }, #[error("Got invalid user delegation key response: {}", source)] DelegationKeyResponse { source: quick_xml::de::DeError }, @@ -194,7 +200,7 @@ struct PutRequest<'a> { path: &'a Path, config: &'a AzureConfig, payload: PutPayload, - builder: RequestBuilder, + builder: HttpRequestBuilder, idempotent: bool, } @@ -251,7 +257,7 @@ impl PutRequest<'_> { Self { builder, ..self } } - async fn send(self) -> Result { + async fn send(self) -> Result { let credential = self.config.get_credential().await?; let sensitive = credential .as_deref() @@ -317,7 +323,7 @@ fn serialize_part_delete_request( dst: &mut Vec, boundary: &str, idx: usize, - request: reqwest::Request, + request: HttpRequest, relative_url: String, ) { // Encode start marker for part @@ -349,7 +355,7 @@ fn serialize_part_delete_request( extend(dst, b"\r\n"); } -fn parse_multipart_response_boundary(response: &Response) -> Result { +fn parse_multipart_response_boundary(response: &HttpResponse) -> Result { let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse { reason: msg.to_string(), }; @@ -496,14 +502,13 @@ async fn parse_blob_batch_delete_body( #[derive(Debug)] pub(crate) struct AzureClient { config: AzureConfig, - client: ReqwestClient, + client: HttpClient, } impl AzureClient { /// 
create a new instance of [AzureClient] - pub(crate) fn new(config: AzureConfig) -> Result { - let client = config.client_options.client()?; - Ok(Self { config, client }) + pub(crate) fn new(config: AzureConfig, client: HttpClient) -> Self { + Self { config, client } } /// Returns the config @@ -517,7 +522,7 @@ impl AzureClient { fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> PutRequest<'a> { let url = self.config.path_url(path); - let builder = self.client.request(Method::PUT, url); + let builder = self.client.request(Method::PUT, url.as_str()); PutRequest { path, @@ -614,7 +619,7 @@ impl AzureClient { .map(|c| c.sensitive_request()) .unwrap_or_default(); self.client - .request(Method::DELETE, url) + .delete(url.as_str()) .query(query) .header(&DELETE_SNAPSHOTS, "include") .with_azure_authorization(&credential, &self.config.account) @@ -644,17 +649,20 @@ impl AzureClient { // Build subrequest with proper authorization let request = self .client - .request(Method::DELETE, url) + .delete(url.as_str()) .header(CONTENT_LENGTH, HeaderValue::from(0)) // Each subrequest must be authorized individually [1] and we use // the CredentialExt for this. 
// [1]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#request-body .with_azure_authorization(credential, &self.config.account) - .build() + .into_parts() + .1 .unwrap(); + let url: Url = request.uri().to_string().parse().unwrap(); + // Url for part requests must be relative and without base - let relative_url = self.config.service.make_relative(request.url()).unwrap(); + let relative_url = self.config.service.make_relative(&url).unwrap(); serialize_part_delete_request(&mut body_bytes, boundary, idx, request, relative_url) } @@ -684,7 +692,7 @@ impl AzureClient { let url = self.config.path_url(&Path::from("/")); let batch_response = self .client - .request(Method::POST, url) + .post(url.as_str()) .query(&[("restype", "container"), ("comp", "batch")]) .header( CONTENT_TYPE, @@ -701,6 +709,7 @@ impl AzureClient { let boundary = parse_multipart_response_boundary(&batch_response)?; let batch_body = batch_response + .into_body() .bytes() .await .map_err(|source| Error::BulkDeleteRequestBody { source })?; @@ -724,7 +733,7 @@ impl AzureClient { let mut builder = self .client - .request(Method::PUT, url) + .request(Method::PUT, url.as_str()) .header(©_SOURCE, source.to_string()) .header(CONTENT_LENGTH, HeaderValue::from_static("0")); @@ -772,9 +781,10 @@ impl AzureClient { .as_deref() .map(|c| c.sensitive_request()) .unwrap_or_default(); + let response = self .client - .request(Method::POST, url) + .post(url.as_str()) .body(body) .query(&[("restype", "service"), ("comp", "userdelegationkey")]) .with_azure_authorization(&credential, &self.config.account) @@ -784,6 +794,7 @@ impl AzureClient { .send() .await .map_err(|source| Error::DelegationKeyRequest { source })? 
+ .into_body() .bytes() .await .map_err(|source| Error::DelegationKeyResponseBody { source })?; @@ -829,7 +840,7 @@ impl AzureClient { } #[cfg(test)] - pub(crate) async fn get_blob_tagging(&self, path: &Path) -> Result { + pub(crate) async fn get_blob_tagging(&self, path: &Path) -> Result { let credential = self.get_credential().await?; let url = self.config.path_url(path); let sensitive = credential @@ -838,7 +849,7 @@ impl AzureClient { .unwrap_or_default(); let response = self .client - .request(Method::GET, url) + .get(url.as_str()) .query(&[("comp", "tags")]) .with_azure_authorization(&credential, &self.config.account) .retryable(&self.config.retry_config) @@ -868,7 +879,7 @@ impl GetClient for AzureClient { /// Make an Azure GET request /// /// - async fn get_request(&self, path: &Path, options: GetOptions) -> Result { + async fn get_request(&self, path: &Path, options: GetOptions) -> Result { // As of 2024-01-02, Azure does not support suffix requests, // so we should fail fast here rather than sending one if let Some(GetRange::Suffix(_)) = options.range.as_ref() { @@ -886,7 +897,7 @@ impl GetClient for AzureClient { let mut builder = self .client - .request(method, url) + .request(method, url.as_str()) .header(CONTENT_LENGTH, HeaderValue::from_static("0")) .body(Bytes::new()); @@ -961,7 +972,7 @@ impl ListClient for Arc { .unwrap_or_default(); let response = self .client - .request(Method::GET, url) + .get(url.as_str()) .query(&query) .with_azure_authorization(&credential, &self.config.account) .retryable(&self.config.retry_config) @@ -969,6 +980,7 @@ impl ListClient for Arc { .send() .await .map_err(|source| Error::ListRequest { source })? 
+ .into_body() .bytes() .await .map_err(|source| Error::ListResponseBody { source })?; @@ -1147,11 +1159,11 @@ pub(crate) struct UserDelegationKey { #[cfg(test)] mod tests { - use bytes::Bytes; - use regex::bytes::Regex; - use super::*; use crate::StaticCredentialProvider; + use bytes::Bytes; + use regex::bytes::Regex; + use reqwest::Client; #[test] fn deserde_azure() { @@ -1360,7 +1372,7 @@ mod tests { client_options: Default::default(), }; - let client = AzureClient::new(config).unwrap(); + let client = AzureClient::new(config, HttpClient::new(Client::new())); let credential = client.get_credential().await.unwrap(); let paths = &[Path::from("a"), Path::from("b"), Path::from("c")]; @@ -1454,7 +1466,7 @@ RequestId:778fdc83-801e-0000-62ff-0334671e2852 Time:2018-06-14T16:46:54.6040685Z\r --batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--\r\n"; - let response: reqwest::Response = http::Response::builder() + let response: HttpResponse = http::Response::builder() .status(202) .header("Transfer-Encoding", "chunked") .header( @@ -1463,12 +1475,11 @@ Time:2018-06-14T16:46:54.6040685Z\r ) .header("x-ms-request-id", "778fdc83-801e-0000-62ff-033467000000") .header("x-ms-version", "2018-11-09") - .body(Bytes::from(response_body.as_slice())) - .unwrap() - .into(); + .body(Bytes::from(response_body.as_slice()).into()) + .unwrap(); let boundary = parse_multipart_response_boundary(&response).unwrap(); - let body = response.bytes().await.unwrap(); + let body = response.into_body().bytes().await.unwrap(); let paths = &[Path::from("a"), Path::from("b"), Path::from("c")]; diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index c9e6ac640b4a..27f8776bcb5f 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -15,22 +15,24 @@ // specific language governing permissions and limitations // under the License. 
+use super::client::UserDelegationKey; use crate::azure::STORE; +use crate::client::builder::{add_query_pairs, HttpRequestBuilder}; use crate::client::retry::RetryExt; use crate::client::token::{TemporaryToken, TokenCache}; -use crate::client::{CredentialProvider, TokenProvider}; +use crate::client::{CredentialProvider, HttpClient, HttpError, HttpRequest, TokenProvider}; use crate::util::hmac_sha256; use crate::RetryConfig; use async_trait::async_trait; use base64::prelude::{BASE64_STANDARD, BASE64_URL_SAFE_NO_PAD}; use base64::Engine; use chrono::{DateTime, SecondsFormat, Utc}; -use reqwest::header::{ +use http::header::{ HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_UNMODIFIED_SINCE, RANGE, }; -use reqwest::{Client, Method, Request, RequestBuilder}; +use http::Method; use serde::Deserialize; use std::borrow::Cow; use std::collections::HashMap; @@ -42,8 +44,6 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use url::Url; -use super::client::UserDelegationKey; - static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03"); static VERSION: HeaderName = HeaderName::from_static("x-ms-version"); pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type"); @@ -73,10 +73,12 @@ const AZURE_STORAGE_RESOURCE: &str = "https://storage.azure.com"; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Error performing token request: {}", source)] - TokenRequest { source: crate::client::retry::Error }, + TokenRequest { + source: crate::client::retry::RetryError, + }, #[error("Error getting token response body: {}", source)] - TokenResponseBody { source: reqwest::Error }, + TokenResponseBody { source: HttpError }, #[error("Error reading federated token file ")] FederatedTokenFile, @@ -206,7 +208,7 @@ impl AzureSigner { } } -fn add_date_and_version_headers(request: &mut Request) { +fn 
add_date_and_version_headers(request: &mut HttpRequest) { // rfc2822 string should never contain illegal characters let date = Utc::now(); let date_str = date.format(RFC1123_FMT).to_string(); @@ -218,7 +220,7 @@ fn add_date_and_version_headers(request: &mut Request) { .insert(&VERSION, AZURE_VERSION.clone()); } -/// Authorize a [`Request`] with an [`AzureAuthorizer`] +/// Authorize a [`HttpRequest`] with an [`AzureAuthorizer`] #[derive(Debug)] pub struct AzureAuthorizer<'a> { credential: &'a AzureCredential, @@ -235,14 +237,15 @@ impl<'a> AzureAuthorizer<'a> { } /// Authorize `request` - pub fn authorize(&self, request: &mut Request) { + pub fn authorize(&self, request: &mut HttpRequest) { add_date_and_version_headers(request); match self.credential { AzureCredential::AccessKey(key) => { + let url = Url::parse(&request.uri().to_string()).unwrap(); let signature = generate_authorization( request.headers(), - request.url(), + &url, request.method(), self.account, key, @@ -262,10 +265,7 @@ impl<'a> AzureAuthorizer<'a> { ); } AzureCredential::SASToken(query_pairs) => { - request - .url_mut() - .query_pairs_mut() - .extend_pairs(query_pairs); + add_query_pairs(request.uri_mut(), query_pairs); } } } @@ -281,13 +281,13 @@ pub(crate) trait CredentialExt { ) -> Self; } -impl CredentialExt for RequestBuilder { +impl CredentialExt for HttpRequestBuilder { fn with_azure_authorization( self, credential: &Option>, account: &str, ) -> Self { - let (client, request) = self.build_split(); + let (client, request) = self.into_parts(); let mut request = request.expect("request valid"); match credential.as_deref() { @@ -622,13 +622,13 @@ impl TokenProvider for ClientSecretOAuthProvider { /// Fetch a token async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> crate::Result>> { let response: OAuthTokenResponse = client .request(Method::POST, &self.token_url) .header(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON)) - .form(&[ + .form([ 
("client_id", self.client_id.as_str()), ("client_secret", self.client_secret.as_str()), ("scope", AZURE_STORAGE_SCOPE), @@ -639,6 +639,7 @@ impl TokenProvider for ClientSecretOAuthProvider { .send() .await .map_err(|source| Error::TokenRequest { source })? + .into_body() .json() .await .map_err(|source| Error::TokenResponseBody { source })?; @@ -712,7 +713,7 @@ impl TokenProvider for ImdsManagedIdentityProvider { /// Fetch a token async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> crate::Result>> { let mut query_items = vec![ @@ -747,6 +748,7 @@ impl TokenProvider for ImdsManagedIdentityProvider { .send_retry(retry) .await .map_err(|source| Error::TokenRequest { source })? + .into_body() .json() .await .map_err(|source| Error::TokenResponseBody { source })?; @@ -798,7 +800,7 @@ impl TokenProvider for WorkloadIdentityOAuthProvider { /// Fetch a token async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> crate::Result>> { let token_str = std::fs::read_to_string(&self.federated_token_file) @@ -808,7 +810,7 @@ impl TokenProvider for WorkloadIdentityOAuthProvider { let response: OAuthTokenResponse = client .request(Method::POST, &self.token_url) .header(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON)) - .form(&[ + .form([ ("client_id", self.client_id.as_str()), ( "client_assertion_type", @@ -823,6 +825,7 @@ impl TokenProvider for WorkloadIdentityOAuthProvider { .send() .await .map_err(|source| Error::TokenRequest { source })? 
+ .into_body() .json() .await .map_err(|source| Error::TokenResponseBody { source })?; @@ -1009,7 +1012,7 @@ impl TokenProvider for FabricTokenOAuthProvider { /// Fetch a token async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> crate::Result>> { if let Some(storage_access_token) = &self.storage_access_token { @@ -1037,6 +1040,7 @@ impl TokenProvider for FabricTokenOAuthProvider { .send() .await .map_err(|source| Error::TokenRequest { source })? + .into_body() .text() .await .map_err(|source| Error::TokenResponseBody { source })?; @@ -1061,8 +1065,8 @@ impl CredentialProvider for AzureCliCredential { #[cfg(test)] mod tests { use futures::executor::block_on; + use http::{Response, StatusCode}; use http_body_util::BodyExt; - use hyper::{Response, StatusCode}; use reqwest::{Client, Method}; use tempfile::NamedTempFile; @@ -1078,7 +1082,7 @@ mod tests { std::env::set_var(MSI_SECRET_ENV_KEY, "env-secret"); let endpoint = server.url(); - let client = Client::new(); + let client = HttpClient::new(Client::new()); let retry_config = RetryConfig::default(); // Test IMDS @@ -1137,7 +1141,7 @@ mod tests { std::fs::write(tokenfile.path(), "federated-token").unwrap(); let endpoint = server.url(); - let client = Client::new(); + let client = HttpClient::new(Client::new()); let retry_config = RetryConfig::default(); // Test IMDS diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index bbecba57177a..b4243dda8bdc 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -58,7 +58,7 @@ const STORE: &str = "MicrosoftAzure"; /// Interface for [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/). #[derive(Debug)] pub struct MicrosoftAzure { - client: Arc, + client: Arc, } impl MicrosoftAzure { @@ -68,7 +68,7 @@ impl MicrosoftAzure { } /// Create a full URL to the resource specified by `path` with this instance's configuration. 
- fn path_url(&self, path: &Path) -> url::Url { + fn path_url(&self, path: &Path) -> Url { self.client.config().path_url(path) } } diff --git a/object_store/src/client/body.rs b/object_store/src/client/body.rs new file mode 100644 index 000000000000..549b3e4daf20 --- /dev/null +++ b/object_store/src/client/body.rs @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::client::connection::{HttpError, HttpErrorKind}; +use crate::{collect_bytes, PutPayload}; +use bytes::Bytes; +use futures::stream::BoxStream; +use futures::StreamExt; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::body::{Body, Frame, SizeHint}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// An HTTP Request +pub type HttpRequest = http::Request; + +/// The [`Body`] of an [`HttpRequest`] +#[derive(Debug, Clone)] +pub struct HttpRequestBody(Inner); + +impl HttpRequestBody { + /// An empty [`HttpRequestBody`] + pub fn empty() -> Self { + Self(Inner::Bytes(Bytes::new())) + } + + pub(crate) fn into_reqwest(self) -> reqwest::Body { + match self.0 { + Inner::Bytes(b) => b.into(), + Inner::PutPayload(_, payload) => reqwest::Body::wrap_stream(futures::stream::iter( + payload.into_iter().map(Ok::<_, HttpError>), + )), + } + } + + /// Returns true if this body contains no data (a payload is empty when all of its chunks are) + pub fn is_empty(&self) -> bool { + match &self.0 { + Inner::Bytes(x) => x.is_empty(), + Inner::PutPayload(_, x) => x.iter().all(|x| x.is_empty()), + } + } + + /// Returns the total length of the [`Bytes`] in this body + pub fn content_length(&self) -> usize { + match &self.0 { + Inner::Bytes(x) => x.len(), + Inner::PutPayload(_, x) => x.content_length(), + } + } + + /// If this body consists of a single contiguous [`Bytes`], returns it + pub fn as_bytes(&self) -> Option<&Bytes> { + match &self.0 { + Inner::Bytes(x) => Some(x), + _ => None, + } + } +} + +impl From for HttpRequestBody { + fn from(value: Bytes) -> Self { + Self(Inner::Bytes(value)) + } +} + +impl From> for HttpRequestBody { + fn from(value: Vec) -> Self { + Self(Inner::Bytes(value.into())) + } +} + +impl From for HttpRequestBody { + fn from(value: String) -> Self { + Self(Inner::Bytes(value.into())) + } +} + +impl From for HttpRequestBody { + fn from(value: PutPayload) -> Self { + Self(Inner::PutPayload(0, value)) + } +} + +#[derive(Debug, Clone)] +enum Inner { +
Bytes(Bytes), + PutPayload(usize, PutPayload), +} + +impl Body for HttpRequestBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Poll::Ready(match &mut self.0 { + Inner::Bytes(bytes) => { + let out = bytes.split_off(0); + if out.is_empty() { + None + } else { + Some(Ok(Frame::data(out))) + } + } + Inner::PutPayload(offset, payload) => { + let slice = payload.as_ref(); + if *offset == slice.len() { + None + } else { + Some(Ok(Frame::data( + slice[std::mem::replace(offset, *offset + 1)].clone(), + ))) + } + } + }) + } + + fn is_end_stream(&self) -> bool { + match self.0 { + Inner::Bytes(ref bytes) => bytes.is_empty(), + Inner::PutPayload(offset, ref body) => offset == body.as_ref().len(), + } + } + + fn size_hint(&self) -> SizeHint { + match self.0 { + Inner::Bytes(ref bytes) => SizeHint::with_exact(bytes.len() as u64), + Inner::PutPayload(offset, ref payload) => { + let iter = payload.as_ref().iter().skip(offset); + SizeHint::with_exact(iter.map(|x| x.len() as u64).sum()) + } + } + } +} + +/// An HTTP response +pub type HttpResponse = http::Response; + +/// The body of an [`HttpResponse`] +#[derive(Debug)] +pub struct HttpResponseBody(BoxBody); + +impl HttpResponseBody { + /// Create an [`HttpResponseBody`] from the provided [`Body`] + /// + /// Note: [`BodyExt::map_err`] can be used to alter error variants + pub fn new(body: B) -> Self + where + B: Body + Send + Sync + 'static, + { + Self(BoxBody::new(body)) + } + + /// Collects this response into a [`Bytes`] + pub async fn bytes(self) -> Result { + let size_hint = self.0.size_hint().lower(); + let s = self.0.into_data_stream(); + collect_bytes(s, Some(size_hint)).await + } + + /// Returns a stream of this response data + pub fn bytes_stream(self) -> BoxStream<'static, Result> { + self.0.into_data_stream().boxed() + } + + /// Returns the response as a [`String`] + pub(crate) async fn text(self) -> Result { + 
let b = self.bytes().await?; + String::from_utf8(b.into()).map_err(|e| HttpError::new(HttpErrorKind::Decode, e)) + } + + #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] + pub(crate) async fn json(self) -> Result { + let b = self.bytes().await?; + serde_json::from_slice(&b).map_err(|e| HttpError::new(HttpErrorKind::Decode, e)) + } +} + +impl From for HttpResponseBody { + fn from(value: Bytes) -> Self { + Self::new(Full::new(value).map_err(|e| match e {})) + } +} + +impl From> for HttpResponseBody { + fn from(value: Vec) -> Self { + Bytes::from(value).into() + } +} + +impl From for HttpResponseBody { + fn from(value: String) -> Self { + Bytes::from(value).into() + } +} diff --git a/object_store/src/client/builder.rs b/object_store/src/client/builder.rs new file mode 100644 index 000000000000..0fbc12fd9484 --- /dev/null +++ b/object_store/src/client/builder.rs @@ -0,0 +1,279 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::client::connection::HttpErrorKind; +use crate::client::{HttpClient, HttpError, HttpRequest, HttpRequestBody}; +use http::header::{InvalidHeaderName, InvalidHeaderValue}; +use http::uri::InvalidUri; +use http::{HeaderName, HeaderValue, Method, Uri}; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum RequestBuilderError { + #[error("Invalid URI")] + InvalidUri(#[from] InvalidUri), + + #[error("Invalid Header Value")] + InvalidHeaderValue(#[from] InvalidHeaderValue), + + #[error("Invalid Header Name")] + InvalidHeaderName(#[from] InvalidHeaderName), + + #[error("JSON serialization error")] + SerdeJson(#[from] serde_json::Error), + + #[error("URL serialization error")] + SerdeUrl(#[from] serde_urlencoded::ser::Error), +} + +impl From for HttpError { + fn from(value: RequestBuilderError) -> Self { + Self::new(HttpErrorKind::Request, value) + } +} + +impl From for RequestBuilderError { + fn from(value: std::convert::Infallible) -> Self { + match value {} + } +} + +pub(crate) struct HttpRequestBuilder { + client: HttpClient, + request: Result, +} + +impl HttpRequestBuilder { + pub(crate) fn new(client: HttpClient) -> Self { + Self { + client, + request: Ok(HttpRequest::new(HttpRequestBody::empty())), + } + } + + #[cfg(any(feature = "aws", feature = "azure"))] + pub(crate) fn from_parts(client: HttpClient, request: HttpRequest) -> Self { + Self { + client, + request: Ok(request), + } + } + + pub(crate) fn method(mut self, method: Method) -> Self { + if let Ok(r) = &mut self.request { + *r.method_mut() = method; + } + self + } + + pub(crate) fn uri(mut self, url: U) -> Self + where + U: TryInto, + U::Error: Into, + { + match (url.try_into(), &mut self.request) { + (Ok(uri), Ok(r)) => *r.uri_mut() = uri, + (Err(e), Ok(_)) => self.request = Err(e.into()), + (_, Err(_)) => {} + } + self + } + + pub(crate) fn header(mut self, name: K, value: V) -> Self + where + K: TryInto, + K::Error: Into, + V: TryInto, + V::Error: Into, + { + match (name.try_into(), 
value.try_into(), &mut self.request) { + (Ok(name), Ok(value), Ok(r)) => { + r.headers_mut().insert(name, value); + } + (Err(e), _, Ok(_)) => self.request = Err(e.into()), + (_, Err(e), Ok(_)) => self.request = Err(e.into()), + (_, _, Err(_)) => {} + } + self + } + + #[cfg(feature = "aws")] + pub(crate) fn headers(mut self, headers: http::HeaderMap) -> Self { + use http::header::{Entry, OccupiedEntry}; + + if let Ok(ref mut req) = self.request { + // IntoIter of HeaderMap yields (Option, HeaderValue). + // The first time a name is yielded, it will be Some(name), and if + // there are more values with the same name, the next yield will be + // None. + + let mut prev_entry: Option> = None; + for (key, value) in headers { + match key { + Some(key) => match req.headers_mut().entry(key) { + Entry::Occupied(mut e) => { + e.insert(value); + prev_entry = Some(e); + } + Entry::Vacant(e) => { + let e = e.insert_entry(value); + prev_entry = Some(e); + } + }, + None => match prev_entry { + Some(ref mut entry) => { + entry.append(value); + } + None => unreachable!("HeaderMap::into_iter yielded None first"), + }, + } + } + } + self + } + + #[cfg(feature = "gcp")] + pub(crate) fn bearer_auth(mut self, token: &str) -> Self { + let value = HeaderValue::try_from(format!("Bearer {}", token)); + match (value, &mut self.request) { + (Ok(mut v), Ok(r)) => { + v.set_sensitive(true); + r.headers_mut().insert(http::header::AUTHORIZATION, v); + } + (Err(e), Ok(_)) => self.request = Err(e.into()), + (_, Err(_)) => {} + } + self + } + + #[cfg(any(feature = "aws", feature = "gcp"))] + pub(crate) fn json(mut self, s: S) -> Self { + match (serde_json::to_vec(&s), &mut self.request) { + (Ok(json), Ok(request)) => { + *request.body_mut() = json.into(); + } + (Err(e), Ok(_)) => self.request = Err(e.into()), + (_, Err(_)) => {} + } + self + } + + #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] + pub(crate) fn query(mut self, query: &T) -> Self { + let mut error = None; + if let 
Ok(ref mut req) = self.request { + let mut out = format!("{}?", req.uri().path()); + let mut encoder = form_urlencoded::Serializer::new(&mut out); + let serializer = serde_urlencoded::Serializer::new(&mut encoder); + + if let Err(err) = query.serialize(serializer) { + error = Some(err.into()); + } + + match http::uri::PathAndQuery::from_maybe_shared(out) { + Ok(p) => { + let mut parts = req.uri().clone().into_parts(); + parts.path_and_query = Some(p); + *req.uri_mut() = Uri::from_parts(parts).unwrap(); + } + Err(err) => error = Some(err.into()), + } + } + if let Some(err) = error { + self.request = Err(err); + } + self + } + + #[cfg(any(feature = "gcp", feature = "azure"))] + pub(crate) fn form(mut self, form: T) -> Self { + let mut error = None; + if let Ok(ref mut req) = self.request { + match serde_urlencoded::to_string(form) { + Ok(body) => { + req.headers_mut().insert( + http::header::CONTENT_TYPE, + HeaderValue::from_static("application/x-www-form-urlencoded"), + ); + *req.body_mut() = body.into(); + } + Err(err) => error = Some(err.into()), + } + } + if let Some(err) = error { + self.request = Err(err); + } + self + } + + #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] + pub(crate) fn body(mut self, b: impl Into) -> Self { + if let Ok(r) = &mut self.request { + *r.body_mut() = b.into(); + } + self + } + + pub(crate) fn into_parts(self) -> (HttpClient, Result) { + (self.client, self.request) + } +} + +#[cfg(any(test, feature = "azure"))] +pub(crate) fn add_query_pairs(uri: &mut Uri, query_pairs: I) +where + I: IntoIterator, + I::Item: std::borrow::Borrow<(K, V)>, + K: AsRef, + V: AsRef, +{ + let mut parts = uri.clone().into_parts(); + + let mut out = match parts.path_and_query { + Some(p) => match p.query() { + Some(x) => format!("{}?{}", p.path(), x), + None => format!("{}?", p.path()), + }, + None => "/?".to_string(), + }; + let mut serializer = form_urlencoded::Serializer::new(&mut out); + serializer.extend_pairs(query_pairs); + + 
parts.path_and_query = Some(out.try_into().unwrap()); + *uri = Uri::from_parts(parts).unwrap(); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_add_query_pairs() { + let mut uri = Uri::from_static("https://foo@example.com/bananas?foo=1"); + + add_query_pairs(&mut uri, [("bingo", "foo"), ("auth", "test")]); + assert_eq!( + uri.to_string(), + "https://foo@example.com/bananas?foo=1&bingo=foo&auth=test" + ); + + add_query_pairs(&mut uri, [("t1", "funky shenanigans"), ("a", "😀")]); + assert_eq!( + uri.to_string(), + "https://foo@example.com/bananas?foo=1&bingo=foo&auth=test&t1=funky+shenanigans&a=%F0%9F%98%80" + ); + } +} diff --git a/object_store/src/client/connection.rs b/object_store/src/client/connection.rs new file mode 100644 index 000000000000..8b631694a57b --- /dev/null +++ b/object_store/src/client/connection.rs @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::client::body::{HttpRequest, HttpResponse}; +use crate::client::builder::{HttpRequestBuilder, RequestBuilderError}; +use crate::client::HttpResponseBody; +use crate::ClientOptions; +use async_trait::async_trait; +use http::{Method, Uri}; +use http_body_util::BodyExt; +use std::error::Error; +use std::sync::Arc; + +/// An HTTP protocol error +/// +/// Clients should return this when an HTTP request fails to be completed, e.g. because +/// of a connection issue. This does **not** include HTTP requests that return +/// non-2xx status codes, as these should instead be returned as an [`HttpResponse`] +/// with the appropriate status code set. +#[derive(Debug, thiserror::Error)] +#[error("HTTP error: {source}")] +pub struct HttpError { + kind: HttpErrorKind, + #[source] + source: Box, +} + +/// Identifies the kind of [`HttpError`] +/// +/// This is used, among other things, to determine if a request can be retried +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum HttpErrorKind { + /// An error occurred whilst connecting to the remote + /// + /// Will be automatically retried + Connect, + /// An error occurred whilst making the request + /// + /// Will be automatically retried + Request, + /// Request timed out + /// + /// Will be automatically retried if the request is idempotent + Timeout, + /// The request was aborted + /// + /// Will be automatically retried if the request is idempotent + Interrupted, + /// An error occurred whilst decoding the response + /// + /// Will not be automatically retried + Decode, + /// An unknown error occurred + /// + /// Will not be automatically retried + Unknown, +} + +impl HttpError { + /// Create a new [`HttpError`] of the given [`HttpErrorKind`] wrapping the source error `e` + pub fn new(kind: HttpErrorKind, e: E) -> Self + where + E: Error + Send + Sync + 'static, + { + Self { + kind, + source: Box::new(e), + } + } + + pub(crate) fn reqwest(e: reqwest::Error) -> Self { + let mut kind = if e.is_timeout() { +
HttpErrorKind::Timeout + } else if e.is_connect() { + HttpErrorKind::Connect + } else if e.is_decode() { + HttpErrorKind::Decode + } else { + HttpErrorKind::Unknown + }; + + // Reqwest error variants aren't great, attempt to refine them + let mut source = e.source(); + while let Some(e) = source { + if let Some(e) = e.downcast_ref::() { + if e.is_closed() || e.is_incomplete_message() || e.is_body_write_aborted() { + kind = HttpErrorKind::Request; + } else if e.is_timeout() { + kind = HttpErrorKind::Timeout; + } + break; + } + if let Some(e) = e.downcast_ref::() { + match e.kind() { + std::io::ErrorKind::TimedOut => kind = HttpErrorKind::Timeout, + std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::UnexpectedEof => kind = HttpErrorKind::Interrupted, + _ => {} + } + break; + } + source = e.source(); + } + Self { + kind, + // We strip URL as it will be included by RetryError if not sensitive + source: Box::new(e.without_url()), + } + } + + /// Returns the [`HttpErrorKind`] + pub fn kind(&self) -> HttpErrorKind { + self.kind + } +} + +/// An asynchronous function from a [`HttpRequest`] to a [`HttpResponse`]. 
+#[async_trait] +pub trait HttpService: std::fmt::Debug + Send + Sync + 'static { + /// Perform [`HttpRequest`] returning [`HttpResponse`] + async fn call(&self, req: HttpRequest) -> Result; +} + +/// An HTTP client +#[derive(Debug, Clone)] +pub struct HttpClient(Arc); + +impl HttpClient { + /// Create a new [`HttpClient`] from an [`HttpService`] + pub fn new(service: impl HttpService + 'static) -> Self { + Self(Arc::new(service)) + } + + /// Performs [`HttpRequest`] using this client + pub async fn execute(&self, request: HttpRequest) -> Result { + self.0.call(request).await + } + + #[allow(unused)] + pub(crate) fn get(&self, url: U) -> HttpRequestBuilder + where + U: TryInto, + U::Error: Into, + { + self.request(Method::GET, url) + } + + #[allow(unused)] + pub(crate) fn post(&self, url: U) -> HttpRequestBuilder + where + U: TryInto, + U::Error: Into, + { + self.request(Method::POST, url) + } + + #[allow(unused)] + pub(crate) fn put(&self, url: U) -> HttpRequestBuilder + where + U: TryInto, + U::Error: Into, + { + self.request(Method::PUT, url) + } + + #[allow(unused)] + pub(crate) fn delete(&self, url: U) -> HttpRequestBuilder + where + U: TryInto, + U::Error: Into, + { + self.request(Method::DELETE, url) + } + + pub(crate) fn request(&self, method: Method, url: U) -> HttpRequestBuilder + where + U: TryInto, + U::Error: Into, + { + HttpRequestBuilder::new(self.clone()) + .uri(url) + .method(method) + } +} + +#[async_trait] +impl HttpService for reqwest::Client { + async fn call(&self, req: HttpRequest) -> Result { + let (parts, body) = req.into_parts(); + + let url = parts.uri.to_string().parse::<reqwest::Url>().map_err(|e| HttpError::new(HttpErrorKind::Request, e))?; + let mut req = reqwest::Request::new(parts.method, url); + *req.headers_mut() = parts.headers; + *req.body_mut() = Some(body.into_reqwest()); + + let r = self.execute(req).await.map_err(HttpError::reqwest)?; + let res: http::Response = r.into(); + let (parts, body) = res.into_parts(); + + let body = HttpResponseBody::new(body.map_err(HttpError::reqwest)); +
Ok(HttpResponse::from_parts(parts, body)) + } +} + +/// A factory for [`HttpClient`] +pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static { + /// Create a new [`HttpClient`] with the provided [`ClientOptions`] + fn connect(&self, options: &ClientOptions) -> crate::Result; +} + +/// [`HttpConnector`] using [`reqwest::Client`] +#[derive(Debug, Default)] +#[allow(missing_copy_implementations)] +pub struct ReqwestConnector {} + +impl HttpConnector for ReqwestConnector { + fn connect(&self, options: &ClientOptions) -> crate::Result { + let client = options.client()?; + Ok(HttpClient::new(client)) + } +} diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index f252dd9c2a29..4c65c6d17d8a 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -18,17 +18,17 @@ use std::ops::Range; use crate::client::header::{header_meta, HeaderConfig}; +use crate::client::HttpResponse; use crate::path::Path; use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, Result}; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; -use hyper::header::{ +use http::header::{ CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_RANGE, CONTENT_TYPE, }; -use hyper::StatusCode; +use http::StatusCode; use reqwest::header::ToStrError; -use reqwest::Response; /// A client that can perform a get request #[async_trait] @@ -38,7 +38,7 @@ pub(crate) trait GetClient: Send + Sync + 'static { /// Configure the [`HeaderConfig`] for this client const HEADER_CONFIG: HeaderConfig; - async fn get_request(&self, path: &Path, options: GetOptions) -> Result; + async fn get_request(&self, path: &Path, options: GetOptions) -> Result; } /// Extension trait for [`GetClient`] that adds common retrieval functionality @@ -148,7 +148,7 @@ enum GetResultError { fn get_result( location: &Path, range: Option, - response: Response, + response: HttpResponse, ) -> Result { let mut meta = 
header_meta(location, response.headers(), T::HEADER_CONFIG)?; @@ -241,6 +241,7 @@ fn get_result( } let stream = response + .into_body() .bytes_stream() .map_err(|source| crate::Error::Generic { store: T::STORE, @@ -259,8 +260,7 @@ fn get_result( #[cfg(test)] mod tests { use super::*; - use hyper::http; - use hyper::http::header::*; + use http::header::*; struct TestClient {} @@ -275,7 +275,7 @@ mod tests { user_defined_metadata_prefix: Some("x-test-meta-"), }; - async fn get_request(&self, _: &Path, _: GetOptions) -> Result { + async fn get_request(&self, _: &Path, _: GetOptions) -> Result { unimplemented!() } } @@ -286,7 +286,7 @@ mod tests { status: StatusCode, content_range: Option<&str>, headers: Option>, - ) -> Response { + ) -> HttpResponse { let mut builder = http::Response::builder(); if let Some(range) = content_range { builder = builder.header(CONTENT_RANGE, range); @@ -306,9 +306,8 @@ mod tests { builder .status(status) .header(CONTENT_LENGTH, object_size) - .body(body) + .body(body.into()) .unwrap() - .into() } #[tokio::test] diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs index db06da6345d5..d7e14b3fb54a 100644 --- a/object_store/src/client/header.rs +++ b/object_store/src/client/header.rs @@ -20,8 +20,8 @@ use crate::path::Path; use crate::ObjectMeta; use chrono::{DateTime, TimeZone, Utc}; -use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; -use hyper::HeaderMap; +use http::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; +use http::HeaderMap; #[derive(Debug, Copy, Clone)] /// Configuration for header extraction diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 11f8b3e8e48f..4fe3cff159a2 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -42,19 +42,28 @@ pub(crate) mod header; #[cfg(any(feature = "aws", feature = "gcp"))] pub(crate) mod s3; +mod body; +pub use body::{HttpRequest, HttpRequestBody, HttpResponse, HttpResponseBody}; + +pub(crate) mod 
builder; + +mod connection; +pub use connection::{ + HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService, ReqwestConnector, +}; + #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] pub(crate) mod parts; use async_trait::async_trait; +use reqwest::header::{HeaderMap, HeaderValue}; +use reqwest::{Client, ClientBuilder, NoProxy, Proxy}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use reqwest::header::{HeaderMap, HeaderValue}; -use reqwest::{Client, ClientBuilder, NoProxy, Proxy, RequestBuilder}; -use serde::{Deserialize, Serialize}; - use crate::config::{fmt_duration, ConfigValue}; use crate::path::Path; use crate::{GetOptions, Result}; @@ -593,17 +602,16 @@ impl ClientOptions { } } - /// Create a [`Client`] with overrides optimised for metadata endpoint access + /// Returns a copy of this [`ClientOptions`] with overrides necessary for metadata endpoint access /// /// In particular: /// * Allows HTTP as metadata endpoints do not use TLS /// * Configures a low connection timeout to provide quick feedback if not present #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] - pub(crate) fn metadata_client(&self) -> Result { + pub(crate) fn metadata_options(&self) -> Self { self.clone() .with_allow_http(true) .with_connect_timeout(Duration::from_secs(1)) - .client() } pub(crate) fn client(&self) -> Result { @@ -706,7 +714,7 @@ pub(crate) trait GetOptionsExt { fn with_get_options(self, options: GetOptions) -> Self; } -impl GetOptionsExt for RequestBuilder { +impl GetOptionsExt for HttpRequestBuilder { fn with_get_options(mut self, options: GetOptions) -> Self { use hyper::header::*; @@ -782,13 +790,13 @@ mod cloud { #[derive(Debug)] pub(crate) struct TokenCredentialProvider { inner: T, - client: Client, + client: HttpClient, retry: RetryConfig, cache: TokenCache>, } impl TokenCredentialProvider { - pub(crate) fn new(inner: T, client: Client, 
retry: RetryConfig) -> Self { + pub(crate) fn new(inner: T, client: HttpClient, retry: RetryConfig) -> Self { Self { inner, client, @@ -822,12 +830,13 @@ mod cloud { async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> Result>>; } } +use crate::client::builder::HttpRequestBuilder; #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))] pub(crate) use cloud::*; diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index a3f8fcb78cfc..96244aac9b0f 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -18,60 +18,118 @@ //! A shared HTTP client implementation incorporating retries use crate::client::backoff::{Backoff, BackoffConfig}; +use crate::client::builder::HttpRequestBuilder; +use crate::client::connection::HttpErrorKind; +use crate::client::{HttpClient, HttpError, HttpRequest, HttpResponse}; use crate::PutPayload; use futures::future::BoxFuture; +use http::{Method, Uri}; use reqwest::header::LOCATION; -use reqwest::{Client, Request, Response, StatusCode}; -use std::error::Error as StdError; +use reqwest::StatusCode; use std::time::{Duration, Instant}; -use tracing::{debug, info}; +use tracing::info; /// Retry request error #[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("Received redirect without LOCATION, this normally indicates an incorrectly configured region")] +pub struct RetryError { + method: Method, + uri: Option, + retries: usize, + max_retries: usize, + elapsed: Duration, + retry_timeout: Duration, + inner: RequestError, +} + +impl std::fmt::Display for RetryError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Error performing {} ", self.method)?; + match &self.uri { + Some(uri) => write!(f, "{uri} ")?, + None => write!(f, "REDACTED ")?, + } + write!(f, "in {:?}", self.elapsed)?; + if self.retries != 0 { + write!( + f, + ", after {} retries, max_retries: {}, retry_timeout: {:?} ", + 
self.retries, self.max_retries, self.retry_timeout + )?; + } + write!(f, " - {}", self.inner) + } +} + +/// Context of the retry loop +struct RetryContext { + method: Method, + uri: Option, + retries: usize, + max_retries: usize, + start: Instant, + retry_timeout: Duration, +} + +impl RetryContext { + fn err(self, error: RequestError) -> RetryError { + RetryError { + uri: self.uri, + method: self.method, + retries: self.retries, + max_retries: self.max_retries, + elapsed: self.start.elapsed(), + retry_timeout: self.retry_timeout, + inner: error, + } + } + + fn exhausted(&self) -> bool { + self.retries == self.max_retries || self.start.elapsed() > self.retry_timeout + } +} + +/// The reason a request failed +#[derive(Debug, thiserror::Error)] +pub enum RequestError { + #[error("Received redirect without LOCATION, this normally indicates an incorrectly configured region" + )] BareRedirect, - #[error("Server error, body contains Error, with status {status}: {}", body.as_deref().unwrap_or("No Body"))] - Server { + #[error("Server returned non-2xx status code: {status}: {}", body.as_deref().unwrap_or(""))] + Status { status: StatusCode, body: Option, }, - #[error("Client error with status {status}: {}", body.as_deref().unwrap_or("No Body"))] - Client { - status: StatusCode, - body: Option, - }, + #[error("Server returned error response: {body}")] + Response { status: StatusCode, body: String }, - #[error("Error after {retries} retries in {elapsed:?}, max_retries:{max_retries}, retry_timeout:{retry_timeout:?}, source:{source}")] - Reqwest { - retries: usize, - max_retries: usize, - elapsed: Duration, - retry_timeout: Duration, - source: reqwest::Error, - }, + #[error(transparent)] + Http(#[from] HttpError), } -impl Error { +impl RetryError { + /// Returns the underlying [`RequestError`] + pub fn inner(&self) -> &RequestError { + &self.inner + } + /// Returns the status code associated with this error if any pub fn status(&self) -> Option { - match self { - 
Self::BareRedirect => None, - Self::Server { status, .. } => Some(*status), - Self::Client { status, .. } => Some(*status), - Self::Reqwest { source, .. } => source.status(), + match &self.inner { + RequestError::Status { status, .. } | RequestError::Response { status, .. } => { + Some(*status) + } + RequestError::BareRedirect | RequestError::Http(_) => None, } } /// Returns the error body if any pub fn body(&self) -> Option<&str> { - match self { - Self::Client { body, .. } => body.as_deref(), - Self::Server { body, .. } => body.as_deref(), - Self::BareRedirect => None, - Self::Reqwest { .. } => None, + match &self.inner { + RequestError::Status { body, .. } => body.as_deref(), + RequestError::Response { body, .. } => Some(body), + RequestError::BareRedirect | RequestError::Http(_) => None, } } @@ -109,34 +167,29 @@ impl Error { } } -impl From for std::io::Error { - fn from(err: Error) -> Self { +impl From for std::io::Error { + fn from(err: RetryError) -> Self { use std::io::ErrorKind; - match &err { - Error::Client { - status: StatusCode::NOT_FOUND, - .. - } => Self::new(ErrorKind::NotFound, err), - Error::Client { - status: StatusCode::BAD_REQUEST, - .. - } => Self::new(ErrorKind::InvalidInput, err), - Error::Client { - status: StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN, - .. - } => Self::new(ErrorKind::PermissionDenied, err), - Error::Reqwest { source, .. } if source.is_timeout() => { - Self::new(ErrorKind::TimedOut, err) + let kind = match err.status() { + Some(StatusCode::NOT_FOUND) => ErrorKind::NotFound, + Some(StatusCode::BAD_REQUEST) => ErrorKind::InvalidInput, + Some(StatusCode::UNAUTHORIZED) | Some(StatusCode::FORBIDDEN) => { + ErrorKind::PermissionDenied } - Error::Reqwest { source, .. 
} if source.is_connect() => { - Self::new(ErrorKind::NotConnected, err) - } - _ => Self::new(ErrorKind::Other, err), - } + _ => match &err.inner { + RequestError::Http(h) => match h.kind() { + HttpErrorKind::Timeout => ErrorKind::TimedOut, + HttpErrorKind::Connect => ErrorKind::NotConnected, + _ => ErrorKind::Other, + }, + _ => ErrorKind::Other, + }, + }; + Self::new(kind, err) } } -pub(crate) type Result = std::result::Result; +pub(crate) type Result = std::result::Result; /// The configuration for how to respond to request errors /// @@ -190,8 +243,8 @@ fn body_contains_error(response_body: &str) -> bool { } pub(crate) struct RetryableRequest { - client: Client, - request: Request, + client: HttpClient, + request: HttpRequest, max_retries: usize, retry_timeout: Duration, @@ -247,35 +300,32 @@ impl RetryableRequest { } } - pub(crate) async fn send(self) -> Result { - let max_retries = self.max_retries; - let retry_timeout = self.retry_timeout; - let mut retries = 0; - let now = Instant::now(); + pub(crate) async fn send(self) -> Result { + let mut ctx = RetryContext { + retries: 0, + uri: (!self.sensitive).then(|| self.request.uri().clone()), + method: self.request.method().clone(), + max_retries: self.max_retries, + start: Instant::now(), + retry_timeout: self.retry_timeout, + }; let mut backoff = self.backoff; let is_idempotent = self .idempotent .unwrap_or_else(|| self.request.method().is_safe()); - let sanitize_err = move |e: reqwest::Error| match self.sensitive { - true => e.without_url(), - false => e, - }; - loop { - let mut request = self - .request - .try_clone() - .expect("request body must be cloneable"); + let mut request = self.request.clone(); if let Some(payload) = &self.payload { - *request.body_mut() = Some(payload.body()); + *request.body_mut() = payload.clone().into(); } match self.client.execute(request).await { - Ok(r) => match r.error_for_status_ref() { - Ok(_) if r.status().is_success() => { + Ok(r) => { + let status = r.status(); + if 
status.is_success() { // For certain S3 requests, 200 response may contain `InternalError` or // `SlowDown` in the message. These responses should be handled similarly // to r5xx errors. @@ -284,164 +334,95 @@ impl RetryableRequest { return Ok(r); } - let status = r.status(); - let headers = r.headers().clone(); - - let bytes = r.bytes().await.map_err(|e| Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, - })?; - - let response_body = String::from_utf8_lossy(&bytes); - debug!("Checking for error in response_body: {}", response_body); + let (parts, body) = r.into_parts(); + let body = match body.text().await { + Ok(body) => body, + Err(e) => return Err(ctx.err(RequestError::Http(e))), + }; - if !body_contains_error(&response_body) { + if !body_contains_error(&body) { // Success response and no error, clone and return response - let mut success_response = hyper::Response::new(bytes); - *success_response.status_mut() = status; - *success_response.headers_mut() = headers; - - return Ok(reqwest::Response::from(success_response)); + return Ok(HttpResponse::from_parts(parts, body.into())); } else { // Retry as if this was a 5xx response - if retries == max_retries || now.elapsed() > retry_timeout { - return Err(Error::Server { - body: Some(response_body.into_owned()), - status, - }); + if ctx.exhausted() { + return Err(ctx.err(RequestError::Response { body, status })); } let sleep = backoff.next(); - retries += 1; + ctx.retries += 1; info!( "Encountered a response status of {} but body contains Error, backing off for {} seconds, retry {} of {}", status, sleep.as_secs_f32(), - retries, - max_retries, + ctx.retries, + ctx.max_retries, ); tokio::time::sleep(sleep).await; } - } - Ok(r) if r.status() == StatusCode::NOT_MODIFIED => { - return Err(Error::Client { - body: None, - status: StatusCode::NOT_MODIFIED, - }) - } - Ok(r) => { - let is_bare_redirect = - r.status().is_redirection() && !r.headers().contains_key(LOCATION); + } 
else if status == StatusCode::NOT_MODIFIED { + return Err(ctx.err(RequestError::Status { status, body: None })); + } else if status.is_redirection() { + let is_bare_redirect = !r.headers().contains_key(LOCATION); return match is_bare_redirect { - true => Err(Error::BareRedirect), - // Not actually sure if this is reachable, but here for completeness - false => Err(Error::Client { + true => Err(ctx.err(RequestError::BareRedirect)), + false => Err(ctx.err(RequestError::Status { body: None, status: r.status(), - }), + })), }; - } - Err(e) => { - let e = sanitize_err(e); + } else { let status = r.status(); - if retries == max_retries - || now.elapsed() > retry_timeout + if ctx.exhausted() || !(status.is_server_error() || (self.retry_on_conflict && status == StatusCode::CONFLICT)) { - return Err(match status.is_client_error() { - true => match r.text().await { - Ok(body) => Error::Client { - body: Some(body).filter(|b| !b.is_empty()), + let source = match status.is_client_error() { + true => match r.into_body().text().await { + Ok(body) => RequestError::Status { status, + body: Some(body), }, - Err(e) => Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, - }, + Err(e) => RequestError::Http(e), }, - false => Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, - }, - }); - } + false => RequestError::Status { status, body: None }, + }; + return Err(ctx.err(source)); + }; let sleep = backoff.next(); - retries += 1; + ctx.retries += 1; info!( - "Encountered server error, backing off for {} seconds, retry {} of {}: {}", + "Encountered server error, backing off for {} seconds, retry {} of {}", sleep.as_secs_f32(), - retries, - max_retries, - e, + ctx.retries, + ctx.max_retries, ); tokio::time::sleep(sleep).await; } - }, + } Err(e) => { - let e = sanitize_err(e); + // let e = sanitize_err(e); - let mut do_retry = false; - if e.is_connect() - || e.is_body() - || (e.is_request() && 
!e.is_timeout()) - || (is_idempotent && e.is_timeout()) - { - do_retry = true - } else { - let mut source = e.source(); - while let Some(e) = source { - if let Some(e) = e.downcast_ref::() { - do_retry = e.is_closed() - || e.is_incomplete_message() - || e.is_body_write_aborted() - || (is_idempotent && e.is_timeout()); - break; - } - if let Some(e) = e.downcast_ref::() { - if e.kind() == std::io::ErrorKind::TimedOut { - do_retry = is_idempotent; - } else { - do_retry = matches!( - e.kind(), - std::io::ErrorKind::ConnectionReset - | std::io::ErrorKind::ConnectionAborted - | std::io::ErrorKind::BrokenPipe - | std::io::ErrorKind::UnexpectedEof - ); - } - break; - } - source = e.source(); - } - } + let do_retry = match e.kind() { + HttpErrorKind::Connect | HttpErrorKind::Request => true, // Request not sent, can retry + HttpErrorKind::Timeout | HttpErrorKind::Interrupted => is_idempotent, + HttpErrorKind::Unknown | HttpErrorKind::Decode => false, + }; - if retries == max_retries || now.elapsed() > retry_timeout || !do_retry { - return Err(Error::Reqwest { - retries, - max_retries, - elapsed: now.elapsed(), - retry_timeout, - source: e, - }); + if ctx.retries == ctx.max_retries + || ctx.start.elapsed() > ctx.retry_timeout + || !do_retry + { + return Err(ctx.err(RequestError::Http(e))); } let sleep = backoff.next(); - retries += 1; + ctx.retries += 1; info!( "Encountered transport error backing off for {} seconds, retry {} of {}: {}", sleep.as_secs_f32(), - retries, - max_retries, + ctx.retries, + ctx.max_retries, e, ); tokio::time::sleep(sleep).await; @@ -460,12 +441,12 @@ pub(crate) trait RetryExt { /// # Panic /// /// This will panic if the request body is a stream - fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result>; + fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result>; } -impl RetryExt for reqwest::RequestBuilder { +impl RetryExt for HttpRequestBuilder { fn retryable(self, config: &RetryConfig) -> RetryableRequest { - let 
(client, request) = self.build_split(); + let (client, request) = self.into_parts(); let request = request.expect("request must be valid"); RetryableRequest { @@ -482,7 +463,7 @@ impl RetryExt for reqwest::RequestBuilder { } } - fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result> { + fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result> { let request = self.retryable(config); Box::pin(async move { request.send().await }) } @@ -491,7 +472,8 @@ impl RetryExt for reqwest::RequestBuilder { #[cfg(test)] mod tests { use crate::client::mock_server::MockServer; - use crate::client::retry::{body_contains_error, Error, RetryExt}; + use crate::client::retry::{body_contains_error, RequestError, RetryExt}; + use crate::client::HttpClient; use crate::RetryConfig; use hyper::header::LOCATION; use hyper::Response; @@ -522,10 +504,12 @@ mod tests { retry_timeout: Duration::from_secs(1000), }; - let client = Client::builder() - .timeout(Duration::from_millis(100)) - .build() - .unwrap(); + let client = HttpClient::new( + Client::builder() + .timeout(Duration::from_millis(100)) + .build() + .unwrap(), + ); let do_request = || client.request(Method::GET, mock.url()).send_retry(&retry); @@ -545,24 +529,24 @@ mod tests { assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST); assert_eq!(e.body(), Some("cupcakes")); assert_eq!( - e.to_string(), - "Client error with status 400 Bad Request: cupcakes" + e.inner().to_string(), + "Server returned non-2xx status code: 400 Bad Request: cupcakes" ); // Handles client errors with no payload mock.push( Response::builder() .status(StatusCode::BAD_REQUEST) - .body(String::new()) + .body("NAUGHTY NAUGHTY".to_string()) .unwrap(), ); let e = do_request().await.unwrap_err(); assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST); - assert_eq!(e.body(), None); + assert_eq!(e.body(), Some("NAUGHTY NAUGHTY")); assert_eq!( - e.to_string(), - "Client error with status 400 Bad Request: No Body" + 
e.inner().to_string(), + "Server returned non-2xx status code: 400 Bad Request: NAUGHTY NAUGHTY" ); // Should retry server error request @@ -598,7 +582,6 @@ mod tests { let r = do_request().await.unwrap(); assert_eq!(r.status(), StatusCode::OK); - assert_eq!(r.url().path(), "/foo"); // Follows 401 redirects mock.push( @@ -611,7 +594,6 @@ mod tests { let r = do_request().await.unwrap(); assert_eq!(r.status(), StatusCode::OK); - assert_eq!(r.url().path(), "/bar"); // Handles redirect loop for _ in 0..10 { @@ -625,7 +607,7 @@ mod tests { } let e = do_request().await.unwrap_err().to_string(); - assert!(e.contains("error following redirect for url"), "{}", e); + assert!(e.contains("error following redirect"), "{}", e); // Handles redirect missing location mock.push( @@ -636,8 +618,8 @@ mod tests { ); let e = do_request().await.unwrap_err(); - assert!(matches!(e, Error::BareRedirect)); - assert_eq!(e.to_string(), "Received redirect without LOCATION, this normally indicates an incorrectly configured region"); + assert!(matches!(e.inner, RequestError::BareRedirect)); + assert_eq!(e.inner().to_string(), "Received redirect without LOCATION, this normally indicates an incorrectly configured region"); // Gives up after the retrying the specified number of times for _ in 0..=retry.max_retries { @@ -651,8 +633,7 @@ mod tests { let e = do_request().await.unwrap_err().to_string(); assert!( - e.contains("Error after 2 retries in") && - e.contains("max_retries:2, retry_timeout:1000s, source:HTTP status server error (502 Bad Gateway) for url"), + e.contains(" after 2 retries, max_retries: 2, retry_timeout: 1000s - Server returned non-2xx status code: 502 Bad Gateway"), "{e}" ); @@ -667,10 +648,7 @@ mod tests { } let e = do_request().await.unwrap_err().to_string(); assert!( - e.contains("Error after 2 retries in") - && e.contains( - "max_retries:2, retry_timeout:1000s, source:error sending request for url" - ), + e.contains("after 2 retries, max_retries: 2, retry_timeout: 1000s - HTTP 
error: error sending request"), "{e}" ); @@ -689,7 +667,7 @@ mod tests { let res = client.request(Method::PUT, mock.url()).send_retry(&retry); let e = res.await.unwrap_err().to_string(); assert!( - e.contains("Error after 0 retries in") && e.contains("error sending request for url"), + !e.contains("retries") && e.contains("error sending request"), "{e}" ); @@ -750,7 +728,8 @@ mod tests { let r = req.send().await.unwrap(); assert_eq!(r.status(), StatusCode::OK); // Response with InternalError should have been retried - assert!(!r.text().await.unwrap().contains("InternalError")); + let b = r.into_body().text().await.unwrap(); + assert!(!b.contains("InternalError")); // Should not retry success response with no error in body mock.push( @@ -766,7 +745,8 @@ mod tests { .retry_error_body(true); let r = req.send().await.unwrap(); assert_eq!(r.status(), StatusCode::OK); - assert!(r.text().await.unwrap().contains("success")); + let b = r.into_body().text().await.unwrap(); + assert!(b.contains("success")); // Shutdown mock.shutdown().await diff --git a/object_store/src/gcp/builder.rs b/object_store/src/gcp/builder.rs index cc5c1e1a0745..793978355767 100644 --- a/object_store/src/gcp/builder.rs +++ b/object_store/src/gcp/builder.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::client::TokenCredentialProvider; +use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider}; use crate::gcp::client::{GoogleCloudStorageClient, GoogleCloudStorageConfig}; use crate::gcp::credential::{ ApplicationDefaultCredentials, InstanceCredentialProvider, ServiceAccountCredentials, @@ -111,6 +111,8 @@ pub struct GoogleCloudStorageBuilder { credentials: Option, /// Credentials for sign url signing_credentials: Option, + /// The [`HttpConnector`] to use + http_connector: Option>, } /// Configuration keys for [`GoogleCloudStorageBuilder`] @@ -207,6 +209,7 @@ impl Default for GoogleCloudStorageBuilder { url: None, credentials: None, signing_credentials: None, + http_connector: None, } } } @@ -424,6 +427,12 @@ impl GoogleCloudStorageBuilder { self } + /// Overrides the [`HttpConnector`], by default uses [`ReqwestConnector`] + pub fn with_http_connector(mut self, connector: C) -> Self { + self.http_connector = Some(Arc::new(connector)); + self + } + /// Configure a connection to Google Cloud Storage, returning a /// new [`GoogleCloudStorage`] and consuming `self` pub fn build(mut self) -> Result { @@ -433,6 +442,10 @@ impl GoogleCloudStorageBuilder { let bucket_name = self.bucket_name.ok_or(Error::MissingBucketName {})?; + let http = self + .http_connector + .unwrap_or_else(|| Arc::new(ReqwestConnector::default())); + // First try to initialize from the service account information. 
let service_account_credentials = match (self.service_account_path, self.service_account_key) { @@ -471,7 +484,7 @@ impl GoogleCloudStorageBuilder { } else if let Some(credentials) = service_account_credentials.clone() { Arc::new(TokenCredentialProvider::new( credentials.token_provider()?, - self.client_options.client()?, + http.connect(&self.client_options)?, self.retry_config.clone(), )) as _ } else if let Some(credentials) = application_default_credentials.clone() { @@ -479,7 +492,7 @@ impl GoogleCloudStorageBuilder { ApplicationDefaultCredentials::AuthorizedUser(token) => Arc::new( TokenCredentialProvider::new( token, - self.client_options.client()?, + http.connect(&self.client_options)?, self.retry_config.clone(), ) .with_min_ttl(TOKEN_MIN_TTL), @@ -487,7 +500,7 @@ impl GoogleCloudStorageBuilder { ApplicationDefaultCredentials::ServiceAccount(token) => { Arc::new(TokenCredentialProvider::new( token.token_provider()?, - self.client_options.client()?, + http.connect(&self.client_options)?, self.retry_config.clone(), )) as _ } @@ -496,7 +509,7 @@ impl GoogleCloudStorageBuilder { Arc::new( TokenCredentialProvider::new( InstanceCredentialProvider::default(), - self.client_options.metadata_client()?, + http.connect(&self.client_options.metadata_options())?, self.retry_config.clone(), ) .with_min_ttl(TOKEN_MIN_TTL), @@ -517,7 +530,7 @@ impl GoogleCloudStorageBuilder { ApplicationDefaultCredentials::AuthorizedUser(token) => { Arc::new(TokenCredentialProvider::new( AuthorizedUserSigningCredentials::from(token)?, - self.client_options.client()?, + http.connect(&self.client_options)?, self.retry_config.clone(), )) as _ } @@ -528,7 +541,7 @@ impl GoogleCloudStorageBuilder { } else { Arc::new(TokenCredentialProvider::new( InstanceSigningCredentialProvider::default(), - self.client_options.metadata_client()?, + http.connect(&self.client_options.metadata_options())?, self.retry_config.clone(), )) as _ }; @@ -542,8 +555,9 @@ impl GoogleCloudStorageBuilder { 
self.client_options, ); + let http_client = http.connect(&config.client_options)?; Ok(GoogleCloudStorage { - client: Arc::new(GoogleCloudStorageClient::new(config)?), + client: Arc::new(GoogleCloudStorageClient::new(config, http_client)?), }) } } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 8dd1c69802a8..a52ad3663fdf 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::client::builder::HttpRequestBuilder; use crate::client::get::GetClient; use crate::client::header::{get_put_result, get_version, HeaderConfig}; use crate::client::list::ListClient; @@ -23,7 +24,7 @@ use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, ListResponse, }; -use crate::client::GetOptionsExt; +use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpResponse}; use crate::gcp::{GcpCredential, GcpCredentialProvider, GcpSigningCredentialProvider, STORE}; use crate::multipart::PartId; use crate::path::{Path, DELIMITER}; @@ -36,13 +37,12 @@ use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::Buf; -use hyper::header::{ +use http::header::{ CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_TYPE, }; +use http::{HeaderName, Method, StatusCode}; use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; -use reqwest::header::HeaderName; -use reqwest::{Client, Method, RequestBuilder, Response, StatusCode}; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -55,28 +55,30 @@ static VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation #[derive(Debug, thiserror::Error)] enum Error { #[error("Error performing list request: {}", source)] - ListRequest { source: crate::client::retry::Error }, + ListRequest { + source: 
crate::client::retry::RetryError, + }, #[error("Error getting list response body: {}", source)] - ListResponseBody { source: reqwest::Error }, + ListResponseBody { source: HttpError }, #[error("Got invalid list response: {}", source)] InvalidListResponse { source: quick_xml::de::DeError }, #[error("Error performing get request {}: {}", path, source)] GetRequest { - source: crate::client::retry::Error, + source: crate::client::retry::RetryError, path: String, }, #[error("Error performing request {}: {}", path, source)] Request { - source: crate::client::retry::Error, + source: crate::client::retry::RetryError, path: String, }, #[error("Error getting put response body: {}", source)] - PutResponseBody { source: reqwest::Error }, + PutResponseBody { source: HttpError }, #[error("Got invalid put request: {}", source)] InvalidPutRequest { source: quick_xml::se::SeError }, @@ -93,19 +95,23 @@ enum Error { MissingVersion, #[error("Error performing complete multipart request: {}", source)] - CompleteMultipartRequest { source: crate::client::retry::Error }, + CompleteMultipartRequest { + source: crate::client::retry::RetryError, + }, #[error("Error getting complete multipart response body: {}", source)] - CompleteMultipartResponseBody { source: reqwest::Error }, + CompleteMultipartResponseBody { source: HttpError }, #[error("Got invalid multipart response: {}", source)] InvalidMultipartResponse { source: quick_xml::de::DeError }, #[error("Error signing blob: {}", source)] - SignBlobRequest { source: crate::client::retry::Error }, + SignBlobRequest { + source: crate::client::retry::RetryError, + }, #[error("Got invalid signing blob response: {}", source)] - InvalidSignBlobResponse { source: reqwest::Error }, + InvalidSignBlobResponse { source: HttpError }, #[error("Got invalid signing blob signature: {}", source)] InvalidSignBlobSignature { source: base64::DecodeError }, @@ -169,7 +175,7 @@ pub(crate) struct Request<'a> { path: &'a Path, config: &'a GoogleCloudStorageConfig, 
payload: Option, - builder: RequestBuilder, + builder: HttpRequestBuilder, idempotent: bool, } @@ -225,7 +231,7 @@ impl Request<'_> { } } - async fn send(self) -> Result { + async fn send(self) -> Result { let credential = self.config.credentials.get_credential().await?; let resp = self .builder @@ -268,7 +274,7 @@ struct SignBlobResponse { pub(crate) struct GoogleCloudStorageClient { config: GoogleCloudStorageConfig, - client: Client, + client: HttpClient, bucket_name_encoded: String, @@ -277,8 +283,7 @@ pub(crate) struct GoogleCloudStorageClient { } impl GoogleCloudStorageClient { - pub(crate) fn new(config: GoogleCloudStorageConfig) -> Result { - let client = config.client_options.client()?; + pub(crate) fn new(config: GoogleCloudStorageConfig, client: HttpClient) -> Result { let bucket_name_encoded = percent_encode(config.bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string(); @@ -337,10 +342,8 @@ impl GoogleCloudStorageClient { .idempotent(true) .send() .await - .map_err(|source| Error::SignBlobRequest { source })?; - - //If successful, the signature is returned in the signedBlob field in the response. - let response = response + .map_err(|source| Error::SignBlobRequest { source })? 
+ .into_body() .json::() .await .map_err(|source| Error::InvalidSignBlobResponse { source })?; @@ -445,6 +448,7 @@ impl GoogleCloudStorageClient { .await?; let data = response + .into_body() .bytes() .await .map_err(|source| Error::PutResponseBody { source })?; @@ -527,6 +531,7 @@ impl GoogleCloudStorageClient { .map_err(|source| Error::Metadata { source })?; let data = response + .into_body() .bytes() .await .map_err(|source| Error::CompleteMultipartResponseBody { source })?; @@ -600,7 +605,7 @@ impl GetClient for GoogleCloudStorageClient { }; /// Perform a get request - async fn get_request(&self, path: &Path, options: GetOptions) -> Result { + async fn get_request(&self, path: &Path, options: GetOptions) -> Result { let credential = self.get_credential().await?; let url = self.object_url(path); @@ -675,6 +680,7 @@ impl ListClient for Arc { .send_retry(&self.config.retry_config) .await .map_err(|source| Error::ListRequest { source })? + .into_body() .bytes() .await .map_err(|source| Error::ListResponseBody { source })?; diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs index 4b21ad1d3eab..373c2c205cdc 100644 --- a/object_store/src/gcp/credential.rs +++ b/object_store/src/gcp/credential.rs @@ -18,7 +18,7 @@ use super::client::GoogleCloudStorageClient; use crate::client::retry::RetryExt; use crate::client::token::TemporaryToken; -use crate::client::TokenProvider; +use crate::client::{HttpClient, HttpError, TokenProvider}; use crate::gcp::{GcpSigningCredentialProvider, STORE}; use crate::util::{hex_digest, hex_encode, STRICT_ENCODE_SET}; use crate::{RetryConfig, StaticCredentialProvider}; @@ -27,10 +27,9 @@ use base64::prelude::BASE64_URL_SAFE_NO_PAD; use base64::Engine; use chrono::{DateTime, Utc}; use futures::TryFutureExt; -use hyper::HeaderMap; +use http::{HeaderMap, Method}; use itertools::Itertools; use percent_encoding::utf8_percent_encode; -use reqwest::{Client, Method}; use ring::signature::RsaKeyPair; use 
serde::Deserialize; use std::collections::BTreeMap; @@ -83,10 +82,12 @@ pub enum Error { UnsupportedKey { encoding: String }, #[error("Error performing token request: {}", source)] - TokenRequest { source: crate::client::retry::Error }, + TokenRequest { + source: crate::client::retry::RetryError, + }, #[error("Error getting token response body: {}", source)] - TokenResponseBody { source: reqwest::Error }, + TokenResponseBody { source: HttpError }, } impl From for crate::Error { @@ -259,7 +260,7 @@ impl TokenProvider for SelfSignedJwt { /// Fetch a fresh token async fn fetch_token( &self, - _client: &Client, + _client: &HttpClient, _retry: &RetryConfig, ) -> crate::Result>> { let now = seconds_since_epoch(); @@ -395,19 +396,20 @@ pub(crate) struct InstanceCredentialProvider {} /// Make a request to the metadata server to fetch a token, using a a given hostname. async fn make_metadata_request( - client: &Client, + client: &HttpClient, hostname: &str, retry: &RetryConfig, ) -> crate::Result { let url = format!("http://{hostname}/computeMetadata/v1/instance/service-accounts/default/token"); let response: TokenResponse = client - .request(Method::GET, url) + .get(url) .header("Metadata-Flavor", "Google") .query(&[("audience", "https://www.googleapis.com/oauth2/v4/token")]) .send_retry(retry) .await .map_err(|source| Error::TokenRequest { source })? + .into_body() .json() .await .map_err(|source| Error::TokenResponseBody { source })?; @@ -426,7 +428,7 @@ impl TokenProvider for InstanceCredentialProvider { /// References: async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> crate::Result>> { let metadata_host = if let Ok(host) = env::var("GCE_METADATA_HOST") { @@ -459,18 +461,19 @@ impl TokenProvider for InstanceCredentialProvider { /// Make a request to the metadata server to fetch the client email, using a given hostname. 
async fn make_metadata_request_for_email( - client: &Client, + client: &HttpClient, hostname: &str, retry: &RetryConfig, ) -> crate::Result { let url = format!("http://{hostname}/computeMetadata/v1/instance/service-accounts/default/email",); let response = client - .request(Method::GET, url) + .get(url) .header("Metadata-Flavor", "Google") .send_retry(retry) .await .map_err(|source| Error::TokenRequest { source })? + .into_body() .text() .await .map_err(|source| Error::TokenResponseBody { source })?; @@ -495,7 +498,7 @@ impl TokenProvider for InstanceSigningCredentialProvider { /// References: async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> crate::Result>> { let metadata_host = if let Ok(host) = env::var("GCE_METADATA_HOST") { @@ -605,13 +608,18 @@ impl AuthorizedUserSigningCredentials { Ok(Self { credential }) } - async fn client_email(&self, client: &Client, retry: &RetryConfig) -> crate::Result { + async fn client_email( + &self, + client: &HttpClient, + retry: &RetryConfig, + ) -> crate::Result { let response = client - .request(Method::GET, "https://oauth2.googleapis.com/tokeninfo") + .get("https://oauth2.googleapis.com/tokeninfo") .query(&[("access_token", &self.credential.refresh_token)]) .send_retry(retry) .await .map_err(|source| Error::TokenRequest { source })? 
+ .into_body() .json::() .await .map_err(|source| Error::TokenResponseBody { source })?; @@ -626,7 +634,7 @@ impl TokenProvider for AuthorizedUserSigningCredentials { async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> crate::Result>> { let email = self.client_email(client, retry).await?; @@ -647,12 +655,12 @@ impl TokenProvider for AuthorizedUserCredentials { async fn fetch_token( &self, - client: &Client, + client: &HttpClient, retry: &RetryConfig, ) -> crate::Result>> { let response = client - .request(Method::POST, DEFAULT_TOKEN_GCP_URI) - .form(&[ + .post(DEFAULT_TOKEN_GCP_URI) + .form([ ("grant_type", "refresh_token"), ("client_id", &self.client_id), ("client_secret", &self.client_secret), @@ -663,6 +671,7 @@ impl TokenProvider for AuthorizedUserCredentials { .send() .await .map_err(|source| Error::TokenRequest { source })? + .into_body() .json::() .await .map_err(|source| Error::TokenResponseBody { source })?; diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 2f6630d28c1c..5f8c67dd7266 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -48,7 +48,7 @@ use crate::{ use async_trait::async_trait; use client::GoogleCloudStorageClient; use futures::stream::BoxStream; -use hyper::Method; +use http::Method; use url::Url; use crate::client::get::GetClientExt; @@ -414,7 +414,7 @@ mod test { .unwrap_err() .to_string(); assert!( - err.contains("Client error with status 404 Not Found"), + err.contains("Server returned non-2xx status code: 404 Not Found"), "{}", err ) diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 9983fdff5cd6..652d3268fd6e 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -18,29 +18,29 @@ use crate::client::get::GetClient; use crate::client::header::HeaderConfig; use crate::client::retry::{self, RetryConfig, RetryExt}; -use crate::client::GetOptionsExt; +use 
crate::client::{GetOptionsExt, HttpClient, HttpError, HttpResponse}; use crate::path::{Path, DELIMITER}; use crate::util::deserialize_rfc1123; use crate::{Attribute, Attributes, ClientOptions, GetOptions, ObjectMeta, PutPayload, Result}; use async_trait::async_trait; use bytes::Buf; use chrono::{DateTime, Utc}; -use hyper::header::{ +use http::header::{ CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_TYPE, }; use percent_encoding::percent_decode_str; -use reqwest::{Method, Response, StatusCode}; +use reqwest::{Method, StatusCode}; use serde::Deserialize; use url::Url; #[derive(Debug, thiserror::Error)] enum Error { #[error("Request error: {}", source)] - Request { source: retry::Error }, + Request { source: retry::RetryError }, #[error("Request error: {}", source)] - Reqwest { source: reqwest::Error }, + Reqwest { source: HttpError }, #[error("Range request not supported by {}", href)] RangeNotSupported { href: String }, @@ -86,7 +86,7 @@ impl From for crate::Error { #[derive(Debug)] pub(crate) struct Client { url: Url, - client: reqwest::Client, + client: HttpClient, retry_config: RetryConfig, client_options: ClientOptions, } @@ -94,26 +94,26 @@ pub(crate) struct Client { impl Client { pub(crate) fn new( url: Url, + client: HttpClient, client_options: ClientOptions, retry_config: RetryConfig, - ) -> Result { - let client = client_options.client()?; - Ok(Self { + ) -> Self { + Self { url, retry_config, client_options, client, - }) + } } pub(crate) fn base_url(&self) -> &Url { &self.url } - fn path_url(&self, location: &Path) -> Url { + fn path_url(&self, location: &Path) -> String { let mut url = self.url.clone(); url.path_segments_mut().unwrap().extend(location.parts()); - url + url.to_string() } /// Create a directory with `path` using MKCOL @@ -125,7 +125,7 @@ impl Client { .extend(path.split(DELIMITER)); self.client - .request(method, url) + .request(method, String::from(url)) .send_retry(&self.retry_config) 
.await .map_err(|source| Error::Request { source })?; @@ -167,7 +167,7 @@ impl Client { location: &Path, payload: PutPayload, attributes: Attributes, - ) -> Result { + ) -> Result { let mut retry = false; loop { let url = self.path_url(location); @@ -222,7 +222,7 @@ impl Client { pub(crate) async fn list(&self, location: Option<&Path>, depth: &str) -> Result { let url = location .map(|path| self.path_url(path)) - .unwrap_or_else(|| self.url.clone()); + .unwrap_or_else(|| self.url.to_string()); let method = Method::from_bytes(b"PROPFIND").unwrap(); let result = self @@ -236,6 +236,7 @@ impl Client { let response = match result { Ok(result) => result + .into_body() .bytes() .await .map_err(|source| Error::Reqwest { source })?, @@ -332,7 +333,7 @@ impl GetClient for Client { user_defined_metadata_prefix: None, }; - async fn get_request(&self, path: &Path, options: GetOptions) -> Result { + async fn get_request(&self, path: &Path, options: GetOptions) -> Result { let url = self.path_url(path); let method = match options.head { true => Method::HEAD, diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index 899740d36db9..8fba4d7c67c7 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -41,6 +41,7 @@ use url::Url; use crate::client::get::GetClientExt; use crate::client::header::get_etag; +use crate::client::{HttpConnector, ReqwestConnector}; use crate::http::client::Client; use crate::path::Path; use crate::{ @@ -203,6 +204,7 @@ pub struct HttpBuilder { url: Option, client_options: ClientOptions, retry_config: RetryConfig, + http_connector: Option>, } impl HttpBuilder { @@ -235,13 +237,29 @@ impl HttpBuilder { self } + /// Overrides the [`HttpConnector`], by default uses [`ReqwestConnector`] + pub fn with_http_connector(mut self, connector: C) -> Self { + self.http_connector = Some(Arc::new(connector)); + self + } + /// Build an [`HttpStore`] with the configured options pub fn build(self) -> Result { let url = 
self.url.ok_or(Error::MissingUrl)?; let parsed = Url::parse(&url).map_err(|source| Error::UnableToParseUrl { url, source })?; + let client = match self.http_connector { + None => ReqwestConnector::default().connect(&self.client_options)?, + Some(x) => x.connect(&self.client_options)?, + }; + Ok(HttpStore { - client: Arc::new(Client::new(parsed, self.client_options, self.retry_config)?), + client: Arc::new(Client::new( + parsed, + client, + self.client_options, + self.retry_config, + )), }) } } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index cffcbbdd4353..58f757b2972a 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -526,7 +526,7 @@ pub mod signer; pub mod throttle; #[cfg(feature = "cloud")] -mod client; +pub mod client; #[cfg(feature = "cloud")] pub use client::{ @@ -1411,7 +1411,7 @@ mod tests { pub(crate) async fn tagging(storage: Arc, validate: bool, get_tags: F) where F: Fn(Path) -> Fut + Send + Sync, - Fut: std::future::Future> + Send, + Fut: std::future::Future> + Send, { use bytes::Buf; use serde::Deserialize; @@ -1477,7 +1477,7 @@ mod tests { for path in [path, multi_path, buf_path] { let resp = get_tags(path.clone()).await.unwrap(); - let body = resp.bytes().await.unwrap(); + let body = resp.into_body().bytes().await.unwrap(); let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap(); resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key)); diff --git a/object_store/src/parse.rs b/object_store/src/parse.rs index bc65a0b8d1c8..4e67e5946a82 100644 --- a/object_store/src/parse.rs +++ b/object_store/src/parse.rs @@ -345,7 +345,7 @@ mod tests { #[cfg(feature = "http")] async fn test_url_http() { use crate::client::mock_server::MockServer; - use hyper::{header::USER_AGENT, Response}; + use http::{header::USER_AGENT, Response}; let server = MockServer::new().await; diff --git a/object_store/src/payload.rs b/object_store/src/payload.rs index d71f016bcd0d..055336b6a3c1 100644 --- a/object_store/src/payload.rs +++ 
b/object_store/src/payload.rs @@ -44,13 +44,6 @@ impl PutPayload { s.into() } - #[cfg(feature = "cloud")] - pub(crate) fn body(&self) -> reqwest::Body { - reqwest::Body::wrap_stream(futures::stream::iter( - self.clone().into_iter().map(Ok::<_, crate::Error>), - )) - } - /// Returns the total length of the [`Bytes`] in this payload pub fn content_length(&self) -> usize { self.0.iter().map(|b| b.len()).sum()