diff --git a/Cargo.toml b/Cargo.toml index 450b4372..8194adc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ aerospike-sync = {path = "./aerospike-sync", optional = true} aerospike-macro = {path = "./aerospike-macro", optional = true} [features] -default = ["async", "serialization"] +default = ["async", "serialization", "rt-tokio"] serialization = ["aerospike-core/serialization"] async = ["aerospike-core"] sync = ["aerospike-sync"] @@ -50,6 +50,7 @@ bencher = "0.1" serde_json = "1.0" rand = "0.7" lazy_static = "1.4" +ripemd = "0.1" aerospike-macro = {path = "./aerospike-macro"} aerospike-rt = {path = "./aerospike-rt"} futures = {version = "0.3.16" } diff --git a/aerospike-core/Cargo.toml b/aerospike-core/Cargo.toml index c34737da..843c34c8 100644 --- a/aerospike-core/Cargo.toml +++ b/aerospike-core/Cargo.toml @@ -8,13 +8,13 @@ edition = "2018" [dependencies] log = "0.4" byteorder = "1.3" -ripemd160 = "0.8" -base64 = "0.11" -crossbeam-queue = "0.2" -rand = "0.7" +ripemd = "0.1" +base64 = "0.13" +crossbeam-queue = "0.3" +rand = "0.8" lazy_static = "1.4" error-chain = "0.12" -pwhash = "0.3" +pwhash = "1.0" serde = { version = "1.0", features = ["derive"], optional = true } aerospike-rt = {path = "../aerospike-rt"} futures = {version = "0.3.16" } @@ -26,7 +26,7 @@ rt-tokio = ["aerospike-rt/rt-tokio"] rt-async-std = ["aerospike-rt/rt-async-std"] [dev-dependencies] -env_logger = "0.9.3" +env_logger = "0.9" hex = "0.4" bencher = "0.1" serde_json = "1.0" diff --git a/aerospike-core/src/batch/batch_executor.rs b/aerospike-core/src/batch/batch_executor.rs index 7acf8ae7..3d4ac583 100644 --- a/aerospike-core/src/batch/batch_executor.rs +++ b/aerospike-core/src/batch/batch_executor.rs @@ -13,18 +13,16 @@ // License for the specific language governing permissions and limitations under // the License. -use std::cmp; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use crate::batch::BatchRead; use crate::cluster::partition::Partition; use crate::cluster::{Cluster, Node}; use crate::commands::BatchReadCommand; -use crate::errors::{Error, Result}; +use crate::errors::Result; use crate::policy::{BatchPolicy, Concurrency}; use crate::Key; -use futures::lock::Mutex; pub struct BatchExecutor { cluster: Arc, @@ -40,80 +38,47 @@ impl BatchExecutor { policy: &BatchPolicy, batch_reads: Vec, ) -> Result> { - let mut batch_nodes = self.get_batch_nodes(&batch_reads).await?; + let batch_nodes = self.get_batch_nodes(&batch_reads, policy.replica).await?; let jobs = batch_nodes - .drain() + .into_iter() .map(|(node, reads)| BatchReadCommand::new(policy, node, reads)) .collect(); - let reads = self.execute_batch_jobs(jobs, &policy.concurrency).await?; - let mut res: Vec = vec![]; - for mut read in reads { - res.append(&mut read.batch_reads); - } - Ok(res) + let reads = self.execute_batch_jobs(jobs, policy.concurrency).await?; + let mut all_results: Vec<_> = reads.into_iter().flat_map(|cmd|cmd.batch_reads).collect(); + all_results.sort_by_key(|(_, i)|*i); + Ok(all_results.into_iter().map(|(b, _)|b).collect()) } async fn execute_batch_jobs( &self, jobs: Vec, - concurrency: &Concurrency, + concurrency: Concurrency, ) -> Result> { - let threads = match *concurrency { - Concurrency::Sequential => 1, - Concurrency::Parallel => jobs.len(), - Concurrency::MaxThreads(max) => cmp::min(max, jobs.len()), - }; - let size = jobs.len() / threads; - let mut overhead = jobs.len() % threads; - let last_err: Arc>> = Arc::default(); - let mut slice_index = 0; - let mut handles = vec![]; - let res = Arc::new(Mutex::new(vec![])); - for _ in 0..threads { - let mut thread_size = size; - if overhead >= 1 { - thread_size += 1; - overhead -= 1; - } - let slice = Vec::from(&jobs[slice_index..slice_index + thread_size]); - slice_index = thread_size + 1; - let last_err = last_err.clone(); - let res = res.clone(); - let handle = aerospike_rt::spawn(async move { - //let next_job = async { jobs.lock().await.next().await}; - for mut cmd in slice { - if let Err(err) = cmd.execute().await { - *last_err.lock().await = Some(err); - }; - res.lock().await.push(cmd); - } - }); - handles.push(handle); - } - futures::future::join_all(handles).await; - match Arc::try_unwrap(last_err).unwrap().into_inner() { - None => Ok(res.lock().await.to_vec()), - Some(err) => Err(err), + let handles = jobs.into_iter().map(|job|job.execute(self.cluster.clone())); + match concurrency { + Concurrency::Sequential => futures::future::join_all(handles).await.into_iter().collect(), + Concurrency::Parallel => futures::future::join_all(handles.map(aerospike_rt::spawn)).await.into_iter().map(|value|value.map_err(|e|e.to_string())?).collect(), } } async fn get_batch_nodes( &self, batch_reads: &[BatchRead], - ) -> Result, Vec>> { + replica: crate::policy::Replica, + ) -> Result, Vec<(BatchRead, usize)>>> { let mut map = HashMap::new(); - for (_, batch_read) in batch_reads.iter().enumerate() { - let node = self.node_for_key(&batch_read.key).await?; + for (index, batch_read) in batch_reads.iter().enumerate() { + let node = self.node_for_key(&batch_read.key, replica).await?; map.entry(node) .or_insert_with(Vec::new) - .push(batch_read.clone()); + .push((batch_read.clone(), index)); } Ok(map) } - async fn node_for_key(&self, key: &Key) -> Result> { + async fn node_for_key(&self, key: &Key, replica: crate::policy::Replica) -> Result> { let partition = Partition::new_by_key(key); - let node = self.cluster.get_node(&partition).await?; + let node = self.cluster.get_node(&partition, replica, Weak::new()).await?; Ok(node) } } diff --git a/aerospike-core/src/client.rs b/aerospike-core/src/client.rs index f9e0751d..63b6d39d 100644 --- a/aerospike-core/src/client.rs +++ b/aerospike-core/src/client.rs @@ -184,7 +184,7 @@ impl Client { T: Into + Send + Sync + 'static, { let bins = bins.into(); - let mut command = ReadCommand::new(policy, self.cluster.clone(), key, bins); + let mut command = ReadCommand::new(&policy.base_policy, self.cluster.clone(), key, bins, policy.replica); command.execute().await?; Ok(command.record.unwrap()) } diff --git a/aerospike-core/src/cluster/mod.rs b/aerospike-core/src/cluster/mod.rs index a3127649..d0f9b300 100644 --- a/aerospike-core/src/cluster/mod.rs +++ b/aerospike-core/src/cluster/mod.rs @@ -21,7 +21,7 @@ pub mod partition_tokenizer; use aerospike_rt::time::{Duration, Instant}; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::vec::Vec; pub use self::node::Node; @@ -30,6 +30,7 @@ use self::node_validator::NodeValidator; use self::partition::Partition; use self::partition_tokenizer::PartitionTokenizer; +use crate::commands::Message; use crate::errors::{ErrorKind, Result}; use crate::net::Host; use crate::policy::ClientPolicy; @@ -38,6 +39,65 @@ use futures::channel::mpsc; use futures::channel::mpsc::{Receiver, Sender}; use futures::lock::Mutex; +#[derive(Debug)] +pub struct PartitionForNamespace { + nodes: Vec<(u32, Option>)>, + replicas: usize, +} +type PartitionTable = HashMap; + +impl Default for PartitionForNamespace { + fn default() -> Self { + Self { nodes: Vec::default(), replicas: 0 } + } +} + +impl PartitionForNamespace { + fn all_replicas(&self, index: usize) -> impl Iterator>> + '_ { + (0..self.replicas).map(move |i|self.nodes.get(i * node::PARTITIONS + index).and_then(|(_, item)|item.clone())) + } + + async fn get_node(&self, cluster: &Cluster, partition: &Partition<'_>, replica: crate::policy::Replica, last_tried: Weak) -> Result> { + fn get_next_in_sequence>, F: Fn()->I>(get_sequence: F, last_tried: Weak) -> Option> { + if let Some(last_tried) = last_tried.upgrade() { + // If this isn't the first attempt, try the replica immediately after in sequence (that is actually valid) + let mut replicas = get_sequence(); + while let Some(replica) = replicas.next() { + if Arc::ptr_eq(&replica, &last_tried) { + if let Some(in_sequence_after) = replicas.next() { + return Some(in_sequence_after) + } + + // No more after this? Drop through to try from the beginning. + break; + } + } + } + // If we get here, we're on the first attempt, the last node is already gone, or there are no more nodes in sequence. Just find the next populated option. + get_sequence().next() + } + + + let node = match replica { + crate::policy::Replica::Master => self.all_replicas(partition.partition_id).next().flatten(), + crate::policy::Replica::Sequence => { + get_next_in_sequence(||self.all_replicas(partition.partition_id).flatten(), last_tried) + }, + crate::policy::Replica::PreferRack => { + let rack_ids = cluster.client_policy.rack_ids.as_ref().ok_or_else(||"Attempted to use Replica::PreferRack without configuring racks in client policy".to_string())?; + get_next_in_sequence(|| + self + .all_replicas(partition.partition_id) + .flatten() + .filter(|node|node.is_in_rack(partition.namespace, rack_ids)), last_tried.clone()) + .or_else(||get_next_in_sequence(||self.all_replicas(partition.partition_id).flatten(), last_tried)) + }, + }; + + node.ok_or_else(||format!("Cannot get appropriate node for namespace: {} partition: {}", partition.namespace, partition.partition_id).into()) + } +} + // Cluster encapsulates the aerospike cluster nodes and manages // them. #[derive(Debug)] @@ -51,8 +111,8 @@ pub struct Cluster { // Active nodes in cluster. nodes: Arc>>>, - // Hints for best node for a partition - partition_write_map: Arc>>>>, + // Which partition contains the key. + partition_write_map: RwLock, // Random node index. node_index: AtomicIsize, @@ -73,7 +133,7 @@ impl Cluster { aliases: Arc::new(RwLock::new(HashMap::new())), nodes: Arc::new(RwLock::new(vec![])), - partition_write_map: Arc::new(RwLock::new(HashMap::new())), + partition_write_map: RwLock::new(HashMap::default()), node_index: AtomicIsize::new(0), tend_channel: Mutex::new(tx), @@ -137,6 +197,7 @@ impl Cluster { // Refresh all known nodes. for node in nodes { let old_gen = node.partition_generation(); + let old_rebalance_gen = node.rebalance_generation(); if node.is_active() { match node.refresh(self.aliases().await).await { Ok(friends) => { @@ -147,7 +208,11 @@ impl Cluster { } if old_gen != node.partition_generation() { - self.update_partitions(node.clone()).await?; + self.update_partitions(&node).await?; + } + + if old_rebalance_gen != node.rebalance_generation() { + self.update_rack_ids(&node).await?; } } Err(err) => { @@ -231,23 +296,14 @@ impl Cluster { Ok(aliases.contains_key(host)) } - async fn set_partitions(&self, partitions: HashMap>>) { - let mut partition_map = self.partition_write_map.write().await; - *partition_map = partitions; - } - - fn partitions(&self) -> Arc>>>> { - self.partition_write_map.clone() - } pub async fn node_partitions(&self, node: &Node, namespace: &str) -> Vec { let mut res: Vec = vec![]; - let partitions = self.partitions(); - let partitions = partitions.read().await; + let partitions = self.partition_write_map.read().await; if let Some(node_array) = partitions.get(namespace) { - for (i, tnode) in node_array.iter().enumerate() { - if node == tnode.as_ref() { + for (i, (_, tnode)) in node_array.nodes.iter().enumerate().take(node::PARTITIONS) { + if tnode.as_ref().map_or(false, |tnode|tnode.as_ref() == node) { res.push(i as u16); } } @@ -256,15 +312,29 @@ impl Cluster { res } - pub async fn update_partitions(&self, node: Arc) -> Result<()> { + pub async fn update_partitions(&self, node: &Arc) -> Result<()> { let mut conn = node.get_connection().await?; - let tokens = PartitionTokenizer::new(&mut conn).await.map_err(|e| { + let tokens = PartitionTokenizer::new(&mut conn, node).await.map_err(|e| { conn.invalidate(); e })?; - let nmap = tokens.update_partition(self.partitions(), node).await?; - self.set_partitions(nmap).await; + let mut partitions = self.partition_write_map.write().await; + tokens.update_partition(&mut partitions, node)?; + + Ok(()) + } + + pub async fn update_rack_ids(&self, node: &Arc) -> Result<()> { + const RACK_IDS: &str = "rack-ids"; + let mut conn = node.get_connection().await?; + let info_map = Message::info(&mut conn, &[RACK_IDS, node::REBALANCE_GENERATION]).await?; + if let Some(buf) = info_map.get(RACK_IDS) { + node.parse_rack(buf.as_str())?; + } + + // We re-update the rebalance generation right now (in case its changed since it was last polled) + node.update_rebalance_generation(&info_map)?; Ok(()) } @@ -440,10 +510,11 @@ impl Cluster { } async fn find_node_in_partition_map(&self, filter: Arc) -> bool { + let filter = Some(filter); let partitions = self.partition_write_map.read().await; (*partitions) .values() - .any(|map| map.iter().any(|node| *node == filter)) + .any(|map| map.nodes.iter().any(|(_, node)| *node == filter)) } async fn add_nodes(&self, friend_list: &[Arc]) { @@ -492,17 +563,14 @@ impl Cluster { *nodes = new_nodes; } - pub async fn get_node(&self, partition: &Partition<'_>) -> Result> { - let partitions = self.partitions(); - let partitions = partitions.read().await; - - if let Some(node_array) = partitions.get(partition.namespace) { - if let Some(node) = node_array.get(partition.partition_id) { - return Ok(node.clone()); - } - } + pub async fn get_node(&self, partition: &Partition<'_>, replica: crate::policy::Replica, last_tried: Weak) -> Result> { + let partitions = self.partition_write_map.read().await; - self.get_random_node().await + let namespace = partitions + .get(partition.namespace) + .ok_or_else(||format!("Cannot get appropriate node for namespace: {}", partition.namespace))?; + + namespace.get_node(self, partition, replica, last_tried).await } pub async fn get_random_node(&self) -> Result> { diff --git a/aerospike-core/src/cluster/node.rs b/aerospike-core/src/cluster/node.rs index 8ea6b89a..174458b2 100644 --- a/aerospike-core/src/cluster/node.rs +++ b/aerospike-core/src/cluster/node.rs @@ -13,7 +13,7 @@ // License for the specific language governing permissions and limitations under // the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::hash::{Hash, Hasher}; use std::result::Result as StdResult; @@ -21,7 +21,7 @@ use std::str::FromStr; use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use std::sync::Arc; -use crate::cluster::node_validator::NodeValidator; +use crate::cluster::node_validator::{NodeValidator, NodeFeatures}; use crate::commands::Message; use crate::errors::{ErrorKind, Result, ResultExt}; use crate::net::{ConnectionPool, Host, PooledConnection}; @@ -29,6 +29,8 @@ use crate::policy::ClientPolicy; use aerospike_rt::RwLock; pub const PARTITIONS: usize = 4096; +pub const PARTITION_GENERATION: &str = "partition-generation"; +pub const REBALANCE_GENERATION: &str = "rebalance-generation"; /// The node instance holding connections and node settings. /// Exposed for usage in the sync client interface. @@ -44,10 +46,14 @@ pub struct Node { failures: AtomicUsize, partition_generation: AtomicIsize, + rebalance_generation: AtomicIsize, + // Which racks are these things part of + rack_ids: std::sync::Mutex>, refresh_count: AtomicUsize, reference_count: AtomicUsize, responded: AtomicBool, - active: AtomicBool, + active: AtomicBool + features: NodeFeatures, } impl Node { @@ -60,6 +66,7 @@ impl Node { address: nv.address.clone(), host: nv.aliases[0].clone(), + rebalance_generation: AtomicIsize::new(if client_policy.rack_ids.is_some() {-1} else {0}), connection_pool: ConnectionPool::new(nv.aliases[0].clone(), client_policy), failures: AtomicUsize::new(0), partition_generation: AtomicIsize::new(-1), @@ -67,6 +74,8 @@ impl Node { reference_count: AtomicUsize::new(0), responded: AtomicBool::new(false), active: AtomicBool::new(true), + features: nv.features, + rack_ids: std::sync::Mutex::new(HashMap::new()), } } // Returns the Node address @@ -87,6 +96,10 @@ impl Node { pub fn host(&self) -> Host { self.host.clone() } + // Returns what the node can do + pub const fn features(&self) -> &NodeFeatures { + &self.features + } // Returns the reference count pub fn reference_count(&self) -> usize { @@ -98,12 +111,17 @@ impl Node { self.reference_count.store(0, Ordering::Relaxed); self.responded.store(false, Ordering::Relaxed); self.refresh_count.fetch_add(1, Ordering::Relaxed); - let commands = vec![ + let mut commands = vec![ "node", "cluster-name", - "partition-generation", + PARTITION_GENERATION, self.services_name(), ]; + + if self.client_policy.rack_ids.is_some() { + commands.push(REBALANCE_GENERATION); + } + let info_map = self .info(&commands) .await @@ -115,7 +133,9 @@ impl Node { .add_friends(current_aliases, &info_map) .chain_err(|| "Failed to add friends")?; self.update_partitions(&info_map) - .chain_err(|| "Failed to update partitions")?; + .chain_err(|| "Failed to update partition generation")?; + self.update_rebalance_generation(&info_map) + .chain_err(|| "Failed to update rebalance generation")?; self.reset_failures(); Ok(friends) } @@ -151,9 +171,7 @@ impl Node { } fn verify_cluster_name(&self, info_map: &HashMap) -> Result<()> { - match self.client_policy.cluster_name { - None => Ok(()), - Some(ref expected) => match info_map.get("cluster-name") { + self.client_policy.cluster_name.as_ref().map_or_else(|| Ok(()), |expected| match info_map.get("cluster-name") { None => Err(ErrorKind::InvalidNode("Missing cluster name".to_string()).into()), Some(info_name) if info_name == expected => Ok(()), Some(info_name) => { @@ -165,8 +183,7 @@ impl Node { )) .into()) } - }, - } + }) } fn add_friends( @@ -196,12 +213,7 @@ impl Node { let host = friend_info.next().unwrap(); let port = u16::from_str(friend_info.next().unwrap())?; - let alias = match self.client_policy.ip_map { - Some(ref ip_map) if ip_map.contains_key(host) => { - Host::new(ip_map.get(host).unwrap(), port) - } - _ => Host::new(host, port), - }; + let alias = Host::new(self.client_policy.ip_map.as_ref().and_then(|ip_map|ip_map.get(host)).map_or(host, String::as_str), port); if current_aliases.contains_key(&alias) { self.reference_count.fetch_add(1, Ordering::Relaxed); @@ -213,8 +225,8 @@ impl Node { Ok(friends) } - fn update_partitions(&self, info_map: &HashMap) -> Result<()> { - match info_map.get("partition-generation") { + pub(crate) fn update_partitions(&self, info_map: &HashMap) -> Result<()> { + match info_map.get(PARTITION_GENERATION) { None => bail!(ErrorKind::BadResponse( "Missing partition generation".to_string() )), @@ -227,6 +239,29 @@ impl Node { Ok(()) } + pub fn update_rebalance_generation(&self, info_map: &HashMap) -> Result<()> { + if let Some(gen_string) = info_map.get(REBALANCE_GENERATION) { + let gen = gen_string.parse::()?; + self.rebalance_generation.store(gen, Ordering::Relaxed); + } + + Ok(()) + } + + pub fn is_in_rack(&self, namespace: &str, rack_ids: &HashSet) -> bool { + self.rack_ids.lock().map_or(false, |locked| locked.get(namespace).map_or(false, |r|rack_ids.contains(r))) + } + + pub fn parse_rack(&self, buf: &str) -> Result<()> { + let new_table = buf.split(';').map(|entry|{ + let (key, val) = entry.split_once(':').ok_or("Invalid rack entry")?; + Ok((key.to_string(), val.parse::()?)) + }).collect::>>()?; + + *self.rack_ids.lock().map_err(|err|err.to_string())? = new_table; + Ok(()) + } + // Get a connection to the node from the connection pool pub async fn get_connection(&self) -> Result { self.connection_pool.get().await @@ -286,6 +321,11 @@ impl Node { pub fn partition_generation(&self) -> isize { self.partition_generation.load(Ordering::Relaxed) } + + // Get the rebalance generation + pub fn rebalance_generation(&self) -> isize { + self.rebalance_generation.load(Ordering::Relaxed) + } } impl Hash for Node { diff --git a/aerospike-core/src/cluster/node_validator.rs b/aerospike-core/src/cluster/node_validator.rs index 6739c4f6..2a0b226a 100644 --- a/aerospike-core/src/cluster/node_validator.rs +++ b/aerospike-core/src/cluster/node_validator.rs @@ -22,6 +22,17 @@ use crate::errors::{ErrorKind, Result, ResultExt}; use crate::net::{Connection, Host}; use crate::policy::ClientPolicy; + +#[allow(clippy::struct_excessive_bools)] +#[derive(Copy, Clone, Default, Debug)] +pub struct NodeFeatures { + pub supports_float: bool, + pub supports_batch_index: bool, + pub supports_replicas_all: bool, + pub supports_replicas: bool, + pub supports_geo: bool, +} + // Validates a Database server node #[allow(clippy::struct_excessive_bools)] #[derive(Clone)] @@ -31,10 +42,7 @@ pub struct NodeValidator { pub address: String, pub client_policy: ClientPolicy, pub use_new_info: bool, - pub supports_float: bool, - pub supports_batch_index: bool, - pub supports_replicas_all: bool, - pub supports_geo: bool, + pub features: NodeFeatures, } // Generates a node validator @@ -46,10 +54,7 @@ impl NodeValidator { address: "".to_string(), client_policy: cluster.client_policy().clone(), use_new_info: true, - supports_float: false, - supports_batch_index: false, - supports_replicas_all: false, - supports_geo: false, + features: NodeFeatures::default(), } } @@ -114,12 +119,15 @@ impl NodeValidator { self.address = alias.address(); if let Some(features) = info_map.get("features") { - self.set_features(features); + self.features.set_features(features); } Ok(()) } +} + +impl NodeFeatures { fn set_features(&mut self, features: &str) { let features = features.split(';'); for feature in features { @@ -128,6 +136,7 @@ impl NodeValidator { "batch-index" => self.supports_batch_index = true, "replicas-all" => self.supports_replicas_all = true, "geo" => self.supports_geo = true, + "replicas" => self.supports_replicas = true, _ => (), } } diff --git a/aerospike-core/src/cluster/partition.rs b/aerospike-core/src/cluster/partition.rs index ac08265e..dacd9deb 100644 --- a/aerospike-core/src/cluster/partition.rs +++ b/aerospike-core/src/cluster/partition.rs @@ -36,7 +36,7 @@ impl<'a> Partition<'a> { } pub fn new_by_key(key: &'a Key) -> Self { - let mut rdr = Cursor::new(&key.digest[0..4]); + let mut rdr = Cursor::new(&key.digest[0..2]); Partition { namespace: &key.namespace, @@ -44,7 +44,7 @@ impl<'a> Partition<'a> { // CAN'T USE MOD directly - mod will give negative numbers. // First AND makes positive and negative correctly, then mod. // For any x, y : x % 2^y = x & (2^y - 1); the second method is twice as fast - partition_id: rdr.read_u32::().unwrap() as usize & (node::PARTITIONS - 1), + partition_id: rdr.read_u16::().unwrap() as usize & (node::PARTITIONS - 1), } } } diff --git a/aerospike-core/src/cluster/partition_tokenizer.rs b/aerospike-core/src/cluster/partition_tokenizer.rs index 9d521dda..bfa4f7a5 100644 --- a/aerospike-core/src/cluster/partition_tokenizer.rs +++ b/aerospike-core/src/cluster/partition_tokenizer.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::Entry::{Occupied, Vacant}; -use std::collections::HashMap; use std::str; use std::sync::Arc; use std::vec::Vec; @@ -23,61 +21,111 @@ use crate::cluster::Node; use crate::commands::Message; use crate::errors::{ErrorKind, Result}; use crate::net::Connection; -use aerospike_rt::RwLock; -const REPLICAS_NAME: &str = "replicas-master"; +use super::{PartitionTable, PartitionForNamespace}; // Validates a Database server node #[derive(Debug, Clone)] pub struct PartitionTokenizer { buffer: Vec, + request_type: RequestedReplicas, +} + +#[derive(Debug, Clone)] +enum RequestedReplicas { + ReplicasMaster, // Ancient + ReplicasAll, // Old + Replicas, // Modern, +} + +impl RequestedReplicas { + const fn command(&self) -> &'static str { + match self { + RequestedReplicas::ReplicasMaster => "replicas-master", + RequestedReplicas::ReplicasAll => "replicas-all", + RequestedReplicas::Replicas => "replicas", + } + } } impl PartitionTokenizer { - pub async fn new(conn: &mut Connection) -> Result { - let info_map = Message::info(conn, &[REPLICAS_NAME]).await?; - if let Some(buf) = info_map.get(REPLICAS_NAME) { + pub async fn new(conn: &mut Connection, node: &Arc) -> Result { + let request_type = match (node.features().supports_replicas, node.features().supports_replicas_all) { + (true, _) => RequestedReplicas::Replicas, + (false, true) => RequestedReplicas::ReplicasAll, + (false, false) => RequestedReplicas::ReplicasMaster, + }; + + let command = request_type.command(); + let info_map = Message::info(conn, &[command, node::PARTITION_GENERATION]).await?; + if let Some(buf) = info_map.get(command) { return Ok(PartitionTokenizer { buffer: buf.as_bytes().to_owned(), + request_type, }); } + + // We re-update the partitions right now (in case its changed since it was last polled) + node.update_partitions(&info_map)?; + bail!(ErrorKind::BadResponse("Missing replicas info".to_string())) } - pub async fn update_partition( + pub fn update_partition( &self, - nmap: Arc>>>>, - node: Arc, - ) -> Result>>> { - let mut amap = nmap.read().await.clone(); - + nmap: &mut PartitionTable, + node: &Arc, + ) -> Result<()> { // :;:; ... let part_str = str::from_utf8(&self.buffer)?; - let mut parts = part_str.trim_end().split(|c| c == ':' || c == ';'); - loop { - match (parts.next(), parts.next()) { - (Some(ns), Some(part)) => { - let restore_buffer = base64::decode(part)?; - match amap.entry(ns.to_string()) { - Vacant(entry) => { - entry.insert(vec![node.clone(); node::PARTITIONS]); - } - Occupied(mut entry) => { - for (idx, item) in entry.get_mut().iter_mut().enumerate() { - if restore_buffer[idx >> 3] & (0x80 >> (idx & 7) as u8) != 0 { - *item = node.clone(); - } + for part in part_str.trim_end().split(';') { + match part.split_once(':') { + Some((ns, info)) => { + let mut info_section = info.split(','); + let reigime = if matches!(self.request_type, RequestedReplicas::Replicas) { + info_section + .next() + .ok_or_else(||ErrorKind::BadResponse("Missing reigime".to_string()))? + .parse() + .map_err(|err|ErrorKind::BadResponse(format!("Invalid reigime: {err}")))? + } else { + 0 + }; + + let n_replicas = if matches!(self.request_type, RequestedReplicas::Replicas | RequestedReplicas::ReplicasAll) { + info_section + .next() + .ok_or_else(||ErrorKind::BadResponse("Missing replicas count".to_string()))? + .parse() + .map_err(|err|ErrorKind::BadResponse(format!("Invalid replicas count: {err}")))? + } else { + 1 + }; + + let entry = nmap.entry(ns.to_string()).or_insert_with(PartitionForNamespace::default); + + if entry.replicas != n_replicas && reigime >= entry.nodes.iter().map(|(r, _)|*r).max().unwrap_or_default() { + let wanted_size = n_replicas * node::PARTITIONS; + entry.nodes.resize_with(wanted_size, ||(0, None)); + entry.replicas = n_replicas; + } + + for (section, replica) in info_section.zip(entry.nodes.chunks_mut(node::PARTITIONS)) { + let restore_buffer = base64::decode(section)?; + for (idx, (this_reigimes, item)) in replica.iter_mut().enumerate() { + if restore_buffer[idx >> 3] & (0x80 >> (idx & 7) as u8) != 0 && reigime >= *this_reigimes { + *item = Some(node.clone()); + *this_reigimes = reigime; } } } } - (None, None) => break, _ => bail!(ErrorKind::BadResponse( "Error parsing partition info".to_string() )), } } - Ok(amap) + Ok(()) } } diff --git a/aerospike-core/src/commands/admin_command.rs b/aerospike-core/src/commands/admin_command.rs index 19d60788..c2c1a604 100644 --- a/aerospike-core/src/commands/admin_command.rs +++ b/aerospike-core/src/commands/admin_command.rs @@ -268,6 +268,6 @@ impl AdminCommand { }, &password, ) - .map_err(|e| e.into()) + .map_err(std::convert::Into::into) } } diff --git a/aerospike-core/src/commands/batch_read_command.rs b/aerospike-core/src/commands/batch_read_command.rs index d6487bbd..0d5af709 100644 --- a/aerospike-core/src/commands/batch_read_command.rs +++ b/aerospike-core/src/commands/batch_read_command.rs @@ -12,15 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use aerospike_rt::time::{Duration, Instant}; +use aerospike_rt::time::Instant; use std::collections::HashMap; use std::sync::Arc; -use crate::cluster::Node; -use crate::commands::{self, Command}; +use crate::cluster::{Node, Cluster}; +use crate::cluster::partition::Partition; +use crate::commands; use crate::errors::{ErrorKind, Result, ResultExt}; use crate::net::Connection; -use crate::policy::{BatchPolicy, Policy, PolicyLike}; +use crate::policy::{BatchPolicy, Policy, PolicyLike, Replica}; use crate::{value, BatchRead, Record, ResultCode, Value}; use aerospike_rt::sleep; @@ -33,11 +34,11 @@ struct BatchRecord { pub struct BatchReadCommand { policy: BatchPolicy, pub node: Arc, - pub batch_reads: Vec, + pub batch_reads: Vec<(BatchRead, usize)>, } impl BatchReadCommand { - pub fn new(policy: &BatchPolicy, node: Arc, batch_reads: Vec) -> Self { + pub fn new(policy: &BatchPolicy, node: Arc, batch_reads: Vec<(BatchRead, usize)>) -> Self { BatchReadCommand { policy: policy.clone(), node, @@ -45,7 +46,7 @@ impl BatchReadCommand { } } - pub async fn execute(&mut self) -> Result<()> { + pub async fn execute(mut self, cluster: Arc) -> Result { let mut iterations = 0; let base_policy = self.policy.base().clone(); @@ -54,6 +55,30 @@ impl BatchReadCommand { // Execute command until successful, timed out or maximum iterations have been reached. loop { + let success = if iterations & 1 == 0 || matches!(self.policy.replica, Replica::Master) { + // For even iterations, we request all keys from the same node for efficiency. + Self::request_group(&mut self.batch_reads, &self.policy, self.node.clone()).await? + } else { + // However, for odd iterations try the second choice for each. Instead of re-sharding the batch (as the second choice may not correspond to the first), just try each by itself. + let mut all_successful = true; + for individual_read in self.batch_reads.chunks_mut(1) { + // Find somewhere else to try. + let partition = Partition::new_by_key(&individual_read[0].0.key); + let node = cluster.get_node(&partition, self.policy.replica, Arc::downgrade(&self.node)).await?; + + if !Self::request_group(individual_read, &self.policy, node).await? { + all_successful = false; + break; + } + } + all_successful + }; + + if success { + // command has completed successfully. Exit method. + return Ok(self); + } + iterations += 1; // too many retries @@ -67,98 +92,88 @@ impl BatchReadCommand { } // Sleep before trying again, after the first iteration - if iterations > 1 { - if let Some(sleep_between_retries) = base_policy.sleep_between_retries() { - sleep(sleep_between_retries).await; - } + if let Some(sleep_between_retries) = base_policy.sleep_between_retries() { + sleep(sleep_between_retries).await; } // check for command timeout if let Some(deadline) = deadline { if Instant::now() > deadline { - break; + bail!(ErrorKind::Connection("Timeout".to_string())); } } + } + } - // set command node, so when you return a record it has the node - let node = match self.get_node().await { - Ok(node) => node, - Err(_) => continue, // Node is currently inactive. Retry. - }; - - let mut conn = match node.get_connection().await { - Ok(conn) => conn, - Err(err) => { - warn!("Node {}: {}", node, err); - continue; - } - }; - - self.prepare_buffer(&mut conn) - .chain_err(|| "Failed to prepare send buffer")?; - self.write_timeout(&mut conn, base_policy.timeout()) - .await - .chain_err(|| "Failed to set timeout for send buffer")?; - - // Send command. - if let Err(err) = self.write_buffer(&mut conn).await { - // IO errors are considered temporary anomalies. Retry. - // Close socket to flush out possible garbage. Do not put back in pool. - conn.invalidate(); + async fn request_group(batch_reads: &mut [(BatchRead, usize)], policy: &BatchPolicy, node: Arc) -> Result { + let mut conn = match node.get_connection().await { + Ok(conn) => conn, + Err(err) => { warn!("Node {}: {}", node, err); - continue; - } - - // Parse results. - if let Err(err) = self.parse_result(&mut conn).await { - // close the connection - // cancelling/closing the batch/multi commands will return an error, which will - // close the connection to throw away its data and signal the server about the - // situation. We will not put back the connection in the buffer. - if !commands::keep_connection(&err) { - conn.invalidate(); - } - return Err(err); + return Ok(false); } + }; - // command has completed successfully. Exit method. - return Ok(()); + conn.buffer + .set_batch_read(policy, batch_reads) + .chain_err(|| "Failed to prepare send buffer")?; + + conn.buffer.write_timeout(policy.base().timeout()); + + // Send command. + if let Err(err) = conn.flush().await { + // IO errors are considered temporary anomalies. Retry. + // Close socket to flush out possible garbage. Do not put back in pool. + conn.invalidate(); + warn!("Node {}: {}", node, err); + return Ok(false); } - bail!(ErrorKind::Connection("Timeout".to_string())) + // Parse results. + if let Err(err) = Self::parse_result(batch_reads, &mut conn).await { + // close the connection + // cancelling/closing the batch/multi commands will return an error, which will + // close the connection to throw away its data and signal the server about the + // situation. We will not put back the connection in the buffer. + if !commands::keep_connection(&err) { + conn.invalidate(); + } + Err(err) + } else { + Ok(true) + } } - async fn parse_group(&mut self, conn: &mut Connection, size: usize) -> Result { + async fn parse_group(batch_reads: &mut [(BatchRead, usize)], conn: &mut Connection, size: usize) -> Result { while conn.bytes_read() < size { conn.read_buffer(commands::buffer::MSG_REMAINING_HEADER_SIZE as usize) .await?; - match self.parse_record(conn).await? { + match Self::parse_record(conn).await? { None => return Ok(false), Some(batch_record) => { - let batch_read = self - .batch_reads + let batch_read = batch_reads .get_mut(batch_record.batch_index) .expect("Invalid batch index"); - batch_read.record = batch_record.record; + batch_read.0.record = batch_record.record; } } } Ok(true) } - async fn parse_record(&mut self, conn: &mut Connection) -> Result> { - let found_key = match ResultCode::from(conn.buffer.read_u8(Some(5))) { - ResultCode::Ok => true, - ResultCode::KeyNotFoundError => false, - rc => bail!(ErrorKind::ServerError(rc)), - }; - + async fn parse_record(conn: &mut Connection) -> Result> { // if cmd is the end marker of the response, do not proceed further let info3 = conn.buffer.read_u8(Some(3)); if info3 & commands::buffer::INFO3_LAST == commands::buffer::INFO3_LAST { return Ok(None); } + let found_key = match ResultCode::from(conn.buffer.read_u8(Some(5))) { + ResultCode::Ok => true, + ResultCode::KeyNotFoundError => false, + rc => bail!(ErrorKind::ServerError(rc)), + }; + conn.buffer.skip(6); let generation = conn.buffer.read_u32(None); let expiration = conn.buffer.read_u32(None); @@ -196,38 +211,13 @@ impl BatchReadCommand { record, })) } -} - -#[async_trait::async_trait] -impl commands::Command for BatchReadCommand { - async fn write_timeout( - &mut self, - conn: &mut Connection, - timeout: Option, - ) -> Result<()> { - conn.buffer.write_timeout(timeout); - Ok(()) - } - - async fn write_buffer(&mut self, conn: &mut Connection) -> Result<()> { - conn.flush().await - } - - fn prepare_buffer(&mut self, conn: &mut Connection) -> Result<()> { - conn.buffer - .set_batch_read(&self.policy, self.batch_reads.clone()) - } - - async fn get_node(&self) -> Result> { - Ok(self.node.clone()) - } - async fn parse_result(&mut self, conn: &mut Connection) -> Result<()> { + async fn parse_result(batch_reads: &mut [(BatchRead, usize)], conn: &mut Connection) -> Result<()> { loop { conn.read_buffer(8).await?; let size = conn.buffer.read_msg_size(None); conn.bookmark(); - if size > 0 && !self.parse_group(conn, size as usize).await? { + if size > 0 && !Self::parse_group(batch_reads, conn, size as usize).await? { break; } } diff --git a/aerospike-core/src/commands/buffer.rs b/aerospike-core/src/commands/buffer.rs index 02f087e7..e00db4cc 100644 --- a/aerospike-core/src/commands/buffer.rs +++ b/aerospike-core/src/commands/buffer.rs @@ -23,7 +23,7 @@ use crate::expressions::FilterExpression; use crate::msgpack::encoder; use crate::operations::{Operation, OperationBin, OperationData, OperationType}; use crate::policy::{ - BatchPolicy, CommitLevel, ConsistencyLevel, GenerationPolicy, QueryPolicy, ReadPolicy, + BatchPolicy, CommitLevel, ConsistencyLevel, GenerationPolicy, QueryPolicy, BasePolicy, RecordExistsAction, ScanPolicy, WritePolicy, }; use crate::{BatchRead, Bin, Bins, CollectionIndexType, Key, Statement, Value}; @@ -263,7 +263,7 @@ impl Buffer { } // Writes the command for get operations - pub fn set_read(&mut self, policy: &ReadPolicy, key: &Key, bins: &Bins) -> Result<()> { + pub fn set_read(&mut self, policy: &BasePolicy, key: &Key, bins: &Bins) -> Result<()> { match bins { Bins::None => self.set_read_header(policy, key), Bins::All => self.set_read_for_key_only(policy, key), @@ -297,7 +297,7 @@ impl Buffer { } // Writes the command for getting metadata operations - pub fn set_read_header(&mut self, policy: &ReadPolicy, key: &Key) -> Result<()> { + pub fn set_read_header(&mut self, policy: &BasePolicy, key: &Key) -> Result<()> { self.begin(); let mut field_count = self.estimate_key_size(key, false); let filter_size = self.estimate_filter_size(policy.filter_expression()); @@ -319,7 +319,7 @@ impl Buffer { Ok(()) } - pub fn set_read_for_key_only(&mut self, policy: &ReadPolicy, key: &Key) -> Result<()> { + pub fn set_read_for_key_only(&mut self, policy: &BasePolicy, key: &Key) -> Result<()> { self.begin(); let mut field_count = self.estimate_key_size(key, false); @@ -344,7 +344,7 @@ impl Buffer { pub fn set_batch_read( &mut self, policy: &BatchPolicy, - batch_reads: Vec, + batch_reads: &[(BatchRead, usize)], ) -> Result<()> { let field_count_row = if policy.send_set_name { 2 } else { 1 }; @@ -358,7 +358,7 @@ impl Buffer { } let mut prev: Option<&BatchRead> = None; - for batch_read in &batch_reads { + for (batch_read, _) in batch_reads { self.data_offset += batch_read.key.digest.len() + 4; match prev { Some(prev) if batch_read.match_header(prev, policy.send_set_name) => { @@ -404,7 +404,7 @@ impl Buffer { self.write_u8(if policy.allow_inline { 1 } else { 0 }); prev = None; - for (idx, batch_read) in batch_reads.iter().enumerate() { + for (idx, (batch_read, _)) in batch_reads.iter().enumerate() { let key = &batch_read.key; self.write_u32(idx as u32); self.write_bytes(&key.digest); @@ -971,7 +971,7 @@ impl Buffer { fn write_header( &mut self, - policy: &ReadPolicy, + policy: &BasePolicy, read_attr: u8, write_attr: u8, field_count: u16, diff --git a/aerospike-core/src/commands/delete_command.rs b/aerospike-core/src/commands/delete_command.rs index f8ab5594..a9ea4737 100644 --- a/aerospike-core/src/commands/delete_command.rs +++ b/aerospike-core/src/commands/delete_command.rs @@ -31,7 +31,7 @@ pub struct DeleteCommand<'a> { impl<'a> DeleteCommand<'a> { pub fn new(policy: &'a WritePolicy, cluster: Arc, key: &'a Key) -> Self { DeleteCommand { - single_command: SingleCommand::new(cluster, key), + single_command: SingleCommand::new(cluster, key, crate::policy::Replica::Master), policy, existed: false, } @@ -61,7 +61,7 @@ impl<'a> Command for DeleteCommand<'a> { conn.buffer.set_delete(self.policy, self.single_command.key) } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { self.single_command.get_node().await } diff --git a/aerospike-core/src/commands/execute_udf_command.rs b/aerospike-core/src/commands/execute_udf_command.rs index 1db65621..7ff78777 100644 --- a/aerospike-core/src/commands/execute_udf_command.rs +++ b/aerospike-core/src/commands/execute_udf_command.rs @@ -41,7 +41,7 @@ impl<'a> ExecuteUDFCommand<'a> { args: Option<&'a [Value]>, ) -> Self { ExecuteUDFCommand { - read_command: ReadCommand::new(&policy.base_policy, cluster, key, Bins::All), + read_command: ReadCommand::new(&policy.base_policy, cluster, key, Bins::All, crate::policy::Replica::Master), policy, package_name, function_name, @@ -79,7 +79,7 @@ impl<'a> Command for ExecuteUDFCommand<'a> { ) } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { self.read_command.get_node().await } diff --git a/aerospike-core/src/commands/exists_command.rs b/aerospike-core/src/commands/exists_command.rs index 38083d8a..8da3b212 100644 --- a/aerospike-core/src/commands/exists_command.rs +++ b/aerospike-core/src/commands/exists_command.rs @@ -31,7 +31,7 @@ pub struct ExistsCommand<'a> { impl<'a> ExistsCommand<'a> { pub fn new(policy: &'a WritePolicy, cluster: Arc, key: &'a Key) -> Self { ExistsCommand { - single_command: SingleCommand::new(cluster, key), + single_command: SingleCommand::new(cluster, key, crate::policy::Replica::Master), policy, exists: false, } @@ -61,7 +61,7 @@ impl<'a> Command for ExistsCommand<'a> { conn.buffer.set_exists(self.policy, self.single_command.key) } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { self.single_command.get_node().await } diff --git a/aerospike-core/src/commands/mod.rs b/aerospike-core/src/commands/mod.rs index 915664c3..71e58baa 100644 --- a/aerospike-core/src/commands/mod.rs +++ b/aerospike-core/src/commands/mod.rs @@ -63,7 +63,7 @@ pub trait Command { timeout: Option, ) -> Result<()>; fn prepare_buffer(&mut self, conn: &mut Connection) -> Result<()>; - async fn get_node(&self) -> Result>; + async fn get_node(&mut self) -> Result>; async fn parse_result(&mut self, conn: &mut Connection) -> Result<()>; async fn write_buffer(&mut self, conn: &mut Connection) -> Result<()>; } diff --git a/aerospike-core/src/commands/operate_command.rs b/aerospike-core/src/commands/operate_command.rs index 8cf23985..14d18a18 100644 --- a/aerospike-core/src/commands/operate_command.rs +++ b/aerospike-core/src/commands/operate_command.rs @@ -37,7 +37,7 @@ impl<'a> OperateCommand<'a> { operations: &'a [Operation<'a>], ) -> Self { OperateCommand { - read_command: ReadCommand::new(&policy.base_policy, cluster, key, Bins::All), + read_command: ReadCommand::new(&policy.base_policy, cluster, key, Bins::All, crate::policy::Replica::Master), policy, operations, } @@ -71,7 +71,7 @@ impl<'a> Command for OperateCommand<'a> { ) } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { self.read_command.get_node().await } diff --git a/aerospike-core/src/commands/query_command.rs b/aerospike-core/src/commands/query_command.rs index 28a28dab..07d9ce14 100644 --- a/aerospike-core/src/commands/query_command.rs +++ b/aerospike-core/src/commands/query_command.rs @@ -75,7 +75,7 @@ impl<'a> Command for QueryCommand<'a> { ) } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { self.stream_command.get_node().await } diff --git a/aerospike-core/src/commands/read_command.rs b/aerospike-core/src/commands/read_command.rs index be79be1b..f08294bc 100644 --- a/aerospike-core/src/commands/read_command.rs +++ b/aerospike-core/src/commands/read_command.rs @@ -22,21 +22,21 @@ use crate::commands::buffer; use crate::commands::{Command, SingleCommand}; use crate::errors::{ErrorKind, Result}; use crate::net::Connection; -use crate::policy::ReadPolicy; +use crate::policy::{BasePolicy, Replica}; use crate::value::bytes_to_particle; use crate::{Bins, Key, Record, ResultCode, Value}; pub struct ReadCommand<'a> { pub single_command: SingleCommand<'a>, pub record: Option, - policy: &'a ReadPolicy, + policy: &'a BasePolicy, bins: Bins, } impl<'a> ReadCommand<'a> { - pub fn new(policy: &'a ReadPolicy, cluster: Arc, key: &'a Key, bins: Bins) -> Self { + pub fn new(policy: &'a BasePolicy, cluster: Arc, key: &'a Key, bins: Bins, replica: Replica) -> Self { ReadCommand { - single_command: SingleCommand::new(cluster, key), + single_command: SingleCommand::new(cluster, key, replica), bins, policy, record: None, @@ -115,7 +115,7 @@ impl<'a> Command for ReadCommand<'a> { .set_read(self.policy, self.single_command.key, &self.bins) } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { self.single_command.get_node().await } diff --git a/aerospike-core/src/commands/scan_command.rs b/aerospike-core/src/commands/scan_command.rs index 808ad434..6c5ae0f6 100644 --- a/aerospike-core/src/commands/scan_command.rs +++ b/aerospike-core/src/commands/scan_command.rs @@ -83,7 +83,7 @@ impl<'a> Command for ScanCommand<'a> { ) } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { self.stream_command.get_node().await } diff --git a/aerospike-core/src/commands/single_command.rs b/aerospike-core/src/commands/single_command.rs index 2eec7783..8c2a39e1 100644 --- a/aerospike-core/src/commands/single_command.rs +++ b/aerospike-core/src/commands/single_command.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::sync::{Arc, Weak}; use crate::cluster::partition::Partition; use crate::cluster::{Cluster, Node}; @@ -28,20 +28,26 @@ pub struct SingleCommand<'a> { cluster: Arc, pub key: &'a Key, partition: Partition<'a>, + last_tried: Weak, + replica: crate::policy::Replica, } impl<'a> SingleCommand<'a> { - pub fn new(cluster: Arc, key: &'a Key) -> Self { + pub fn new(cluster: Arc, key: &'a Key, replica: crate::policy::Replica,) -> Self { let partition = Partition::new_by_key(key); SingleCommand { cluster, key, partition, + last_tried: Weak::new(), + replica, } } - pub async fn get_node(&self) -> Result> { - self.cluster.get_node(&self.partition).await + pub async fn get_node(&mut self) -> Result> { + let this_time = self.cluster.get_node(&self.partition, self.replica, self.last_tried.clone()).await?; + self.last_tried = Arc::downgrade(&this_time); + Ok(this_time) } pub async fn empty_socket(conn: &mut Connection) -> Result<()> { diff --git a/aerospike-core/src/commands/stream_command.rs b/aerospike-core/src/commands/stream_command.rs index 05989f34..574898cd 100644 --- a/aerospike-core/src/commands/stream_command.rs +++ b/aerospike-core/src/commands/stream_command.rs @@ -201,7 +201,7 @@ impl Command for StreamCommand { unreachable!() } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { Ok(self.node.clone()) } diff --git a/aerospike-core/src/commands/touch_command.rs b/aerospike-core/src/commands/touch_command.rs index 18868b31..4e558620 100644 --- a/aerospike-core/src/commands/touch_command.rs +++ b/aerospike-core/src/commands/touch_command.rs @@ -31,7 +31,7 @@ pub struct TouchCommand<'a> { impl<'a> TouchCommand<'a> { pub fn new(policy: &'a WritePolicy, cluster: Arc, key: &'a Key) -> Self { TouchCommand { - single_command: SingleCommand::new(cluster, key), + single_command: SingleCommand::new(cluster, key, crate::policy::Replica::Master), policy, } } @@ -60,7 +60,7 @@ impl<'a> Command for TouchCommand<'a> { conn.buffer.set_touch(self.policy, self.single_command.key) } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { self.single_command.get_node().await } diff --git a/aerospike-core/src/commands/write_command.rs b/aerospike-core/src/commands/write_command.rs index e92695d1..384252ac 100644 --- a/aerospike-core/src/commands/write_command.rs +++ b/aerospike-core/src/commands/write_command.rs @@ -40,7 +40,7 @@ impl<'a> WriteCommand<'a> { operation: OperationType, ) -> Self { WriteCommand { - single_command: SingleCommand::new(cluster, key), + single_command: SingleCommand::new(cluster, key, crate::policy::Replica::Master), bins, policy, operation, @@ -76,7 +76,7 @@ impl<'a> Command for WriteCommand<'a> { ) } - async fn get_node(&self) -> Result> { + async fn get_node(&mut self) -> Result> { self.single_command.get_node().await } diff --git a/aerospike-core/src/expressions/lists.rs b/aerospike-core/src/expressions/lists.rs index 5371fc99..14ca2b49 100644 --- a/aerospike-core/src/expressions/lists.rs +++ b/aerospike-core/src/expressions/lists.rs @@ -17,7 +17,7 @@ use crate::expressions::{nil, ExpOp, ExpType, ExpressionArgument, FilterExpression, MODIFY}; use crate::operations::cdt_context::{CdtContext, CtxType}; -use crate::operations::lists::{CdtListOpType, ListPolicy, ListReturnType, ListSortFlags}; +use crate::operations::lists::{CdtListOpType, ListPolicy, ListReturnType, ListSortFlags, ToListReturnTypeBitmask}; use crate::Value; const MODULE: i64 = 0; @@ -388,15 +388,16 @@ pub fn size(bin: FilterExpression, ctx: &[CdtContext]) -> FilterExpression { /// int_val(0)); /// ``` /// -pub fn get_by_value( - return_type: ListReturnType, +pub fn get_by_value( + return_type: TLR, value: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByValue as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(value), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -414,17 +415,18 @@ pub fn get_by_value( /// /// get_by_value_range(ListReturnType::Values, Some(int_val(10)), Some(int_val(20)), list_bin("a".to_string()), &[]); /// ``` -pub fn get_by_value_range( - return_type: ListReturnType, +pub fn get_by_value_range( + return_type: TLR, value_begin: Option, value_end: Option, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let mut args = vec![ ExpressionArgument::Context(ctx.to_vec()), ExpressionArgument::Value(Value::from(CdtListOpType::GetByValueInterval as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ]; if let Some(val_beg) = value_begin { args.push(ExpressionArgument::FilterExpression(val_beg)); @@ -439,15 +441,16 @@ pub fn get_by_value_range( /// Create expression that selects list items identified by values and returns selected data /// specified by returnType. -pub fn get_by_value_list( - return_type: ListReturnType, +pub fn get_by_value_list( + return_type: TLR, values: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByValueList as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(values), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -467,16 +470,17 @@ pub fn get_by_value_list( /// (3,3) = [11,15] /// (3,-3) = [0,4,5,9,11,15] /// ``` -pub fn get_by_value_relative_rank_range( - return_type: ListReturnType, +pub fn get_by_value_relative_rank_range( + return_type: TLR, value: FilterExpression, rank: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByValueRelRankRange as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(value), ExpressionArgument::FilterExpression(rank), ExpressionArgument::Context(ctx.to_vec()), @@ -497,17 +501,18 @@ pub fn get_by_value_relative_rank_range( /// (3,3,7) = [11,15] /// (3,-3,2) = [] /// ``` -pub fn get_by_value_relative_rank_range_count( - return_type: ListReturnType, +pub fn get_by_value_relative_rank_range_count( + return_type: TLR, value: FilterExpression, rank: FilterExpression, count: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByValueRelRankRange as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(value), ExpressionArgument::FilterExpression(rank), ExpressionArgument::FilterExpression(count), @@ -529,16 +534,17 @@ pub fn get_by_value_relative_rank_range_count( /// int_val(5)); /// ``` /// -pub fn get_by_index( - return_type: ListReturnType, +pub fn get_by_index( + return_type: TLR, value_type: ExpType, index: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByIndex as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(index), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -547,15 +553,16 @@ pub fn get_by_index( /// Create expression that selects list items starting at specified index to the end of list /// and returns selected data specified by returnType . -pub fn get_by_index_range( - return_type: ListReturnType, +pub fn get_by_index_range( + return_type: TLR, index: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByIndexRange as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(index), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -564,16 +571,17 @@ pub fn get_by_index_range( /// Create expression that selects "count" list items starting at specified index /// and returns selected data specified by returnType. -pub fn get_by_index_range_count( - return_type: ListReturnType, +pub fn get_by_index_range_count( + return_type: TLR, index: FilterExpression, count: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByIndexRange as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(index), ExpressionArgument::FilterExpression(count), ExpressionArgument::Context(ctx.to_vec()), @@ -591,16 +599,17 @@ pub fn get_by_index_range_count( /// use aerospike::expressions::lists::get_by_rank; /// get_by_rank(ListReturnType::Values, ExpType::STRING, int_val(0), list_bin("a".to_string()), &[]); /// ``` -pub fn get_by_rank( - return_type: ListReturnType, +pub fn get_by_rank( + return_type: TLR, value_type: ExpType, rank: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByRank as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(rank), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -609,15 +618,16 @@ pub fn get_by_rank( /// Create expression that selects list items starting at specified rank to the last ranked item /// and returns selected data specified by returnType. -pub fn get_by_rank_range( - return_type: ListReturnType, +pub fn get_by_rank_range( + return_type: TLR, rank: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByRankRange as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(rank), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -626,16 +636,17 @@ pub fn get_by_rank_range( /// Create expression that selects "count" list items starting at specified rank and returns /// selected data specified by returnType. -pub fn get_by_rank_range_count( - return_type: ListReturnType, +pub fn get_by_rank_range_count( + return_type: TLR, rank: FilterExpression, count: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtListOpType::GetByRankRange as i64)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(rank), ExpressionArgument::FilterExpression(count), ExpressionArgument::Context(ctx.to_vec()), @@ -687,8 +698,8 @@ fn add_write( } #[doc(hidden)] -const fn get_value_type(return_type: ListReturnType) -> ExpType { - if (return_type as u8 & !(ListReturnType::Inverted as u8)) == ListReturnType::Values as u8 { +const fn get_value_type(return_type: i64) -> ExpType { + if (return_type & !(ListReturnType::Inverted as i64)) == ListReturnType::Values as i64 { ExpType::LIST } else { ExpType::INT diff --git a/aerospike-core/src/expressions/maps.rs b/aerospike-core/src/expressions/maps.rs index e51de5d2..7f8f5d3b 100644 --- a/aerospike-core/src/expressions/maps.rs +++ b/aerospike-core/src/expressions/maps.rs @@ -16,7 +16,7 @@ //! Map Cdt Aerospike Filter Expressions. use crate::expressions::{nil, ExpOp, ExpType, ExpressionArgument, FilterExpression, MODIFY}; use crate::operations::cdt_context::{CdtContext, CtxType}; -use crate::operations::maps::{map_write_op, CdtMapOpType}; +use crate::operations::maps::{map_write_op, CdtMapOpType, ToMapReturnTypeBitmask}; use crate::{MapPolicy, MapReturnType, Value}; #[doc(hidden)] @@ -444,16 +444,17 @@ pub fn size(bin: FilterExpression, ctx: &[CdtContext]) -> FilterExpression { /// gt(get_by_key(MapReturnType::Count, ExpType::INT, string_val("B".to_string()), map_bin("a".to_string()), &[]), int_val(0)); /// ``` /// -pub fn get_by_key( - return_type: MapReturnType, +pub fn get_by_key( + return_type: TMR, value_type: ExpType, key: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByKey as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(key), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -464,17 +465,18 @@ pub fn get_by_key( /// If keyBegin is null, the range is less than keyEnd. /// If keyEnd is null, the range is greater than equal to keyBegin. /// Expression returns selected data specified by returnType. -pub fn get_by_key_range( - return_type: MapReturnType, +pub fn get_by_key_range( + return_type: TMR, key_begin: Option, key_end: Option, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let mut args = vec![ ExpressionArgument::Context(ctx.to_vec()), ExpressionArgument::Value(Value::from(CdtMapOpType::GetByKeyInterval as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ]; if let Some(val_beg) = key_begin { args.push(ExpressionArgument::FilterExpression(val_beg)); @@ -488,15 +490,16 @@ pub fn get_by_key_range( } /// Create expression that selects map items identified by keys and returns selected data specified by returnType -pub fn get_by_key_list( - return_type: MapReturnType, +pub fn get_by_key_list( + return_type: TMR, keys: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByKeyList as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(keys), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -514,16 +517,17 @@ pub fn get_by_key_list( /// * (5,-1) = [{4=2},{5=15},{9=10}] /// * (3,2) = [{9=10}] /// * (3,-2) = [{0=17},{4=2},{5=15},{9=10}] -pub fn get_by_key_relative_index_range( - return_type: MapReturnType, +pub fn get_by_key_relative_index_range( + return_type: TMR, key: FilterExpression, index: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByKeyRelIndexRange as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(key), ExpressionArgument::FilterExpression(index), ExpressionArgument::Context(ctx.to_vec()), @@ -542,17 +546,18 @@ pub fn get_by_key_relative_index_range( /// * (5,-1,1) = [{4=2}] /// * (3,2,1) = [{9=10}] /// * (3,-2,2) = [{0=17}] -pub fn get_by_key_relative_index_range_count( - return_type: MapReturnType, +pub fn get_by_key_relative_index_range_count( + return_type: TMR, key: FilterExpression, index: FilterExpression, count: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByKeyRelIndexRange as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(key), ExpressionArgument::FilterExpression(index), ExpressionArgument::FilterExpression(count), @@ -572,15 +577,16 @@ pub fn get_by_key_relative_index_range_count( /// /// gt(get_by_value(MapReturnType::Count, string_val("BBB".to_string()), map_bin("a".to_string()), &[]), int_val(0)); /// ``` -pub fn get_by_value( - return_type: MapReturnType, +pub fn get_by_value( + return_type: TMR, value: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByValue as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(value), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -592,17 +598,18 @@ pub fn get_by_value( /// If valueEnd is null, the range is greater than equal to valueBegin. /// /// Expression returns selected data specified by returnType. -pub fn get_by_value_range( - return_type: MapReturnType, +pub fn get_by_value_range( + return_type: TMR, value_begin: Option, value_end: Option, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let mut args = vec![ ExpressionArgument::Context(ctx.to_vec()), ExpressionArgument::Value(Value::from(CdtMapOpType::GetByValueInterval as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ]; if let Some(val_beg) = value_begin { args.push(ExpressionArgument::FilterExpression(val_beg)); @@ -616,15 +623,16 @@ pub fn get_by_value_range( } /// Create expression that selects map items identified by values and returns selected data specified by returnType. -pub fn get_by_value_list( - return_type: MapReturnType, +pub fn get_by_value_list( + return_type: TMR, values: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByValueList as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(values), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -639,16 +647,17 @@ pub fn get_by_value_list( /// * (value,rank) = [selected items] /// * (11,1) = [{0=17}] /// * (11,-1) = [{9=10},{5=15},{0=17}] -pub fn get_by_value_relative_rank_range( - return_type: MapReturnType, +pub fn get_by_value_relative_rank_range( + return_type: TMR, value: FilterExpression, rank: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByValueRelRankRange as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(value), ExpressionArgument::FilterExpression(rank), ExpressionArgument::Context(ctx.to_vec()), @@ -664,17 +673,18 @@ pub fn get_by_value_relative_rank_range( /// * (value,rank,count) = [selected items] /// * (11,1,1) = [{0=17}] /// * (11,-1,1) = [{9=10}] -pub fn get_by_value_relative_rank_range_count( - return_type: MapReturnType, +pub fn get_by_value_relative_rank_range_count( + return_type: TMR, value: FilterExpression, rank: FilterExpression, count: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByValueRelRankRange as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(value), ExpressionArgument::FilterExpression(rank), ExpressionArgument::FilterExpression(count), @@ -684,16 +694,17 @@ pub fn get_by_value_relative_rank_range_count( } /// Create expression that selects map item identified by index and returns selected data specified by returnType. -pub fn get_by_index( - return_type: MapReturnType, +pub fn get_by_index( + return_type: TMR, value_type: ExpType, index: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByIndex as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(index), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -702,15 +713,16 @@ pub fn get_by_index( /// Create expression that selects map items starting at specified index to the end of map and returns selected /// data specified by returnType. -pub fn get_by_index_range( - return_type: MapReturnType, +pub fn get_by_index_range( + return_type: TMR, index: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByIndexRange as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(index), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -719,16 +731,17 @@ pub fn get_by_index_range( /// Create expression that selects "count" map items starting at specified index and returns selected data /// specified by returnType. -pub fn get_by_index_range_count( - return_type: MapReturnType, +pub fn get_by_index_range_count( + return_type: TMR, index: FilterExpression, count: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByIndexRange as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(index), ExpressionArgument::FilterExpression(count), ExpressionArgument::Context(ctx.to_vec()), @@ -737,16 +750,17 @@ pub fn get_by_index_range_count( } /// Create expression that selects map item identified by rank and returns selected data specified by returnType. -pub fn get_by_rank( - return_type: MapReturnType, +pub fn get_by_rank( + return_type: TMR, value_type: ExpType, rank: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByRank as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(rank), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -755,15 +769,16 @@ pub fn get_by_rank( /// Create expression that selects map items starting at specified rank to the last ranked item and /// returns selected data specified by returnType. -pub fn get_by_rank_range( - return_type: MapReturnType, +pub fn get_by_rank_range( + return_type: TMR, rank: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByRankRange as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(rank), ExpressionArgument::Context(ctx.to_vec()), ]; @@ -772,16 +787,17 @@ pub fn get_by_rank_range( /// Create expression that selects "count" map items starting at specified rank and returns selected /// data specified by returnType. -pub fn get_by_rank_range_count( - return_type: MapReturnType, +pub fn get_by_rank_range_count( + return_type: TMR, rank: FilterExpression, count: FilterExpression, bin: FilterExpression, ctx: &[CdtContext], ) -> FilterExpression { + let return_type = return_type.to_bitmask(); let args = vec![ ExpressionArgument::Value(Value::from(CdtMapOpType::GetByRankRange as u8)), - ExpressionArgument::Value(Value::from(return_type as u8)), + ExpressionArgument::Value(Value::from(return_type)), ExpressionArgument::FilterExpression(rank), ExpressionArgument::FilterExpression(count), ExpressionArgument::Context(ctx.to_vec()), @@ -830,11 +846,11 @@ fn add_write( } #[doc(hidden)] -const fn get_value_type(return_type: MapReturnType) -> ExpType { - let t = return_type as u8 & !(MapReturnType::Inverted as u8); - if t == MapReturnType::Key as u8 || t == MapReturnType::Value as u8 { +const fn get_value_type(return_type: i64) -> ExpType { + let t = return_type & !(MapReturnType::Inverted as i64); + if t == MapReturnType::Key as i64 || t == MapReturnType::Value as i64 { ExpType::LIST - } else if t == MapReturnType::KeyValue as u8 { + } else if t == MapReturnType::KeyValue as i64 { ExpType::MAP } else { ExpType::INT diff --git a/aerospike-core/src/key.rs b/aerospike-core/src/key.rs index 70de715a..d8656950 100644 --- a/aerospike-core/src/key.rs +++ b/aerospike-core/src/key.rs @@ -19,8 +19,8 @@ use std::result::Result as StdResult; use crate::errors::Result; use crate::Value; -use ripemd160::digest::Digest; -use ripemd160::Ripemd160; +use ripemd::digest::Digest; +use ripemd::Ripemd160; #[cfg(feature = "serialization")] use serde::Serialize; /// Unique record identifier. Records can be identified using a specified namespace, an optional @@ -66,14 +66,14 @@ impl Key { fn compute_digest(&mut self) -> Result<()> { let mut hash = Ripemd160::new(); - hash.input(self.set_name.as_bytes()); + hash.update(self.set_name.as_bytes()); if let Some(ref user_key) = self.user_key { - hash.input(&[user_key.particle_type() as u8]); + hash.update(&[user_key.particle_type() as u8]); user_key.write_key_bytes(&mut hash)?; } else { unreachable!(); } - self.digest = hash.result().into(); + self.digest = hash.finalize().into(); Ok(()) } diff --git a/aerospike-core/src/lib.rs b/aerospike-core/src/lib.rs index 1a98c3f7..2d3983b5 100644 --- a/aerospike-core/src/lib.rs +++ b/aerospike-core/src/lib.rs @@ -138,7 +138,6 @@ extern crate base64; extern crate byteorder; extern crate crossbeam_queue; -extern crate ripemd160; #[macro_use] extern crate error_chain; #[macro_use] diff --git a/aerospike-core/src/msgpack/encoder.rs b/aerospike-core/src/msgpack/encoder.rs index bc772bf8..f6c42f15 100644 --- a/aerospike-core/src/msgpack/encoder.rs +++ b/aerospike-core/src/msgpack/encoder.rs @@ -192,16 +192,16 @@ const MSGPACK_MARKER_NIL: u8 = 0xc0; const MSGPACK_MARKER_BOOL_TRUE: u8 = 0xc3; const MSGPACK_MARKER_BOOL_FALSE: u8 = 0xc2; -const MSGPACK_MARKER_I8: u8 = 0xcc; -const MSGPACK_MARKER_I16: u8 = 0xcd; -const MSGPACK_MARKER_I32: u8 = 0xce; +const MSGPACK_MARKER_U8: u8 = 0xcc; +const MSGPACK_MARKER_U16: u8 = 0xcd; +const MSGPACK_MARKER_U32: u8 = 0xce; +const MSGPACK_MARKER_U64: u8 = 0xcf; + +const MSGPACK_MARKER_I8: u8 = 0xd0; +const MSGPACK_MARKER_I16: u8 = 0xd1; +const MSGPACK_MARKER_I32: u8 = 0xd2; const MSGPACK_MARKER_I64: u8 = 0xd3; -const MSGPACK_MARKER_NI8: u8 = 0xd0; -const MSGPACK_MARKER_NI16: u8 = 0xd1; -const MSGPACK_MARKER_NI32: u8 = 0xd2; -const MSGPACK_MARKER_NI64: u8 = 0xd3; - // This method is not compatible with MsgPack specs and is only used by aerospike client<->server // for wire transfer only #[doc(hidden)] @@ -220,15 +220,6 @@ pub fn pack_half_byte(buf: &mut Option<&mut Buffer>, value: u8) -> usize { 1 } -#[doc(hidden)] -pub fn pack_byte(buf: &mut Option<&mut Buffer>, marker: u8, value: u8) -> usize { - if let Some(ref mut buf) = *buf { - buf.write_u8(marker); - buf.write_u8(value); - } - 2 -} - #[doc(hidden)] pub fn pack_nil(buf: &mut Option<&mut Buffer>) -> usize { if let Some(ref mut buf) = *buf { @@ -250,29 +241,35 @@ pub fn pack_bool(buf: &mut Option<&mut Buffer>, value: bool) -> usize { } #[doc(hidden)] -pub fn pack_map_begin(buf: &mut Option<&mut Buffer>, length: usize) -> usize { - match length { - val if val < 16 => pack_half_byte(buf, 0x80 | (length as u8)), - val if (16..(1 << 16)).contains(&val) => pack_i16(buf, 0xde, length as i16), - _ => pack_i32(buf, 0xdf, length as i32), +fn pack_map_begin(buf: &mut Option<&mut Buffer>, length: usize) -> usize { + if length < 16 { + pack_half_byte(buf, 0x80 | (length as u8)) + } else if length < 1 << 16 { + pack_type_u16(buf, 0xde, length as u16) + } else { + pack_type_u32(buf, 0xdf, length as u32) } } #[doc(hidden)] pub fn pack_array_begin(buf: &mut Option<&mut Buffer>, length: usize) -> usize { - match length { - val if val < 16 => pack_half_byte(buf, 0x90 | (length as u8)), - val if (16..(1 << 16)).contains(&val) => pack_i16(buf, 0xdc, length as i16), - _ => pack_i32(buf, 0xdd, length as i32), + if length < 16 { + pack_half_byte(buf, 0x90 | (length as u8)) + } else if length < 1 << 16 { + pack_type_u16(buf, 0xdc, length as u16) + } else { + pack_type_u32(buf, 0xdd, length as u32) } } #[doc(hidden)] -pub fn pack_byte_array_begin(buf: &mut Option<&mut Buffer>, length: usize) -> usize { - match length { - val if val < 32 => pack_half_byte(buf, 0xa0 | (length as u8)), - val if (32..(1 << 16)).contains(&val) => pack_i16(buf, 0xda, length as i16), - _ => pack_i32(buf, 0xdb, length as i32), +pub fn pack_string_begin(buf: &mut Option<&mut Buffer>, length: usize) -> usize { + if length < 32 { + pack_half_byte(buf, 0xa0 | (length as u8)) + } else if length < 1 << 16 { + pack_type_u16(buf, 0xda, length as u16) + } else { + pack_type_u32(buf, 0xdb, length as u32) } } @@ -280,7 +277,7 @@ pub fn pack_byte_array_begin(buf: &mut Option<&mut Buffer>, length: usize) -> us pub fn pack_blob(buf: &mut Option<&mut Buffer>, value: &[u8]) -> usize { let mut size = value.len() + 1; - size += pack_byte_array_begin(buf, size); + size += pack_string_begin(buf, size); if let Some(ref mut buf) = *buf { buf.write_u8(ParticleType::BLOB as u8); buf.write_bytes(value); @@ -293,7 +290,7 @@ pub fn pack_blob(buf: &mut Option<&mut Buffer>, value: &[u8]) -> usize { pub fn pack_string(buf: &mut Option<&mut Buffer>, value: &str) -> usize { let mut size = value.len() + 1; - size += pack_byte_array_begin(buf, size); + size += pack_string_begin(buf, size); if let Some(ref mut buf) = *buf { buf.write_u8(ParticleType::STRING as u8); buf.write_str(value); @@ -306,7 +303,7 @@ pub fn pack_string(buf: &mut Option<&mut Buffer>, value: &str) -> usize { pub fn pack_raw_string(buf: &mut Option<&mut Buffer>, value: &str) -> usize { let mut size = value.len(); - size += pack_byte_array_begin(buf, size); + size += pack_string_begin(buf, size); if let Some(ref mut buf) = *buf { buf.write_str(value); } @@ -318,7 +315,7 @@ pub fn pack_raw_string(buf: &mut Option<&mut Buffer>, value: &str) -> usize { fn pack_geo_json(buf: &mut Option<&mut Buffer>, value: &str) -> usize { let mut size = value.len() + 1; - size += pack_byte_array_begin(buf, size); + size += pack_string_begin(buf, size); if let Some(ref mut buf) = *buf { buf.write_u8(ParticleType::GEOJSON as u8); buf.write_str(value); @@ -328,76 +325,76 @@ fn pack_geo_json(buf: &mut Option<&mut Buffer>, value: &str) -> usize { } #[doc(hidden)] -pub fn pack_integer(buf: &mut Option<&mut Buffer>, val: i64) -> usize { - match val { - val if (0..(1 << 7)).contains(&val) => pack_half_byte(buf, val as u8), - val if val >= 1 << 7 && val < i64::from(i8::max_value()) => { - pack_byte(buf, MSGPACK_MARKER_I8, val as u8) - } - val if val >= i64::from(i8::max_value()) && val < i64::from(i16::max_value()) => { - pack_i16(buf, MSGPACK_MARKER_I16, val as i16) +pub fn pack_integer(buf: &mut Option<&mut Buffer>, value: i64) -> usize { + if value >= 0 { + pack_u64(buf, value as u64) + } else if value >= -32 { + pack_half_byte(buf, 0xe0 | ((Wrapping(value as u8) + Wrapping(32)).0)) + } else if value >= i64::from(i8::MIN) { + if let Some(ref mut buf) = *buf { + buf.write_u8(MSGPACK_MARKER_I8); + buf.write_i8(value as i8); } - val if val >= i64::from(i16::max_value()) && val < i64::from(i32::max_value()) => { - pack_i32(buf, MSGPACK_MARKER_I32, val as i32) + 2 + } else if value >= i64::from(i16::MIN) { + if let Some(ref mut buf) = *buf { + buf.write_u8(MSGPACK_MARKER_I16); + buf.write_i16(value as i16); } - val if val >= i64::from(i32::max_value()) => pack_i64(buf, MSGPACK_MARKER_I64, val), - - // Negative values - val if val >= -32 && val < 0 => { - pack_half_byte(buf, 0xe0 | ((Wrapping(val as u8) + Wrapping(32)).0)) - } - val if val >= i64::from(i8::min_value()) && val < -32 => { - pack_byte(buf, MSGPACK_MARKER_NI8, val as u8) + 3 + } else if value >= i64::from(i32::MIN) { + if let Some(ref mut buf) = *buf { + buf.write_u8(MSGPACK_MARKER_I32); + buf.write_i32(value as i32); } - val if val >= i64::from(i16::min_value()) && val < i64::from(i8::min_value()) => { - pack_i16(buf, MSGPACK_MARKER_NI16, val as i16) - } - val if val >= i64::from(i32::min_value()) && val < i64::from(i16::min_value()) => { - pack_i32(buf, MSGPACK_MARKER_NI32, val as i32) + 5 + } else { + if let Some(ref mut buf) = *buf { + buf.write_u8(MSGPACK_MARKER_I64); + buf.write_i64(value); } - val if val < i64::from(i32::min_value()) => pack_i64(buf, MSGPACK_MARKER_NI64, val), - _ => unreachable!(), + 9 } } - #[doc(hidden)] -pub fn pack_i16(buf: &mut Option<&mut Buffer>, marker: u8, value: i16) -> usize { +fn pack_type_u16(buf: &mut Option<&mut Buffer>, marker: u8, value: u16) -> usize { if let Some(ref mut buf) = *buf { buf.write_u8(marker); - buf.write_i16(value); + buf.write_u16(value); } 3 } #[doc(hidden)] -pub fn pack_i32(buf: &mut Option<&mut Buffer>, marker: u8, value: i32) -> usize { +fn pack_type_u32(buf: &mut Option<&mut Buffer>, marker: u8, value: u32) -> usize { if let Some(ref mut buf) = *buf { buf.write_u8(marker); - buf.write_i32(value); + buf.write_u32(value); } 5 } -#[doc(hidden)] -pub fn pack_i64(buf: &mut Option<&mut Buffer>, marker: u8, value: i64) -> usize { - if let Some(ref mut buf) = *buf { - buf.write_u8(marker); - buf.write_i64(value); - } - 9 -} - #[doc(hidden)] pub fn pack_u64(buf: &mut Option<&mut Buffer>, value: u64) -> usize { - if value <= i64::max_value() as u64 { - return pack_integer(buf, value as i64); - } - - if let Some(ref mut buf) = *buf { - buf.write_u8(0xcf); - buf.write_u64(value); + if value < (1 << 7) { + pack_half_byte(buf, value as u8) + } else if value < u64::from(u8::MAX) { + if let Some(ref mut buf) = *buf { + buf.write_u8(MSGPACK_MARKER_U8); + buf.write_u8(value as u8); + } + 2 + } else if value < u64::from(u16::MAX) { + pack_type_u16(buf, MSGPACK_MARKER_U16, value as u16) + } else if value < u64::from(u32::MAX) { + pack_type_u32(buf, MSGPACK_MARKER_U32, value as u32) + } else { + if let Some(ref mut buf) = *buf { + buf.write_u8(MSGPACK_MARKER_U64); + buf.write_u64(value); + } + 9 } - 9 } #[doc(hidden)] diff --git a/aerospike-core/src/net/connection_pool.rs b/aerospike-core/src/net/connection_pool.rs index 95575dd1..18b87278 100644 --- a/aerospike-core/src/net/connection_pool.rs +++ b/aerospike-core/src/net/connection_pool.rs @@ -89,16 +89,14 @@ impl Queue { ) .await; - if conn.is_err() { + let Ok(Ok(conn)) = conn else { let mut internals = self.0.internals.lock().await; internals.num_conns -= 1; drop(internals); bail!(ErrorKind::Connection( "Could not open network connection".to_string() )); - } - - let conn = conn.unwrap()?; + }; connection = conn; break; diff --git a/aerospike-core/src/operations/exp.rs b/aerospike-core/src/operations/exp.rs index 78f4f720..3fb33781 100644 --- a/aerospike-core/src/operations/exp.rs +++ b/aerospike-core/src/operations/exp.rs @@ -23,6 +23,7 @@ use crate::operations::{Operation, OperationBin, OperationData, OperationType}; use crate::ParticleType; /// Expression write Flags +#[derive(Clone, Copy)] pub enum ExpWriteFlags { /// Default. Allow create or update. Default = 0, @@ -43,6 +44,28 @@ pub enum ExpWriteFlags { EvalNoFail = 1 << 4, } +/// Something that can be resolved into a set of ExpWriteFlags. Either a single ExpWriteFlag, Option, [ExpWriteFlag], etc. +pub trait ToExpWriteFlagBitmask { + /// Convert to an i64 bitmask + fn to_bitmask(self) -> i64; +} + +impl ToExpWriteFlagBitmask for ExpWriteFlags { + fn to_bitmask(self) -> i64 { + self as i64 + } +} + +impl> ToExpWriteFlagBitmask for T { + fn to_bitmask(self) -> i64 { + let mut out = 0; + for val in self { + out |= val.to_bitmask(); + } + out + } +} + #[doc(hidden)] pub type ExpressionEncoder = Box, &ExpOperation) -> usize + Send + Sync + 'static>; @@ -79,15 +102,38 @@ pub enum ExpReadFlags { EvalNoFail = 1 << 4, } +/// Something that can be resolved into a set of ExpWriteFlags. Either a single ExpWriteFlag, Option, [ExpWriteFlag], etc. +pub trait ToExpReadFlagBitmask { + /// Convert to an i64 bitmask + fn to_bitmask(self) -> i64; +} + +impl ToExpReadFlagBitmask for ExpReadFlags { + fn to_bitmask(self) -> i64 { + self as i64 + } +} + +impl> ToExpReadFlagBitmask for T { + fn to_bitmask(self) -> i64 { + let mut out = 0; + for val in self { + out |= val.to_bitmask(); + } + out + } +} + + /// Create operation that performs a expression that writes to record bin. -pub fn write_exp<'a>( +pub fn write_exp<'a, E: ToExpWriteFlagBitmask>( bin: &'a str, exp: &'a FilterExpression, - flags: ExpWriteFlags, + flags: E, ) -> Operation<'a> { let op = ExpOperation { encoder: Box::new(pack_write_exp), - policy: flags as i64, + policy: flags.to_bitmask(), exp, }; Operation { @@ -99,14 +145,14 @@ pub fn write_exp<'a>( } /// Create operation that performs a read expression. -pub fn read_exp<'a>( +pub fn read_exp<'a, E: ToExpReadFlagBitmask>( name: &'a str, exp: &'a FilterExpression, - flags: ExpReadFlags, + flags: E, ) -> Operation<'a> { let op = ExpOperation { encoder: Box::new(pack_read_exp), - policy: flags as i64, + policy: flags.to_bitmask(), exp, }; Operation { diff --git a/aerospike-core/src/operations/hll.rs b/aerospike-core/src/operations/hll.rs index b2f856d1..52c70174 100644 --- a/aerospike-core/src/operations/hll.rs +++ b/aerospike-core/src/operations/hll.rs @@ -41,17 +41,46 @@ pub enum HLLWriteFlags { AllowFold = 8, } +/// Something that can be resolved into a set of ExpWriteFlags. Either a single HLLWriteFlags, Option, [HLLWriteFlags], etc. +pub trait ToHLLWriteFlagsBitmask { + /// Convert to an i64 bitmask + fn to_bitmask(self) -> i64; +} + +impl ToHLLWriteFlagsBitmask for HLLWriteFlags { + fn to_bitmask(self) -> i64 { + self as i64 + } +} + +impl> ToHLLWriteFlagsBitmask for T { + fn to_bitmask(self) -> i64 { + let mut out = 0; + for val in self { + out |= val.to_bitmask(); + } + out + } +} + /// `HLLPolicy` operation policy. #[derive(Debug, Clone, Copy)] pub struct HLLPolicy { /// CdtListWriteFlags - pub flags: HLLWriteFlags, + pub flags: i64, } impl HLLPolicy { /// Use specified `HLLWriteFlags` when performing `HLL` operations pub const fn new(write_flags: HLLWriteFlags) -> Self { - HLLPolicy { flags: write_flags } + HLLPolicy { flags: write_flags as i64 } + } + + /// Use specified `HLLWriteFlags` or combination thereof when performing `HLL` operations + pub fn new_with_flags(write_flags: HWF) -> Self { + HLLPolicy { + flags: write_flags.to_bitmask(), + } } } diff --git a/aerospike-core/src/operations/lists.rs b/aerospike-core/src/operations/lists.rs index e1576d66..731882cc 100644 --- a/aerospike-core/src/operations/lists.rs +++ b/aerospike-core/src/operations/lists.rs @@ -84,7 +84,7 @@ pub enum ListOrderType { Ordered = 1, } -/// `CdtListReturnType` determines the returned values in CDT List operations. +/// `Cdtu64` determines the returned values in CDT List operations. #[derive(Debug, Clone, Copy)] pub enum ListReturnType { /// Do not return a result. @@ -119,6 +119,29 @@ pub enum ListReturnType { Inverted = 0x10000, } +#[derive(Debug, Clone, Copy)] +/// Inverts the returned values in CDT List operations. +pub struct InvertedListReturn(ListReturnType); + +/// Something that can be resolved into a set of ListReturnType. Either a single ListReturnType, or InvertedListReturn(ListReturnType). +pub trait ToListReturnTypeBitmask { + /// Convert to an u64 bitmask + fn to_bitmask(self) -> i64; +} + +impl ToListReturnTypeBitmask for ListReturnType { + fn to_bitmask(self) -> i64 { + self as i64 + } +} + +impl ToListReturnTypeBitmask for InvertedListReturn { + fn to_bitmask(self) -> i64 { + ListReturnType::Inverted as i64 ^ self.0.to_bitmask() + } +} + + /// `CdtListSortFlags` determines sort flags for CDT lists #[derive(Debug, Clone, Copy)] pub enum ListSortFlags { @@ -153,16 +176,51 @@ pub struct ListPolicy { /// CdtListOrderType pub attributes: ListOrderType, /// CdtListWriteFlags - pub flags: ListWriteFlags, + pub flags: u8, +} + + +/// Something that can be resolved into a set of ExpWriteFlags. Either a single ListWriteFlags, Option, [ListWriteFlags], etc. +pub trait ToListWriteFlagsBitmask { + /// Convert to an u8 bitmask potentially containing multiple flags + fn to_bitmask(self) -> u8; +} + +impl ToListWriteFlagsBitmask for ListWriteFlags { + fn to_bitmask(self) -> u8 { + self as u8 + } +} + +impl> ToListWriteFlagsBitmask for T { + fn to_bitmask(self) -> u8 { + let mut out = 0; + for val in self { + out |= val.to_bitmask(); + } + out + } } + impl ListPolicy { /// Create unique key list with specified order when list does not exist. /// Use specified write mode when writing list items. pub const fn new(order: ListOrderType, write_flags: ListWriteFlags) -> Self { ListPolicy { attributes: order, - flags: write_flags, + flags: write_flags as u8, + + } + } + + /// Create unique key list with specified order when list does not exist. + /// Use specified write mode when writing list items. + /// This is non-const, but allows specifying multiple flags. + pub fn new_with_flags(order: ListOrderType, write_flags: LWF) -> Self { + ListPolicy { + attributes: order, + flags: write_flags.to_bitmask(), } } } @@ -423,16 +481,16 @@ pub fn remove_range_from(bin: &str, index: i64) -> Operation { /// Create list remove value operation. Server removes all items that are equal to the /// specified value. Server returns the number of items removed. -pub fn remove_by_value<'a>( +pub fn remove_by_value<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, value: &'a Value, - return_type: ListReturnType, + return_type: TLR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtListOpType::RemoveByValue as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), ], }; @@ -446,16 +504,16 @@ pub fn remove_by_value<'a>( /// Create list remove by value list operation. Server removes all items that are equal to /// one of the specified values. Server returns the number of items removed -pub fn remove_by_value_list<'a>( +pub fn remove_by_value_list<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, values: &'a [Value], - return_type: ListReturnType, + return_type: TLR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtListOpType::RemoveByValueList as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::List(values), ], }; @@ -472,9 +530,9 @@ pub fn remove_by_value_list<'a>( /// If valueBegin is nil, the range is less than valueEnd. /// If valueEnd is nil, the range is greater than equal to valueBegin. /// Server returns removed data specified by returnType -pub fn remove_by_value_range<'a>( +pub fn remove_by_value_range<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, - return_type: ListReturnType, + return_type: TLR, begin: &'a Value, end: &'a Value, ) -> Operation<'a> { @@ -482,7 +540,7 @@ pub fn remove_by_value_range<'a>( op: CdtListOpType::RemoveByValueInterval as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(begin), CdtArgument::Value(end), ], @@ -509,9 +567,9 @@ pub fn remove_by_value_range<'a>( /// (3,3) = [11,15] /// (3,-3) = [0,4,5,9,11,15] /// ``` -pub fn remove_by_value_relative_rank_range<'a>( +pub fn remove_by_value_relative_rank_range<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, - return_type: ListReturnType, + return_type: TLR, value: &'a Value, rank: i64, ) -> Operation<'a> { @@ -519,7 +577,7 @@ pub fn remove_by_value_relative_rank_range<'a>( op: CdtListOpType::RemoveByValueRelRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), CdtArgument::Int(rank), ], @@ -546,9 +604,9 @@ pub fn remove_by_value_relative_rank_range<'a>( /// (3,3,7) = [11,15] /// (3,-3,2) = [] /// ``` -pub fn remove_by_value_relative_rank_range_count<'a>( +pub fn remove_by_value_relative_rank_range_count<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, - return_type: ListReturnType, + return_type: TLR, value: &'a Value, rank: i64, count: i64, @@ -557,7 +615,7 @@ pub fn remove_by_value_relative_rank_range_count<'a>( op: CdtListOpType::RemoveByValueRelRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), CdtArgument::Int(rank), CdtArgument::Int(count), @@ -573,12 +631,12 @@ pub fn remove_by_value_relative_rank_range_count<'a>( /// Creates a list remove operation. /// Server removes list item identified by index and returns removed data specified by returnType. -pub fn remove_by_index(bin: &str, index: i64, return_type: ListReturnType) -> Operation { +pub fn remove_by_index(bin: &str, index: i64, return_type: TLR) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::RemoveByIndex as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), ], }; @@ -593,12 +651,12 @@ pub fn remove_by_index(bin: &str, index: i64, return_type: ListReturnType) -> Op /// Creates a list remove operation. /// Server removes list items starting at specified index to the end of list and returns removed /// data specified by returnType. -pub fn remove_by_index_range(bin: &str, index: i64, return_type: ListReturnType) -> Operation { +pub fn remove_by_index_range(bin: &str, index: i64, return_type: TLR) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::RemoveByIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), ], }; @@ -612,17 +670,17 @@ pub fn remove_by_index_range(bin: &str, index: i64, return_type: ListReturnType) /// Creates a list remove operation. /// Server removes "count" list items starting at specified index and returns removed data specified by returnType. -pub fn remove_by_index_range_count( +pub fn remove_by_index_range_count( bin: &str, index: i64, count: i64, - return_type: ListReturnType, + return_type: TLR, ) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::RemoveByIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), CdtArgument::Int(count), ], @@ -637,11 +695,11 @@ pub fn remove_by_index_range_count( /// Creates a list remove operation. /// Server removes list item identified by rank and returns removed data specified by returnType. -pub fn remove_by_rank(bin: &str, rank: i64, return_type: ListReturnType) -> Operation { +pub fn remove_by_rank(bin: &str, rank: i64, return_type: TLR) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::RemoveByRank as u8, encoder: Box::new(pack_cdt_op), - args: vec![CdtArgument::Byte(return_type as u8), CdtArgument::Int(rank)], + args: vec![CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank)], }; Operation { op: OperationType::CdtWrite, @@ -654,11 +712,11 @@ pub fn remove_by_rank(bin: &str, rank: i64, return_type: ListReturnType) -> Oper /// Creates a list remove operation. /// Server removes list items starting at specified rank to the last ranked item and returns removed /// data specified by returnType. -pub fn remove_by_rank_range(bin: &str, rank: i64, return_type: ListReturnType) -> Operation { +pub fn remove_by_rank_range(bin: &str, rank: i64, return_type: TLR) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::RemoveByRankRange as u8, encoder: Box::new(pack_cdt_op), - args: vec![CdtArgument::Byte(return_type as u8), CdtArgument::Int(rank)], + args: vec![CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank)], }; Operation { op: OperationType::CdtWrite, @@ -670,17 +728,17 @@ pub fn remove_by_rank_range(bin: &str, rank: i64, return_type: ListReturnType) - /// Creates a list remove operation. /// Server removes "count" list items starting at specified rank and returns removed data specified by returnType. -pub fn remove_by_rank_range_count( +pub fn remove_by_rank_range_count( bin: &str, rank: i64, count: i64, - return_type: ListReturnType, + return_type: TLR, ) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::RemoveByRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank), CdtArgument::Int(count), ], @@ -831,16 +889,16 @@ pub fn get_range_from(bin: &str, index: i64) -> Operation { /// Creates a list get by value operation. /// Server selects list items identified by value and returns selected data specified by returnType. -pub fn get_by_value<'a>( +pub fn get_by_value<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, value: &'a Value, - return_type: ListReturnType, + return_type: TLR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtListOpType::GetByValue as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), ], }; @@ -855,16 +913,16 @@ pub fn get_by_value<'a>( /// Creates list get by value list operation. /// Server selects list items identified by values and returns selected data specified by returnType. -pub fn get_by_value_list<'a>( +pub fn get_by_value_list<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, values: &'a [Value], - return_type: ListReturnType, + return_type: TLR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtListOpType::GetByValueList as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::List(values), ], }; @@ -881,17 +939,17 @@ pub fn get_by_value_list<'a>( /// If valueBegin is null, the range is less than valueEnd. /// If valueEnd is null, the range is greater than equal to valueBegin. /// Server returns selected data specified by returnType. -pub fn get_by_value_range<'a>( +pub fn get_by_value_range<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, begin: &'a Value, end: &'a Value, - return_type: ListReturnType, + return_type: TLR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtListOpType::GetByValueInterval as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(begin), CdtArgument::Value(end), ], @@ -906,12 +964,12 @@ pub fn get_by_value_range<'a>( /// Creates list get by index operation. /// Server selects list item identified by index and returns selected data specified by returnType -pub fn get_by_index(bin: &str, index: i64, return_type: ListReturnType) -> Operation { +pub fn get_by_index(bin: &str, index: i64, return_type: TLR) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::GetByIndex as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), ], }; @@ -927,12 +985,12 @@ pub fn get_by_index(bin: &str, index: i64, return_type: ListReturnType) -> Opera /// Creates list get by index range operation. /// Server selects list items starting at specified index to the end of list and returns selected /// data specified by returnType. -pub fn get_by_index_range(bin: &str, index: i64, return_type: ListReturnType) -> Operation { +pub fn get_by_index_range(bin: &str, index: i64, return_type: TLR) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::GetByIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), ], }; @@ -948,17 +1006,17 @@ pub fn get_by_index_range(bin: &str, index: i64, return_type: ListReturnType) -> /// Creates list get by index range operation. /// Server selects "count" list items starting at specified index and returns selected data specified /// by returnType. -pub fn get_by_index_range_count( +pub fn get_by_index_range_count( bin: &str, index: i64, count: i64, - return_type: ListReturnType, + return_type: TLR, ) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::GetByIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), CdtArgument::Int(count), ], @@ -974,11 +1032,11 @@ pub fn get_by_index_range_count( /// Creates a list get by rank operation. /// Server selects list item identified by rank and returns selected data specified by returnType. -pub fn get_by_rank(bin: &str, rank: i64, return_type: ListReturnType) -> Operation { +pub fn get_by_rank(bin: &str, rank: i64, return_type: TLR) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::GetByRank as u8, encoder: Box::new(pack_cdt_op), - args: vec![CdtArgument::Byte(return_type as u8), CdtArgument::Int(rank)], + args: vec![CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank)], }; Operation { op: OperationType::CdtRead, @@ -991,11 +1049,11 @@ pub fn get_by_rank(bin: &str, rank: i64, return_type: ListReturnType) -> Operati /// Creates a list get by rank range operation. /// Server selects list items starting at specified rank to the last ranked item and returns selected /// data specified by returnType. -pub fn get_by_rank_range(bin: &str, rank: i64, return_type: ListReturnType) -> Operation { +pub fn get_by_rank_range(bin: &str, rank: i64, return_type: TLR) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::GetByRankRange as u8, encoder: Box::new(pack_cdt_op), - args: vec![CdtArgument::Byte(return_type as u8), CdtArgument::Int(rank)], + args: vec![CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank)], }; Operation { op: OperationType::CdtRead, @@ -1007,17 +1065,17 @@ pub fn get_by_rank_range(bin: &str, rank: i64, return_type: ListReturnType) -> O /// Creates a list get by rank range operation. /// Server selects "count" list items starting at specified rank and returns selected data specified by returnType. -pub fn get_by_rank_range_count( +pub fn get_by_rank_range_count( bin: &str, rank: i64, count: i64, - return_type: ListReturnType, + return_type: TLR, ) -> Operation { let cdt_op = CdtOperation { op: CdtListOpType::GetByRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank), CdtArgument::Int(count), ], @@ -1044,17 +1102,17 @@ pub fn get_by_rank_range_count( /// (3,3) = [11,15] /// (3,-3) = [0,4,5,9,11,15] /// ``` -pub fn get_by_value_relative_rank_range<'a>( +pub fn get_by_value_relative_rank_range<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, value: &'a Value, rank: i64, - return_type: ListReturnType, + return_type: TLR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtListOpType::GetByValueRelRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), CdtArgument::Int(rank), ], @@ -1081,18 +1139,18 @@ pub fn get_by_value_relative_rank_range<'a>( /// (3,3,7) = [11,15] /// (3,-3,2) = [] /// ``` -pub fn get_by_value_relative_rank_range_count<'a>( +pub fn get_by_value_relative_rank_range_count<'a, TLR: ToListReturnTypeBitmask>( bin: &'a str, value: &'a Value, rank: i64, count: i64, - return_type: ListReturnType, + return_type: TLR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtListOpType::GetByValueRelRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), CdtArgument::Int(rank), CdtArgument::Int(count), diff --git a/aerospike-core/src/operations/maps.rs b/aerospike-core/src/operations/maps.rs index 7b7eac41..427bcaf6 100644 --- a/aerospike-core/src/operations/maps.rs +++ b/aerospike-core/src/operations/maps.rs @@ -152,6 +152,28 @@ pub enum MapReturnType { Inverted = 0x10000, } +#[derive(Debug, Clone, Copy)] +/// Inverts the returned values in CDT List operations. +pub struct InvertedMapReturn(MapReturnType); + +/// Something that can be resolved into a set of MapReturnType. Either a single MapReturnType, or InvertedMapReturn(MapReturnType). +pub trait ToMapReturnTypeBitmask { + /// Convert to an u64 bitmask + fn to_bitmask(self) -> i64; +} + +impl ToMapReturnTypeBitmask for MapReturnType { + fn to_bitmask(self) -> i64 { + self as i64 + } +} + +impl ToMapReturnTypeBitmask for InvertedMapReturn { + fn to_bitmask(self) -> i64 { + MapReturnType::Inverted as i64 ^ self.0.to_bitmask() + } +} + /// Unique key map write type. #[derive(Debug, Clone, Copy)] pub enum MapWriteMode { @@ -392,16 +414,16 @@ pub fn clear(bin: &str) -> Operation { /// Create map remove operation. Server removes the map item identified by the key and returns /// the removed data specified by `return_type`. -pub fn remove_by_key<'a>( +pub fn remove_by_key<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, key: &'a Value, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByKey as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(key), ], }; @@ -415,16 +437,16 @@ pub fn remove_by_key<'a>( /// Create map remove operation. Server removes map items identified by keys and returns /// removed data specified by `return_type`. -pub fn remove_by_key_list<'a>( +pub fn remove_by_key_list<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, keys: &'a [Value], - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveKeyList as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::List(keys), ], }; @@ -440,14 +462,14 @@ pub fn remove_by_key_list<'a>( /// (`begin` inclusive, `end` exclusive). If `begin` is `Value::Nil`, the range is less than /// `end`. If `end` is `Value::Nil`, the range is greater than equal to `begin`. Server returns /// removed data specified by `return_type`. -pub fn remove_by_key_range<'a>( +pub fn remove_by_key_range<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, begin: &'a Value, end: &'a Value, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let mut args = vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(begin), ]; if !end.is_nil() { @@ -468,16 +490,16 @@ pub fn remove_by_key_range<'a>( /// Create map remove operation. Server removes the map items identified by value and returns /// the removed data specified by `return_type`. -pub fn remove_by_value<'a>( +pub fn remove_by_value<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, value: &'a Value, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByValue as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), ], }; @@ -491,16 +513,16 @@ pub fn remove_by_value<'a>( /// Create map remove operation. Server removes the map items identified by values and returns /// the removed data specified by `return_type`. -pub fn remove_by_value_list<'a>( +pub fn remove_by_value_list<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, values: &'a [Value], - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveValueList as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::List(values), ], }; @@ -516,14 +538,14 @@ pub fn remove_by_value_list<'a>( /// inclusive, `end` exclusive). If `begin` is `Value::Nil`, the range is less than `end`. If /// `end` is `Value::Nil`, the range is greater than equal to `begin`. Server returns the /// removed data specified by `return_type`. -pub fn remove_by_value_range<'a>( +pub fn remove_by_value_range<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, begin: &'a Value, end: &'a Value, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let mut args = vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(begin), ]; if !end.is_nil() { @@ -544,12 +566,12 @@ pub fn remove_by_value_range<'a>( /// Create map remove operation. Server removes the map item identified by the index and return /// the removed data specified by `return_type`. -pub fn remove_by_index(bin: &str, index: i64, return_type: MapReturnType) -> Operation { +pub fn remove_by_index(bin: &str, index: i64, return_type: TMR) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByIndex as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), ], }; @@ -563,17 +585,17 @@ pub fn remove_by_index(bin: &str, index: i64, return_type: MapReturnType) -> Ope /// Create map remove operation. Server removes `count` map items starting at the specified /// index and returns the removed data specified by `return_type`. -pub fn remove_by_index_range( +pub fn remove_by_index_range( bin: &str, index: i64, count: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), CdtArgument::Int(count), ], @@ -588,12 +610,12 @@ pub fn remove_by_index_range( /// Create map remove operation. Server removes the map items starting at the specified index /// to the end of the map and returns the removed data specified by `return_type`. -pub fn remove_by_index_range_from(bin: &str, index: i64, return_type: MapReturnType) -> Operation { +pub fn remove_by_index_range_from(bin: &str, index: i64, return_type: TMR) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), ], }; @@ -607,11 +629,11 @@ pub fn remove_by_index_range_from(bin: &str, index: i64, return_type: MapReturnT /// Create map remove operation. Server removes the map item identified by rank and returns the /// removed data specified by `return_type`. -pub fn remove_by_rank(bin: &str, rank: i64, return_type: MapReturnType) -> Operation { +pub fn remove_by_rank(bin: &str, rank: i64, return_type: TMR) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByRank as u8, encoder: Box::new(pack_cdt_op), - args: vec![CdtArgument::Byte(return_type as u8), CdtArgument::Int(rank)], + args: vec![CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank)], }; Operation { op: OperationType::CdtWrite, @@ -623,17 +645,17 @@ pub fn remove_by_rank(bin: &str, rank: i64, return_type: MapReturnType) -> Opera /// Create map remove operation. Server removes `count` map items starting at the specified /// rank and returns the removed data specified by `return_type`. -pub fn remove_by_rank_range( +pub fn remove_by_rank_range( bin: &str, rank: i64, count: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank), CdtArgument::Int(count), ], @@ -648,11 +670,11 @@ pub fn remove_by_rank_range( /// Create map remove operation. Server removes the map items starting at the specified rank to /// the last ranked item and returns the removed data specified by `return_type`. -pub fn remove_by_rank_range_from(bin: &str, rank: i64, return_type: MapReturnType) -> Operation { +pub fn remove_by_rank_range_from(bin: &str, rank: i64, return_type: TMR) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByRankRange as u8, encoder: Box::new(pack_cdt_op), - args: vec![CdtArgument::Byte(return_type as u8), CdtArgument::Int(rank)], + args: vec![CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank)], }; Operation { op: OperationType::CdtWrite, @@ -679,12 +701,12 @@ pub fn size(bin: &str) -> Operation { /// Create map get by key operation. Server selects the map item identified by the key and /// returns the selected data specified by `return_type`. -pub fn get_by_key<'a>(bin: &'a str, key: &'a Value, return_type: MapReturnType) -> Operation<'a> { +pub fn get_by_key<'a, TMR: ToMapReturnTypeBitmask>(bin: &'a str, key: &'a Value, return_type: TMR) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::GetByKey as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(key), ], }; @@ -700,14 +722,14 @@ pub fn get_by_key<'a>(bin: &'a str, key: &'a Value, return_type: MapReturnType) /// range (`begin` inclusive, `end` exclusive). If `begin` is `Value::Nil`, the range is less /// than `end`. If `end` is `Value::Nil` the range is greater than equal to `begin`. Server /// returns the selected data specified by `return_type`. -pub fn get_by_key_range<'a>( +pub fn get_by_key_range<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, begin: &'a Value, end: &'a Value, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let mut args = vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(begin), ]; if !end.is_nil() { @@ -728,16 +750,16 @@ pub fn get_by_key_range<'a>( /// Create map get by value operation. Server selects the map items identified by value and /// returns the selected data specified by `return_type`. -pub fn get_by_value<'a>( +pub fn get_by_value<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, value: &'a Value, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::GetByValue as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), ], }; @@ -753,14 +775,14 @@ pub fn get_by_value<'a>( /// value range (`begin` inclusive, `end` exclusive). If `begin` is `Value::Nil`, the range is /// less than `end`. If `end` is `Value::Nil`, the range is greater than equal to `begin`. /// Server returns the selected data specified by `return_type`. -pub fn get_by_value_range<'a>( +pub fn get_by_value_range<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, begin: &'a Value, end: &'a Value, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let mut args = vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(begin), ]; if !end.is_nil() { @@ -781,12 +803,12 @@ pub fn get_by_value_range<'a>( /// Create map get by index operation. Server selects the map item identified by index and /// returns the selected data specified by `return_type`. -pub fn get_by_index(bin: &str, index: i64, return_type: MapReturnType) -> Operation { +pub fn get_by_index(bin: &str, index: i64, return_type: TMR) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::GetByIndex as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), ], }; @@ -800,17 +822,17 @@ pub fn get_by_index(bin: &str, index: i64, return_type: MapReturnType) -> Operat /// Create map get by index range operation. Server selects `count` map items starting at the /// specified index and returns the selected data specified by `return_type`. -pub fn get_by_index_range( +pub fn get_by_index_range( bin: &str, index: i64, count: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::GetByIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), CdtArgument::Int(count), ], @@ -826,12 +848,12 @@ pub fn get_by_index_range( /// Create map get by index range operation. Server selects the map items starting at the /// specified index to the end of the map and returns the selected data specified by /// `return_type`. -pub fn get_by_index_range_from(bin: &str, index: i64, return_type: MapReturnType) -> Operation { +pub fn get_by_index_range_from(bin: &str, index: i64, return_type: TMR) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::GetByIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(index), ], }; @@ -845,11 +867,11 @@ pub fn get_by_index_range_from(bin: &str, index: i64, return_type: MapReturnType /// Create map get by rank operation. Server selects the map item identified by rank and /// returns the selected data specified by `return_type`. -pub fn get_by_rank(bin: &str, rank: i64, return_type: MapReturnType) -> Operation { +pub fn get_by_rank(bin: &str, rank: i64, return_type: TMR) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::GetByRank as u8, encoder: Box::new(pack_cdt_op), - args: vec![CdtArgument::Byte(return_type as u8), CdtArgument::Int(rank)], + args: vec![CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank)], }; Operation { op: OperationType::CdtRead, @@ -861,17 +883,17 @@ pub fn get_by_rank(bin: &str, rank: i64, return_type: MapReturnType) -> Operatio /// Create map get rank range operation. Server selects `count` map items at the specified /// rank and returns the selected data specified by `return_type`. -pub fn get_by_rank_range( +pub fn get_by_rank_range( bin: &str, rank: i64, count: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::GetByRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank), CdtArgument::Int(count), ], @@ -887,11 +909,11 @@ pub fn get_by_rank_range( /// Create map get by rank range operation. Server selects the map items starting at the /// specified rank to the last ranked item and returns the selected data specified by /// `return_type`. -pub fn get_by_rank_range_from(bin: &str, rank: i64, return_type: MapReturnType) -> Operation { +pub fn get_by_rank_range_from(bin: &str, rank: i64, return_type: TMR) -> Operation { let cdt_op = CdtOperation { op: CdtMapOpType::GetByRankRange as u8, encoder: Box::new(pack_cdt_op), - args: vec![CdtArgument::Byte(return_type as u8), CdtArgument::Int(rank)], + args: vec![CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Int(rank)], }; Operation { op: OperationType::CdtRead, @@ -913,17 +935,17 @@ pub fn get_by_rank_range_from(bin: &str, rank: i64, return_type: MapReturnType) /// (5,-1) = [{4=2},{5=15},{9=10}] /// (3,2) = [{9=10}] /// (3,-2) = [{0=17},{4=2},{5=15},{9=10}] -pub fn remove_by_key_relative_index_range<'a>( +pub fn remove_by_key_relative_index_range<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, key: &'a Value, index: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByKeyRelIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(key), CdtArgument::Int(index), ], @@ -948,18 +970,18 @@ pub fn remove_by_key_relative_index_range<'a>( /// (5,-1,1) = [{4=2}] /// (3,2,1) = [{9=10}] /// (3,-2,2) = [{0=17}] -pub fn remove_by_key_relative_index_range_count<'a>( +pub fn remove_by_key_relative_index_range_count<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, key: &'a Value, index: i64, count: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByKeyRelIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(key), CdtArgument::Int(index), CdtArgument::Int(count), @@ -982,17 +1004,17 @@ pub fn remove_by_key_relative_index_range_count<'a>( /// (value,rank) = [removed items] /// (11,1) = [{0=17}] /// (11,-1) = [{9=10},{5=15},{0=17}] -pub fn remove_by_value_relative_rank_range<'a>( +pub fn remove_by_value_relative_rank_range<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, value: &'a Value, rank: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByValueRelRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), CdtArgument::Int(rank), ], @@ -1014,18 +1036,18 @@ pub fn remove_by_value_relative_rank_range<'a>( /// (value,rank,count) = [removed items] /// (11,1,1) = [{0=17}] /// (11,-1,1) = [{9=10}] -pub fn remove_by_value_relative_rank_range_count<'a>( +pub fn remove_by_value_relative_rank_range_count<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, value: &'a Value, rank: i64, count: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::RemoveByValueRelRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), CdtArgument::Int(rank), CdtArgument::Int(count), @@ -1041,16 +1063,16 @@ pub fn remove_by_value_relative_rank_range_count<'a>( /// Creates a map get by key list operation. /// Server selects map items identified by keys and returns selected data specified by returnType. -pub fn get_by_key_list<'a>( +pub fn get_by_key_list<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, keys: &'a [Value], - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::GetByKeyList as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::List(keys), ], }; @@ -1064,16 +1086,16 @@ pub fn get_by_key_list<'a>( /// Creates a map get by value list operation. /// Server selects map items identified by values and returns selected data specified by returnType. -pub fn get_by_value_list<'a>( +pub fn get_by_value_list<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, values: &'a [Value], - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::GetByValueList as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::List(values), ], }; @@ -1097,17 +1119,17 @@ pub fn get_by_value_list<'a>( /// (5,-1) = [{4=2},{5=15},{9=10}] /// (3,2) = [{9=10}] /// (3,-2) = [{0=17},{4=2},{5=15},{9=10}] -pub fn get_by_key_relative_index_range<'a>( +pub fn get_by_key_relative_index_range<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, key: &'a Value, index: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::GetByKeyRelIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(key), CdtArgument::Int(index), ], @@ -1132,18 +1154,18 @@ pub fn get_by_key_relative_index_range<'a>( /// (5,-1,1) = [{4=2}] /// (3,2,1) = [{9=10}] /// (3,-2,2) = [{0=17}] -pub fn get_by_key_relative_index_range_count<'a>( +pub fn get_by_key_relative_index_range_count<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, key: &'a Value, index: i64, count: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::GetByKeyRelIndexRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(key), CdtArgument::Int(index), CdtArgument::Int(count), @@ -1166,17 +1188,17 @@ pub fn get_by_key_relative_index_range_count<'a>( /// (value,rank) = [selected items] /// (11,1) = [{0=17}] /// (11,-1) = [{9=10},{5=15},{0=17}] -pub fn get_by_value_relative_rank_range<'a>( +pub fn get_by_value_relative_rank_range<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, value: &'a Value, rank: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::GetByValueRelRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), CdtArgument::Int(rank), ], @@ -1198,18 +1220,18 @@ pub fn get_by_value_relative_rank_range<'a>( /// (value,rank,count) = [selected items] /// (11,1,1) = [{0=17}] /// (11,-1,1) = [{9=10}] -pub fn get_by_value_relative_rank_range_count<'a>( +pub fn get_by_value_relative_rank_range_count<'a, TMR: ToMapReturnTypeBitmask>( bin: &'a str, value: &'a Value, rank: i64, count: i64, - return_type: MapReturnType, + return_type: TMR, ) -> Operation<'a> { let cdt_op = CdtOperation { op: CdtMapOpType::GetByValueRelRankRange as u8, encoder: Box::new(pack_cdt_op), args: vec![ - CdtArgument::Byte(return_type as u8), + CdtArgument::Int(return_type.to_bitmask()), CdtArgument::Value(value), CdtArgument::Int(rank), CdtArgument::Int(count), diff --git a/aerospike-core/src/policy/batch_policy.rs b/aerospike-core/src/policy/batch_policy.rs index e533434c..2bd21c32 100644 --- a/aerospike-core/src/policy/batch_policy.rs +++ b/aerospike-core/src/policy/batch_policy.rs @@ -16,6 +16,8 @@ use crate::expressions::FilterExpression; use crate::policy::{BasePolicy, Concurrency, PolicyLike}; +use super::Replica; + /// `BatchPolicy` encapsulates parameters for all batch operations. #[derive(Debug, Clone)] pub struct BatchPolicy { @@ -48,6 +50,9 @@ pub struct BatchPolicy { /// Optional Filter Expression pub filter_expression: Option, + + /// Defines algorithm used to determine the target node for a command. The replica algorithm only affects single record and batch commands. + pub replica: Replica, } impl BatchPolicy { @@ -70,6 +75,7 @@ impl Default for BatchPolicy { allow_inline: true, send_set_name: false, filter_expression: None, + replica: Replica::default(), } } } diff --git a/aerospike-core/src/policy/client_policy.rs b/aerospike-core/src/policy/client_policy.rs index 1ceb7a86..a5d3f8eb 100644 --- a/aerospike-core/src/policy/client_policy.rs +++ b/aerospike-core/src/policy/client_policy.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use crate::commands::admin_command::AdminCommand; @@ -83,6 +83,15 @@ pub struct ClientPolicy { /// to join the client's view of the cluster. Should only be set when connecting to servers /// that support the "cluster-name" info command. pub cluster_name: Option, + + /// Mark this client as belonging to a rack, and track server rack data. This field is useful when directing read commands to + /// the server node that contains the key and exists on the same rack as the client. + /// This serves to lower cloud provider costs when nodes are distributed across different + /// racks/data centers. + /// + /// Replica.PreferRack and server rack configuration must + /// also be set to enable this functionality. + pub rack_ids: Option>, } impl Default for ClientPolicy { @@ -100,6 +109,7 @@ impl Default for ClientPolicy { thread_pool_size: 128, cluster_name: None, buffer_reclaim_threshold: 65536, + rack_ids: None, } } } diff --git a/aerospike-core/src/policy/concurrency.rs b/aerospike-core/src/policy/concurrency.rs index 3039f306..2ff09783 100644 --- a/aerospike-core/src/policy/concurrency.rs +++ b/aerospike-core/src/policy/concurrency.rs @@ -27,15 +27,4 @@ pub enum Concurrency { /// extremely large batch sizes because each node can process the request immediately. The /// downside is extra threads will need to be created (or takedn from a thread pool). Parallel, - - /// Issue up to N commands in parallel threads. When a request completes, a new request - /// will be issued until all threads are complete. This mode prevents too many parallel threads - /// being created for large cluster implementations. The downside is extra threads will still - /// need to be created (or taken from a thread pool). - /// - /// E.g. if there are 16 nodes/namespace combinations requested and concurrency is set to - /// `MaxThreads(8)`, then batch requests will be made for 8 node/namespace combinations in - /// parallel threads. When a request completes, a new request will be issued until all 16 - /// requests are complete. - MaxThreads(usize), } diff --git a/aerospike-core/src/policy/mod.rs b/aerospike-core/src/policy/mod.rs index 88687120..d133435c 100644 --- a/aerospike-core/src/policy/mod.rs +++ b/aerospike-core/src/policy/mod.rs @@ -117,6 +117,24 @@ where } } + +/// Defines algorithm used to determine the target node for a command. The replica algorithm only affects single record and batch commands. +#[derive(Debug, Copy, Clone)] +pub enum Replica { + /// Use node containing key's master partition. + Master, + /// Try node containing master partition first. If connection fails, all commands try nodes containing replicated partitions. If socketTimeout is reached, reads also try nodes containing replicated partitions, but writes remain on master node. + Sequence, + /// Try node on the same rack as the client first. If there are no nodes on the same rack, use SEQUENCE instead. + PreferRack, +} + +impl Default for Replica { + fn default() -> Self { + Replica::Sequence + } +} + /// Common parameters shared by all policy types. #[derive(Debug, Clone)] pub struct BasePolicy { diff --git a/aerospike-core/src/policy/read_policy.rs b/aerospike-core/src/policy/read_policy.rs index af2e04e7..f1567ba0 100644 --- a/aerospike-core/src/policy/read_policy.rs +++ b/aerospike-core/src/policy/read_policy.rs @@ -18,13 +18,22 @@ use crate::policy::BasePolicy; use crate::{ConsistencyLevel, Priority}; use std::time::Duration; +use super::{Replica, PolicyLike}; + /// `ReadPolicy` excapsulates parameters for transaction policy attributes /// used in all database operation calls. -pub type ReadPolicy = BasePolicy; +#[derive(Debug, Default)] +pub struct ReadPolicy { + /// Base policy instance + pub base_policy: BasePolicy, + + /// Defines algorithm used to determine the target node for a command. The replica algorithm only affects single record and batch commands. + pub replica: Replica, +} -impl Default for ReadPolicy { - fn default() -> ReadPolicy { - ReadPolicy { +impl Default for BasePolicy { + fn default() -> BasePolicy { + BasePolicy { priority: Priority::Default, timeout: Some(Duration::new(30, 0)), max_retries: Some(2), @@ -35,9 +44,15 @@ impl Default for ReadPolicy { } } -impl ReadPolicy { +impl BasePolicy { /// Get the Optional Filter Expression pub const fn filter_expression(&self) -> &Option { &self.filter_expression } } + +impl PolicyLike for ReadPolicy { + fn base(&self) -> &BasePolicy { + &self.base_policy + } +} diff --git a/aerospike-core/src/query/recordset.rs b/aerospike-core/src/query/recordset.rs index 9712473a..ce2ceecc 100644 --- a/aerospike-core/src/query/recordset.rs +++ b/aerospike-core/src/query/recordset.rs @@ -94,7 +94,7 @@ impl<'a> Iterator for &'a Recordset { fn next(&mut self) -> Option> { loop { if self.is_active() || !self.record_queue.is_empty() { - let result = self.record_queue.pop().ok(); + let result = self.record_queue.pop(); if result.is_some() { self.record_queue_count.fetch_sub(1, Ordering::Relaxed); return result; diff --git a/aerospike-core/src/value.rs b/aerospike-core/src/value.rs index 3aca7e1d..81d71ed6 100644 --- a/aerospike-core/src/value.rs +++ b/aerospike-core/src/value.rs @@ -21,8 +21,8 @@ use std::result::Result as StdResult; use byteorder::{ByteOrder, NetworkEndian}; -use ripemd160::digest::Digest; -use ripemd160::Ripemd160; +use ripemd::digest::Digest; +use ripemd::Ripemd160; use std::vec::Vec; @@ -311,15 +311,15 @@ impl Value { Value::Int(ref val) => { let mut buf = [0; 8]; NetworkEndian::write_i64(&mut buf, *val); - h.input(&buf); + h.update(&buf); Ok(()) } Value::String(ref val) => { - h.input(val.as_bytes()); + h.update(val.as_bytes()); Ok(()) } Value::Blob(ref val) => { - h.input(val); + h.update(val); Ok(()) } _ => panic!("Data type is not supported as Key value."), diff --git a/aerospike-sync/src/client.rs b/aerospike-sync/src/client.rs index 90d2cce7..213a6ab4 100644 --- a/aerospike-sync/src/client.rs +++ b/aerospike-sync/src/client.rs @@ -250,7 +250,7 @@ impl Client { &self, policy: &'a WritePolicy, key: &'a Key, - bins: &'a [Bin<'b>], + bins: &'a [Bin], ) -> Result<()> { block_on(self.async_client.put(policy, key, bins)) } @@ -281,7 +281,7 @@ impl Client { &self, policy: &'a WritePolicy, key: &'a Key, - bins: &'a [Bin<'b>], + bins: &'a [Bin], ) -> Result<()> { block_on(self.async_client.add(policy, key, bins)) } @@ -293,7 +293,7 @@ impl Client { &self, policy: &'a WritePolicy, key: &'a Key, - bins: &'a [Bin<'b>], + bins: &'a [Bin], ) -> Result<()> { block_on(self.async_client.append(policy, key, bins)) } @@ -305,7 +305,7 @@ impl Client { &self, policy: &'a WritePolicy, key: &'a Key, - bins: &'a [Bin<'b>], + bins: &'a [Bin], ) -> Result<()> { block_on(self.async_client.prepend(policy, key, bins)) } diff --git a/benches/client_server.rs b/benches/client_server.rs index 19263bf0..e12eeece 100644 --- a/benches/client_server.rs +++ b/benches/client_server.rs @@ -27,38 +27,40 @@ use bencher::Bencher; #[path = "../tests/common/mod.rs"] mod common; +use futures::executor::block_on; + lazy_static! { static ref TEST_SET: String = common::rand_str(10); } fn single_key_read(bench: &mut Bencher) { - let client = common::client(); + let client = block_on(common::client()); let namespace = common::namespace(); let key = as_key!(namespace, &TEST_SET, common::rand_str(10)); let wbin = as_bin!("i", 1); - let bins = vec![&wbin]; + let bins = vec![wbin]; let rpolicy = ReadPolicy::default(); let wpolicy = WritePolicy::default(); - client.put(&wpolicy, &key, &bins).unwrap(); + block_on(client.put(&wpolicy, &key, &bins)).unwrap(); - bench.iter(|| client.get(&rpolicy, &key, Bins::All).unwrap()); + bench.iter(|| block_on(client.get(&rpolicy, &key, Bins::All)).unwrap()); } fn single_key_read_header(bench: &mut Bencher) { - let client = common::client(); + let client = block_on(common::client()); let namespace = common::namespace(); let key = as_key!(namespace, &TEST_SET, common::rand_str(10)); let wbin = as_bin!("i", 1); - let bins = vec![&wbin]; + let bins = vec![wbin]; let rpolicy = ReadPolicy::default(); let wpolicy = WritePolicy::default(); - client.put(&wpolicy, &key, &bins).unwrap(); + block_on(client.put(&wpolicy, &key, &bins)).unwrap(); - bench.iter(|| client.get(&rpolicy, &key, Bins::None).unwrap()); + bench.iter(|| block_on(client.get(&rpolicy, &key, Bins::None)).unwrap()); } fn single_key_write(bench: &mut Bencher) { - let client = common::client(); + let client = block_on(common::client()); let namespace = common::namespace(); let key = as_key!(namespace, &TEST_SET, common::rand_str(10)); let wpolicy = WritePolicy::default(); @@ -70,7 +72,7 @@ fn single_key_write(bench: &mut Bencher) { let bins = [bin1, bin2, bin3, bin4]; bench.iter(|| { - client.put(&wpolicy, &key, &bins).unwrap(); + block_on(client.put(&wpolicy, &key, &bins)).unwrap(); }); } diff --git a/tests/src/exp.rs b/tests/src/exp.rs index 3a216138..402f55e6 100644 --- a/tests/src/exp.rs +++ b/tests/src/exp.rs @@ -645,11 +645,11 @@ async fn expression_commands() { // GET let key = as_key!(namespace, &set_name, 35); - rpolicy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15))); + rpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15))); let test = client.get(&rpolicy, &key, Bins::All).await; assert_eq!(test.is_err(), true, "GET Err Test Failed"); - rpolicy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(35))); + rpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(35))); let test = client.get(&rpolicy, &key, Bins::All).await; assert_eq!(test.is_ok(), true, "GET Ok Test Failed"); diff --git a/tools/benchmark/Cargo.toml b/tools/benchmark/Cargo.toml index f75b1690..8451fe8e 100644 --- a/tools/benchmark/Cargo.toml +++ b/tools/benchmark/Cargo.toml @@ -10,11 +10,12 @@ license = "Apache-2.0" [dependencies] clap = "2.33" log = "0.4" -env_logger = "0.7" +env_logger = "0.9" lazy_static = "1.4" num_cpus = "1.11" -rand = "0.7" +rand = "0.8" aerospike = { path = "../.." } +futures = {version = "0.3.16" } [[bin]] path = "src/main.rs" diff --git a/tools/benchmark/src/cli.rs b/tools/benchmark/src/cli.rs index 845826cd..f854253e 100644 --- a/tools/benchmark/src/cli.rs +++ b/tools/benchmark/src/cli.rs @@ -22,7 +22,7 @@ use num_cpus; use workers::Workload; -const AFTER_HELP: &'static str = r###" +const AFTER_HELP: &str = r###" SETTING SEED HOSTS: diff --git a/tools/benchmark/src/main.rs b/tools/benchmark/src/main.rs index 638e2be7..62271c6b 100644 --- a/tools/benchmark/src/main.rs +++ b/tools/benchmark/src/main.rs @@ -41,6 +41,7 @@ use cli::Options; use generator::KeyPartitions; use stats::Collector; use workers::Worker; +use futures::executor::block_on; fn main() { let _ = env_logger::try_init(); @@ -53,7 +54,7 @@ fn main() { fn connect(options: &Options) -> Client { let mut policy = ClientPolicy::default(); policy.conn_pools_per_node = options.conn_pools_per_node; - Client::new(&policy, &options.hosts).unwrap() + block_on(Client::new(&policy, &options.hosts)).unwrap() } fn run_workload(client: Client, opts: Options) { diff --git a/tools/benchmark/src/workers.rs b/tools/benchmark/src/workers.rs index 7949e0d7..0b5ffb83 100644 --- a/tools/benchmark/src/workers.rs +++ b/tools/benchmark/src/workers.rs @@ -28,6 +28,7 @@ use aerospike::{Client, ErrorKind, Key, ReadPolicy, ResultCode, WritePolicy}; use generator::KeyRange; use percent::Percent; use stats::Histogram; +use futures::executor::block_on; lazy_static! { // How frequently workers send stats to the collector @@ -51,7 +52,7 @@ impl FromStr for Workload { match parts.next() { Some("RU") => { let read_pct = Percent::from_str(parts.next().unwrap_or("100"))?; - Ok(Workload::ReadUpdate { read_pct: read_pct }) + Ok(Workload::ReadUpdate { read_pct }) } Some("I") => Ok(Workload::Initialize), _ => Err(String::from("Invalid workload definition")), @@ -78,7 +79,7 @@ impl Worker { Worker { histogram: Histogram::new(), collector: sender, - task: task, + task, } } @@ -128,7 +129,7 @@ pub struct InsertTask { impl InsertTask { pub fn new(client: Arc) -> Self { InsertTask { - client: client, + client, policy: WritePolicy::default(), } } @@ -138,7 +139,7 @@ impl Task for InsertTask { fn execute(&self, key: &Key) -> Status { let bin = as_bin!("int", random::()); trace!("Inserting {}", key); - self.status(self.client.put(&self.policy, key, &[&bin])) + self.status(block_on(self.client.put(&self.policy, key, &[bin]))) } } @@ -152,10 +153,10 @@ pub struct ReadUpdateTask { impl ReadUpdateTask { pub fn new(client: Arc, reads: Percent) -> Self { ReadUpdateTask { - client: client, + client, rpolicy: ReadPolicy::default(), wpolicy: WritePolicy::default(), - reads: reads, + reads, } } } @@ -164,11 +165,11 @@ impl Task for ReadUpdateTask { fn execute(&self, key: &Key) -> Status { if self.reads >= random() { trace!("Reading {}", key); - self.status(self.client.get(&self.rpolicy, key, ["int"]).map(|_| ())) + self.status(block_on(self.client.get(&self.rpolicy, key, ["int"])).map(|_| ())) } else { trace!("Writing {}", key); let bin = as_bin!("int", random::()); - self.status(self.client.put(&self.wpolicy, key, &[&bin])) + self.status(block_on(self.client.put(&self.wpolicy, key, &[bin]))) } } }