Skip to content

Commit d3a875f

Browse files
authored
feat(object_store): random IP address selection (#7123)
* feat(object_store): random IP address selection Closes #7117. * refactor: directly call stdlib w/o hyper-util
1 parent ef7d753 commit d3a875f

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

object_store/src/client/dns.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::net::ToSocketAddrs;
19+
20+
use rand::prelude::SliceRandom;
21+
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
22+
use tokio::task::JoinSet;
23+
24+
type DynErr = Box<dyn std::error::Error + Send + Sync>;
25+
26+
#[derive(Debug)]
27+
pub(crate) struct ShuffleResolver;
28+
29+
impl Resolve for ShuffleResolver {
30+
fn resolve(&self, name: Name) -> Resolving {
31+
Box::pin(async move {
32+
// use `JoinSet` to propagate cancelation
33+
let mut tasks = JoinSet::new();
34+
tasks.spawn_blocking(move || {
35+
let it = (name.as_str(), 0).to_socket_addrs()?;
36+
let mut addrs = it.collect::<Vec<_>>();
37+
38+
addrs.shuffle(&mut rand::rng());
39+
40+
Ok(Box::new(addrs.into_iter()) as Addrs)
41+
});
42+
43+
tasks
44+
.join_next()
45+
.await
46+
.expect("spawned on task")
47+
.map_err(|err| Box::new(err) as DynErr)?
48+
})
49+
}
50+
}

object_store/src/client/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
2020
pub(crate) mod backoff;
2121

22+
mod dns;
23+
2224
#[cfg(test)]
2325
pub(crate) mod mock_server;
2426

@@ -110,6 +112,10 @@ pub enum ClientConfigKey {
110112
ProxyCaCertificate,
111113
/// List of hosts that bypass proxy
112114
ProxyExcludes,
115+
/// Randomize the order of the addresses that the DNS resolution yields.
116+
///
117+
/// This will spread the connections across more servers.
118+
RandomizeAddresses,
113119
/// Request timeout
114120
///
115121
/// The timeout is applied from when the request starts connecting until the
@@ -137,6 +143,7 @@ impl AsRef<str> for ClientConfigKey {
137143
Self::ProxyUrl => "proxy_url",
138144
Self::ProxyCaCertificate => "proxy_ca_certificate",
139145
Self::ProxyExcludes => "proxy_excludes",
146+
Self::RandomizeAddresses => "randomize_addresses",
140147
Self::Timeout => "timeout",
141148
Self::UserAgent => "user_agent",
142149
}
@@ -163,6 +170,7 @@ impl FromStr for ClientConfigKey {
163170
"proxy_url" => Ok(Self::ProxyUrl),
164171
"proxy_ca_certificate" => Ok(Self::ProxyCaCertificate),
165172
"proxy_excludes" => Ok(Self::ProxyExcludes),
173+
"randomize_addresses" => Ok(Self::RandomizeAddresses),
166174
"timeout" => Ok(Self::Timeout),
167175
"user_agent" => Ok(Self::UserAgent),
168176
_ => Err(super::Error::UnknownConfigurationKey {
@@ -245,6 +253,7 @@ pub struct ClientOptions {
245253
http2_max_frame_size: Option<ConfigValue<u32>>,
246254
http1_only: ConfigValue<bool>,
247255
http2_only: ConfigValue<bool>,
256+
randomize_addresses: ConfigValue<bool>,
248257
}
249258

250259
impl Default for ClientOptions {
@@ -280,6 +289,7 @@ impl Default for ClientOptions {
280289
// https://github.com/apache/arrow-rs/issues/5194
281290
http1_only: true.into(),
282291
http2_only: Default::default(),
292+
randomize_addresses: true.into(),
283293
}
284294
}
285295
}
@@ -322,6 +332,9 @@ impl ClientOptions {
322332
ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()),
323333
ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate = Some(value.into()),
324334
ClientConfigKey::ProxyExcludes => self.proxy_excludes = Some(value.into()),
335+
ClientConfigKey::RandomizeAddresses => {
336+
self.randomize_addresses.parse(value);
337+
}
325338
ClientConfigKey::Timeout => self.timeout = Some(ConfigValue::Deferred(value.into())),
326339
ClientConfigKey::UserAgent => {
327340
self.user_agent = Some(ConfigValue::Deferred(value.into()))
@@ -358,6 +371,7 @@ impl ClientOptions {
358371
ClientConfigKey::ProxyUrl => self.proxy_url.clone(),
359372
ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate.clone(),
360373
ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(),
374+
ClientConfigKey::RandomizeAddresses => Some(self.randomize_addresses.to_string()),
361375
ClientConfigKey::Timeout => self.timeout.as_ref().map(fmt_duration),
362376
ClientConfigKey::UserAgent => self
363377
.user_agent
@@ -675,6 +689,10 @@ impl ClientOptions {
675689
// transparently decompress the body via the non-default `gzip` feature.
676690
builder = builder.no_gzip();
677691

692+
if self.randomize_addresses.get()? {
693+
builder = builder.dns_resolver(Arc::new(dns::ShuffleResolver));
694+
}
695+
678696
builder
679697
.https_only(!self.allow_http.get()?)
680698
.build()

0 commit comments

Comments
 (0)