-
Notifications
You must be signed in to change notification settings - Fork 131
feat: add pending request channel size configuration to ConnectionConfig #1328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -287,6 +287,7 @@ | |
pub(crate) authenticator: Option<Arc<dyn AuthenticatorProvider>>, | ||
pub(crate) address_translator: Option<Arc<dyn AddressTranslator>>, | ||
pub(crate) write_coalescing_delay: Option<WriteCoalescingDelay>, | ||
pub(crate) pending_request_channel_size: Option<usize>, | ||
|
||
pub(crate) keepalive_interval: Option<Duration>, | ||
pub(crate) keepalive_timeout: Option<Duration>, | ||
|
@@ -322,17 +323,44 @@ | |
authenticator: self.authenticator.clone(), | ||
address_translator: self.address_translator.clone(), | ||
write_coalescing_delay: self.write_coalescing_delay.clone(), | ||
pending_request_channel_size: self.pending_request_channel_size, | ||
|
||
keepalive_interval: self.keepalive_interval, | ||
keepalive_timeout: self.keepalive_timeout, | ||
tablet_sender: self.tablet_sender.clone(), | ||
identity: self.identity.clone(), | ||
} | ||
} | ||
|
||
/// Set the size of the pending request channel for each connection. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `size` - The maximum number of pending requests per connection. | ||
/// | ||
/// # Notes | ||
/// | ||
/// - This is different from cpp-driver's implementation, which uses a per-RequestProcessor queue. | ||
/// - The default is 2048, a balanced value between performance and memory usage. | ||
/// - Adjust based on your specific workload and system resources. | ||
/// | ||
/// # Example | ||
/// | ||
/// ``` | ||
/// let session = SessionBuilder::new() | ||
/// .connection_config( | ||
/// ConnectionConfig::new() | ||
/// .with_pending_request_channel_size(4096) | ||
/// ) | ||
/// .build() | ||
/// .await?; | ||
/// ``` | ||
pub fn with_pending_request_channel_size(mut self, size: usize) -> Self { | ||
self.pending_request_channel_size = Some(size); | ||
self | ||
} | ||
} | ||
|
||
/// Configuration used for new connections, customized for a specific endpoint. | ||
/// | ||
/// Created from [ConnectionConfig] using [ConnectionConfig::to_host_connection_config]. | ||
#[derive(Clone)] | ||
pub(crate) struct HostConnectionConfig { | ||
pub(crate) local_ip_address: Option<IpAddr>, | ||
|
@@ -349,6 +377,7 @@ | |
pub(crate) authenticator: Option<Arc<dyn AuthenticatorProvider>>, | ||
pub(crate) address_translator: Option<Arc<dyn AddressTranslator>>, | ||
pub(crate) write_coalescing_delay: Option<WriteCoalescingDelay>, | ||
pub(crate) pending_request_channel_size: Option<usize>, | ||
|
||
pub(crate) keepalive_interval: Option<Duration>, | ||
pub(crate) keepalive_timeout: Option<Duration>, | ||
|
@@ -357,6 +386,13 @@ | |
pub(crate) identity: SelfIdentity<'static>, | ||
} | ||
|
||
#[cfg(test)] | ||
impl HostConnectionConfig { | ||
fn is_tls(&self) -> bool { | ||
self.tls_config.is_some() | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
impl Default for HostConnectionConfig { | ||
fn default() -> Self { | ||
|
@@ -382,6 +418,7 @@ | |
tablet_sender: None, | ||
|
||
identity: SelfIdentity::default(), | ||
pending_request_channel_size: Some(2048), | ||
} | ||
} | ||
} | ||
Comment on lines
418
to
424
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The fact that in connection you wrote Why use an option here? I don't see anything optional - we need to create the channel, and it needs to have some size. Let's make it just usize. |
||
|
@@ -411,19 +448,11 @@ | |
tablet_sender: None, | ||
|
||
identity: SelfIdentity::default(), | ||
pending_request_channel_size: Some(2048), | ||
} | ||
} | ||
Comment on lines
449
to
453
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
} | ||
|
||
impl HostConnectionConfig { | ||
fn is_tls(&self) -> bool { | ||
self.tls_config.is_some() | ||
} | ||
} | ||
|
||
// Used to listen for fatal error in connection | ||
pub(crate) type ErrorReceiver = tokio::sync::oneshot::Receiver<ConnectionError>; | ||
|
||
impl Connection { | ||
// Returns new connection and ErrorReceiver which can be used to wait for a fatal error | ||
/// Opens a connection and makes it ready to send/receive CQL frames on it, | ||
|
@@ -432,7 +461,7 @@ | |
connect_address: SocketAddr, | ||
source_port: Option<u16>, | ||
config: HostConnectionConfig, | ||
) -> Result<(Self, ErrorReceiver), ConnectionError> { | ||
Check failure on line 464 in scylla/src/network/connection.rs
|
||
let stream_connector = tokio::time::timeout( | ||
config.connect_timeout, | ||
connect_with_source_ip_and_port(connect_address, config.local_ip_address, source_port), | ||
|
@@ -451,7 +480,7 @@ | |
} | ||
|
||
// TODO: What should be the size of the channel? | ||
let (sender, receiver) = mpsc::channel(1024); | ||
let (sender, receiver) = mpsc::channel(config.pending_request_channel_size.unwrap_or(1024)); | ||
let (error_sender, error_receiver) = tokio::sync::oneshot::channel(); | ||
Comment on lines
481
to
484
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove the TODO comment. |
||
// Unbounded because it allows for synchronous pushes | ||
let (orphan_notification_sender, orphan_notification_receiver) = mpsc::unbounded_channel(); | ||
|
@@ -1906,7 +1935,7 @@ | |
endpoint: &UntranslatedEndpoint, | ||
source_port: Option<u16>, | ||
config: &HostConnectionConfig, | ||
) -> Result<(Connection, ErrorReceiver), ConnectionError> { | ||
Check failure on line 1938 in scylla/src/network/connection.rs
|
||
/* Translate the address, if applicable. */ | ||
let addr = maybe_translated_addr(endpoint, config.address_translator.as_deref()).await?; | ||
|
||
|
@@ -1919,7 +1948,7 @@ | |
// Get OPTIONS SUPPORTED by the cluster. | ||
let mut supported = connection.get_options().await?; | ||
|
||
let shard_aware_port_key = match config.is_tls() { | ||
Check failure on line 1951 in scylla/src/network/connection.rs
|
||
true => options::SCYLLA_SHARD_AWARE_PORT_SSL, | ||
false => options::SCYLLA_SHARD_AWARE_PORT, | ||
}; | ||
|
@@ -2025,7 +2054,7 @@ | |
shard: Shard, | ||
sharder: Sharder, | ||
config: &HostConnectionConfig, | ||
) -> Result<(Connection, ErrorReceiver), ConnectionError> { | ||
Check failure on line 2057 in scylla/src/network/connection.rs
|
||
// Create iterator over all possible source ports for this shard | ||
let source_port_iter = | ||
sharder.iter_source_ports_for_shard_from_range(shard, &config.shard_aware_local_port_range); | ||
|
@@ -2307,7 +2336,7 @@ | |
use crate::statement::unprepared::Statement; | ||
use crate::test_utils::setup_tracing; | ||
use crate::utils::test_utils::{resolve_hostname, unique_keyspace_name, PerformDDL}; | ||
use futures::{StreamExt, TryStreamExt}; | ||
Check failure on line 2339 in scylla/src/network/connection.rs
|
||
use std::collections::HashMap; | ||
use std::net::SocketAddr; | ||
use std::sync::Arc; | ||
|
@@ -2337,7 +2366,7 @@ | |
) | ||
.await | ||
.unwrap(); | ||
let connection = Arc::new(connection); | ||
|
||
let ks = unique_keyspace_name(); | ||
|
||
|
@@ -2469,7 +2498,7 @@ | |
) | ||
.await | ||
.unwrap(); | ||
let connection = Arc::new(connection); | ||
|
||
connection | ||
.use_keyspace(&super::VerifiedKeyspaceName::new(ks, false).unwrap()) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is worth describing what a "pending request" is. I think users may think that it is a request that was sent, but we did not yet receive response to (while in reality it is a request that has not yet been sent).