diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 15174b58..061291c0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest, windows-latest] - toolchain: ["1.40.0", "stable"] + toolchain: ["1.51.0", "stable"] runs-on: ${{ matrix.os }} diff --git a/foundationdb-bindingtester/Cargo.toml b/foundationdb-bindingtester/Cargo.toml index 2144faa2..2136552b 100644 --- a/foundationdb-bindingtester/Cargo.toml +++ b/foundationdb-bindingtester/Cargo.toml @@ -37,3 +37,4 @@ futures = "0.3.0" log = "0.4.8" num-bigint = "0.3.0" structopt = "0.3.3" +async-trait = "0.1.48" diff --git a/foundationdb-bindingtester/README.md b/foundationdb-bindingtester/README.md index 2a89756f..c9e89b17 100644 --- a/foundationdb-bindingtester/README.md +++ b/foundationdb-bindingtester/README.md @@ -11,4 +11,5 @@ The following configurations are tested and should pass without any issue: ./bindingtester.py --test-name scripted ./bindingtester.py --num-ops 1000 --test-name api --api-version 610 ./bindingtester.py --num-ops 1000 --concurrency 5 --test-name api --api-version 610 -``` +./bindingtester.py --num-ops 10000 --concurrency 1 --test-name directory --api-version 610 --no-directory-snapshot-ops +``` \ No newline at end of file diff --git a/foundationdb-bindingtester/src/main.rs b/foundationdb-bindingtester/src/main.rs index 69f38e03..6afeed32 100644 --- a/foundationdb-bindingtester/src/main.rs +++ b/foundationdb-bindingtester/src/main.rs @@ -27,13 +27,21 @@ static GOT_COMMITTED_VERSION: Element = static ERROR_NONE: Element = Element::Bytes(Bytes(Cow::Borrowed(b"ERROR: NONE"))); static ERROR_MULTIPLE: Element = Element::Bytes(Bytes(Cow::Borrowed(b"ERROR: MULTIPLE"))); static OK: Element = Element::Bytes(Bytes(Cow::Borrowed(b"OK"))); +static ERROR_DIRECTORY: Element = Element::Bytes(Bytes(Cow::Borrowed(b"DIRECTORY_ERROR"))); #[cfg(feature = "fdb-6_2")] static GOT_APPROXIMATE_SIZE: Element = Element::Bytes(Bytes(Cow::Borrowed(b"GOT_APPROXIMATE_SIZE"))); use crate::fdb::options::{MutationType, StreamingMode}; + +use foundationdb::directory::directory_layer::DirectoryLayer; +use foundationdb::directory::error::DirectoryError; +use foundationdb::directory::{Directory, DirectoryOutput}; +use foundationdb::tuple::{PackResult, TupleUnpack}; + use tuple::VersionstampOffset; + fn mutation_from_str(s: &str) -> MutationType { match s { "ADD" => MutationType::Add, @@ -95,6 +103,14 @@ impl std::fmt::Debug for Instr { } } +#[derive(Debug)] +enum DirectoryStackItem { + Directory(DirectoryLayer), + DirectoryOutput(DirectoryOutput), + Subspace(Subspace), + Null, +} + impl Instr { fn pop_database(&mut self) -> bool { if self.database { @@ -188,6 +204,39 @@ enum InstrCode { // misc UnitTests, + + // Directory/Subspace/Layer Creation + DirectoryCreateSubspace, + DirectoryCreateLayer, + DirectoryCreateOrOpen, + DirectoryCreate, + DirectoryOpen, + + // Directory Management + DirectoryChange, + DirectorySetErrorIndex, + + // Directory Operations + DirectoryMove, + DirectoryMoveTo, + DirectoryRemove, + DirectoryRemoveIfExists, + DirectoryList, + DirectoryExists, + + // Subspace operation + DirectoryPackKey, + DirectoryUnpackKey, + DirectoryRange, + DirectoryContains, + DirectoryOpenSubspace, + + // Directory Logging + DirectoryLogSubspace, + DirectoryLogDirectory, + + // Other + DirectoryStripPrefix, } fn has_opt<'a>(cmd: &'a str, opt: &'static str) -> (&'a str, bool) { @@ -263,6 +312,33 @@ impl Instr { "UNIT_TESTS" => UnitTests, + "DIRECTORY_CREATE_SUBSPACE" => DirectoryCreateSubspace, + "DIRECTORY_CREATE_LAYER" => DirectoryCreateLayer, + "DIRECTORY_CREATE_OR_OPEN" => DirectoryCreateOrOpen, + "DIRECTORY_CREATE" => DirectoryCreate, + "DIRECTORY_OPEN" => DirectoryOpen, + + "DIRECTORY_CHANGE" => DirectoryChange, + "DIRECTORY_SET_ERROR_INDEX" => DirectorySetErrorIndex, + + "DIRECTORY_MOVE" => DirectoryMove, + "DIRECTORY_MOVE_TO" => DirectoryMoveTo, + "DIRECTORY_REMOVE" => DirectoryRemove, + "DIRECTORY_REMOVE_IF_EXISTS" => DirectoryRemoveIfExists, + "DIRECTORY_LIST" => DirectoryList, + "DIRECTORY_EXISTS" => DirectoryExists, + + "DIRECTORY_PACK_KEY" => DirectoryPackKey, + "DIRECTORY_UNPACK_KEY" => DirectoryUnpackKey, + "DIRECTORY_RANGE" => DirectoryRange, + "DIRECTORY_CONTAINS" => DirectoryContains, + "DIRECTORY_OPEN_SUBSPACE" => DirectoryOpenSubspace, + + "DIRECTORY_LOG_SUBSPACE" => DirectoryLogSubspace, + "DIRECTORY_LOG_DIRECTORY" => DirectoryLogDirectory, + + "DIRECTORY_STRIP_PREFIX" => DirectoryStripPrefix, + name => unimplemented!("inimplemented instr: {}", name), }; Instr { @@ -462,6 +538,10 @@ struct StackMachine { threads: Vec>, trx_counter: usize, + + directory_stack: Vec, + directory_index: usize, + error_index: usize, } fn strinc(key: Bytes) -> Bytes { @@ -492,6 +572,10 @@ impl StackMachine { last_version: 0, threads: Vec::new(), trx_counter: 0, + + directory_stack: vec![DirectoryStackItem::Directory(DirectoryLayer::default())], + directory_index: 0, + error_index: 0, } } @@ -563,6 +647,41 @@ impl StackMachine { } } + async fn pop_optional_bytes(&mut self) -> Option> { + let element = self.pop_element().await; + match element { + Element::Bytes(v) => Some(v.to_vec()), + Element::Nil => None, + Element::String(_) => None, + Element::Tuple(_) => None, + Element::Int(_) => None, + Element::BigInt(_) => None, + Element::Float(_) => None, + Element::Double(_) => None, + Element::Bool(_) => None, + Element::Uuid(_) => None, + Element::Versionstamp(_) => None, + } + } + + async fn pop_string_tuple(&mut self, count: usize) -> Vec> { + let mut result = vec![]; + + if count == 0 { + result.push(vec![]); + } else { + for _i in 0..count { + let mut sub_result = vec![]; + let vec_size = self.pop_i64().await; + for _j in 0..vec_size { + sub_result.push(self.pop_str().await); + } + result.push(sub_result); + } + } + result + } + async fn pop_element(&mut self) -> Element<'static> { let item = self.pop().await; if let Some(data) = item.data { @@ -603,6 +722,27 @@ impl StackMachine { } } + fn push_directory_err(&mut self, code: &InstrCode, number: usize, err: DirectoryError) { + debug!("[{}] DIRECTORY_ERROR during {:?}: {:?}", number, code, err); + self.push(number, Element::Tuple(vec![ERROR_DIRECTORY.clone()])); + + if let InstrCode::DirectoryCreateSubspace + | InstrCode::DirectoryCreateOrOpen + | InstrCode::DirectoryCreateLayer + | InstrCode::DirectoryCreate + | InstrCode::DirectoryOpen + | InstrCode::DirectoryMove + | InstrCode::DirectoryMoveTo + | InstrCode::DirectoryOpenSubspace = code + { + debug!( + "pushed NULL in the directory_stack at index {} because of the error", + self.directory_stack.len() + ); + self.directory_stack.push(DirectoryStackItem::Null); + } + } + fn push_err(&mut self, number: usize, err: FdbError) { trace!("ERROR {:?}", err); let packed = pack(&Element::Tuple(vec![ @@ -634,6 +774,7 @@ impl StackMachine { let is_db = instr.pop_database(); let mut mutation = false; let mut pending = false; + let (mut trx, trx_name) = if is_db { ( TransactionState::Transaction(self.check(number, db.create_trx())?), @@ -1506,6 +1647,876 @@ impl StackMachine { // test_locality(db) // test_predicates() } + + // Pop 1 tuple off the stack as [path]. Pop 1 additional item as [raw_prefix]. + // Create a subspace with path as the prefix tuple and the specified + // raw_prefix. Append it to the directory list. + DirectoryCreateSubspace => { + let tuple_prefix = self.pop_string_tuple(1).await; + let raw_prefix = self.pop_bytes().await; + let subspace = + Subspace::from_bytes(&raw_prefix).subspace(tuple_prefix.get(0).unwrap()); + debug!( + "pushing a new subspace {:?} at index {}", + &subspace, + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::Subspace(subspace)); + } + + // Pop 3 items off the stack as [index1, index2, allow_manual_prefixes]. Let + // node_subspace be the object in the directory list at index1 and + // content_subspace be the object in the directory list at index2. Create a new + // directory layer with the specified node_subspace and content_subspace. If + // allow_manual_prefixes is 1, then enable manual prefixes on the directory + // layer. Append the resulting directory layer to the directory list. + // + // If either of the two specified subspaces are null, then do not create a + // directory layer and instead push null onto the directory list. + DirectoryCreateLayer => { + let index_node_subspace = self.pop_usize().await; + let index_content_subspace = self.pop_usize().await; + let allow_manual_prefixes = self.pop_i64().await == 1; + + let node_subspace = self.directory_stack.get(index_node_subspace); + let content_subspace = self.directory_stack.get(index_content_subspace); + + if node_subspace.is_none() || content_subspace.is_none() { + error!( + "pushing null on the directory list: {}, {}", + node_subspace.is_some(), + content_subspace.is_some() + ); + self.directory_stack.push(DirectoryStackItem::Null); + } else { + let node_subspace = match node_subspace.unwrap() { + DirectoryStackItem::Subspace(s) => s.to_owned(), + _ => panic!("expecting subspace"), + }; + + let content_subspace = match content_subspace.unwrap() { + DirectoryStackItem::Subspace(s) => s.to_owned(), + _ => panic!("expecting subspace"), + }; + + debug!("pushed a directory at index {}", self.directory_stack.len()); + + self.directory_stack + .push(DirectoryStackItem::Directory(DirectoryLayer::new( + node_subspace, + content_subspace, + allow_manual_prefixes, + ))); + } + } + + // Pop 1 tuple off the stack as [path]. Pop 2 additional items as + // [layer, prefix]. create a directory with the specified path, layer, + // and prefix. If either of layer or prefix is null, use the default value for + // that parameter (layer='', prefix=null). + DirectoryCreate => { + let path = self.pop_string_tuple(1).await; + let layer = self.pop_optional_bytes().await; + let prefix = self.pop_optional_bytes().await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "creating path {:?} with layer {:?} and prefix {:?} using directory at index {}", + path.get(0).unwrap(), + &layer, + &prefix, + self.directory_index, + ); + + match directory + .create( + txn, + (*path.get(0).unwrap().to_owned()).to_vec(), + prefix, + layer, + ) + .await + { + Ok(directory_subspace) => { + debug!( + "pushing DirectoryOutput at index {}", + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(directory_subspace)); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Pop 1 tuple off the stack as [path]. Pop 1 additional item as [layer]. Open + // a directory with the specified path and layer. If layer is null, use the + // default value (layer=''). + DirectoryOpen => { + let path = self.pop_string_tuple(1).await; + let bytes_layer = self.pop_bytes().await; + + let layer = if bytes_layer.is_empty() { + None + } else { + Some(bytes_layer.to_vec()) + }; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "opening path {:?} with layer {:?} with index {}", + path.get(0).unwrap(), + &layer, + self.directory_index + ); + + match directory + .open(txn, (*path.get(0).unwrap().to_owned()).to_vec(), layer) + .await + { + Ok(directory_subspace) => { + debug!( + "pushing newly opened DirectoryOutput at index {}", + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(directory_subspace)); + + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [path]. Pop 1 additional item as [layer]. Open + // a directory with the specified path and layer. If layer is null, use the + // default value (layer=''). + DirectoryCreateOrOpen => { + let path = self.pop_string_tuple(1).await; + let bytes_layer = self.pop_bytes().await; + + let layer = if bytes_layer.is_empty() { + None + } else { + Some(bytes_layer.to_vec()) + }; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "create_or_open path {:?} with layer {:?} with index {}", + path.get(0).unwrap(), + &layer, + self.directory_index + ); + match directory + .create_or_open( + txn, + (*path.get(0).unwrap().to_owned()).to_vec(), + None, + layer, + ) + .await + { + Ok(directory_subspace) => { + debug!( + "pushing created_or_opened {:?} at index {}", + &directory_subspace, + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(directory_subspace)); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => self.push_directory_err(&instr.code, number, e), + }; + } + + // Pop the top item off the stack as [index]. Set the current directory list + // index to index. In the event that the directory at this new index is null + // (as the result of a previous error), set the directory list index to the + // error index. + DirectoryChange => { + self.directory_index = self.pop_usize().await; + debug!("setting directory_index to {}", self.directory_index); + match self.directory_stack.get(self.directory_index) { + None => { + self.directory_index = self.error_index; + debug!( + "setting directory_index to error index {}: no directory found", + self.directory_index + ); + } + Some(d) => { + if let DirectoryStackItem::Null = d { + self.directory_index = self.error_index; + debug!( + "setting directory_index to error index {}: because it is Null", + self.directory_index + ); + } + } + } + } + + // Pop the top item off the stack as [error_index]. Set the current error index + // to error_index. + DirectorySetErrorIndex => { + self.error_index = self.pop_usize().await; + } + + // Use the current directory for this operation. + // + // Pop 2 tuples off the stack as [old_path, new_path]. Call move with the + // specified old_path and new_path. Append the result onto the directory list. + DirectoryMove => { + let paths = self.pop_string_tuple(2).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "moving {:?} to {:?} using directory at index {}", + paths.get(0).unwrap(), + paths.get(1).unwrap(), + self.directory_index + ); + + match directory + .move_to( + txn, + (*paths.get(0).unwrap().to_vec()).to_owned(), + (*paths.get(1).unwrap().to_vec()).to_owned(), + ) + .await + { + Ok(s) => { + debug!( + "pushing moved directory {:?} at index {}", + &s, + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(s)); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [new_absolute_path]. Call moveTo with the + // specified new_absolute_path. Append the result onto the directory list. + DirectoryMoveTo => { + let paths = self.pop_string_tuple(1).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "moving directory {:?} to {:?}", + self.directory_index, &paths + ); + + match directory + .move_directory(txn, (*paths.get(0).unwrap().to_vec()).to_owned()) + .await + { + Ok(s) => { + debug!( + "pushing moved directory {:?} at index {}", + &s, + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(s)); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [count] (either 0 or 1). If count is 1, pop 1 + // tuple off the stack as [path]. Call remove_if_exits, passing it path if one + // was popped. + DirectoryRemove => { + let count = self.pop_usize().await; + let paths = self.pop_string_tuple(count).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + let paths = paths.get(0).expect("could not retrieve a path"); + debug!( + "removing path {:?} using directory at index {}", + paths, self.directory_index + ); + match directory.remove(txn, paths.to_owned()).await { + Ok(deleted) => { + if !deleted { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from("directory does not exists")), + ); + } else if is_db { + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + } + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [count] (either 0 or 1). If count is 1, pop 1 + // tuple off the stack as [path]. Call remove_if_exits, passing it path if one + // was popped. + DirectoryRemoveIfExists => { + let count = self.pop_usize().await; + let paths = self.pop_string_tuple(count).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + let paths = paths.get(0).expect("could not retrieve a path"); + match directory.remove_if_exists(txn, paths.to_owned()).await { + Ok(_) => { + if is_db { + local_trx.commit().await.expect("could not commit"); + } + } + Err(err) => self.push_directory_err(&instr.code, number, err), + }; + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [count] (either 0 or 1). If count is 1, pop 1 + // tuple off the stack as [path]. Call list, passing it path if one was popped. + // Pack the resulting list of directories using the tuple layer and push the + // packed string onto the stack. + DirectoryList => { + let count = self.pop_usize().await; + let paths = match count { + 1 => { + let paths = self.pop_string_tuple(count).await; + paths.get(0).expect("could not retrieve a path").clone() + } + 0 => vec![], + _ => panic!(), + }; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + match directory.list(txn, paths.to_vec()).await { + Ok(children) => { + let mut elements: Vec = vec![]; + debug!( + "found {} items under path {:?} with directory at index {}:", + children.len(), + paths, + self.directory_index + ); + for child in children { + debug!("\t - {}", &child); + let element = Element::String(Cow::from(child)); + elements.push(element); + } + let tuple = Element::Tuple(elements); + self.push(number, Element::Bytes(Bytes::from(tuple.pack_to_vec()))); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [count] (either 0 or 1). If count is 1, pop 1 + // tuple off the stack as [path]. Call exists, passing it path if one + // was popped. Push 1 onto the stack if the path exists and 0 if it does not. + DirectoryExists => { + let count = self.pop_usize().await; + + let paths = self.pop_string_tuple(count).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + let paths = paths.get(0).expect("could not retrieve a path"); + match directory.exists(txn, paths.to_owned()).await { + Ok(exists) => { + self.push(number, Element::Int(i64::from(exists))); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [key_tuple]. Pack key_tuple and push the result + // onto the stack. + DirectoryPackKey => { + let n: usize = self.pop_usize().await; + debug!("DirectoryPackKey {}", n); + let mut buf = Vec::new(); + for _ in 0..n { + let element: Element = self.pop_element().await; + debug!(" - {:?}", element); + buf.push(element); + } + + let tuple = Element::Tuple(buf); + + match self.pack_with_current_subspace(&tuple) { + None => self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from("cannot pack with current item")), + ), + Some(bytes) => { + self.push(number, Element::Bytes(bytes.into())); + } + } + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [key]. Unpack key and push the resulting tuple + // onto the stack one item at a time. + DirectoryUnpackKey => { + let data = self.pop_bytes().await; + let data = data.to_vec(); + debug!("directory_unpack {:?}", data); + let data: Vec = self.unpack_with_current_subspace(&data).unwrap().unwrap(); + for element in data { + debug!(" - {:?}", element); + self.push(number, Element::Tuple(vec![element.into_owned()])); + } + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [tuple]. Create a range using tuple and push + // range.begin and range.end onto the stack. + DirectoryRange => { + let n: usize = self.pop_usize().await; + let mut buf = Vec::new(); + for _ in 0..n { + let element: Element = self.pop_element().await; + debug!(" - {:?}", element); + buf.push(element); + } + + match self.get_current_directory_item() { + Some(DirectoryStackItem::DirectoryOutput( + DirectoryOutput::DirectoryPartition(_d), + )) => { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from( + "operation not allowed on directoryPartition", + )), + ); + } + _ => { + let tuple = Element::Tuple(buf); + let subspace = self.subspace_with_current_item(&tuple).unwrap(); + let (begin_range, end_range) = subspace.range(); + self.push(number, Element::Bytes(begin_range.into())); + self.push(number, Element::Bytes(end_range.into())); + } + } + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [key]. Check if the current directory contains + // the specified key. Push 1 if it does and 0 if it doesn't. + DirectoryContains => { + let raw_prefix = self.pop_bytes().await; + let b = match self.get_current_directory_item() { + None => panic!("not found"), + Some(DirectoryStackItem::Subspace(s)) => s.is_start_of(&raw_prefix.to_vec()), + Some(DirectoryStackItem::DirectoryOutput(d)) => match d { + DirectoryOutput::DirectorySubspace(s) => { + s.is_start_of(&raw_prefix.to_vec()) + } + _ => panic!("not a DirectorySubspace"), + }, + _ => panic!("not found"), + }; + + self.push(number, Element::Int(b as i64)); + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [tuple]. Open the subspace of the current + // directory specified by tuple and push it onto the directory list. + DirectoryOpenSubspace => { + let n: usize = self.pop_usize().await; + debug!("DirectoryRange {}", n); + let mut buf = Vec::new(); + for _ in 0..n { + let element: Element = self.pop_element().await; + debug!(" - {:?}", element); + buf.push(element); + } + + let tuple = Element::Tuple(buf); + self.directory_stack.push(DirectoryStackItem::Subspace( + self.subspace_with_current_item(&tuple).unwrap(), + )); + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [prefix]. Let key equal + // prefix + tuple.pack([dir_index]). Set key to be the result of calling + // directory.key() in the current transaction. + DirectoryLogSubspace => { + debug!( + "logging subspace {}/{}", + self.directory_index, + self.directory_stack.len() + ); + let raw_prefix = self.pop_bytes().await; + let txn = match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }; + let key = Subspace::from_bytes(&*raw_prefix).pack(&self.directory_index); + + match self.directory_stack.get(self.directory_index) { + None => panic!("nothing in the stack"), + Some(DirectoryStackItem::Null) => panic!("Directory is NULL"), + Some(DirectoryStackItem::Directory(_)) => { + panic!("trying to get a subspace, got a Directory") + } + Some(DirectoryStackItem::DirectoryOutput( + DirectoryOutput::DirectorySubspace(d), + )) => { + txn.set(&key, d.bytes()); + debug!( + "logging subspace [{}] {:?}={:?}", + self.directory_index, + unpack::>(&key).unwrap(), + d.bytes(), + ); + } + Some(DirectoryStackItem::DirectoryOutput( + DirectoryOutput::DirectoryPartition(_), + )) => { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from( + "cannot get key for the root of a directory partition", + )), + ); + } + Some(DirectoryStackItem::Subspace(s)) => { + txn.set(&key, s.bytes()); + debug!( + "logging subspace [{}] {:?}={:?}", + self.directory_index, + unpack::>(&key).unwrap(), + s.bytes(), + ); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [raw_prefix]. Create a subspace log_subspace + // with path (dir_index) and the specified raw_prefix. Set: + // + // tr[log_subspace[u'path']] = the tuple packed path of the directory. + // + // tr[log_subspace[u'layer']] = the tuple packed layer of the directory. + // + // tr[log_subspace[u'exists']] = the packed tuple containing a 1 if the + // directory exists and 0 if it doesn't. + // + // tr[log_subspace[u'children']] the tuple packed list of children of the + // directory. + // + // Where log_subspace[u] is the subspace packed tuple containing only the + // single specified unicode string . + DirectoryLogDirectory => { + debug!("logging directory {}", self.directory_index); + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let txn = match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }; + + let raw_prefix = self.pop_bytes().await; + let subspace = Subspace::from_bytes(&*raw_prefix).subspace(&(self.directory_index)); + + let key_path = subspace.pack(&(String::from("path"))); + let value_path = pack(&self.get_path_for_current_directory().unwrap()); + + let key_layer = subspace.pack(&("layer")); + let value_layer = pack(&self.get_layer_for_current_directory().unwrap()); + + let exists = directory + .exists(&txn, vec![]) + .await + .expect("could not list directory"); + + let key_exists = subspace.pack(&(String::from("exists"))); + let value_exists = pack(&Element::Tuple(vec![Element::Int(match exists { + true => 1, + false => 0, + })])); + + let children = if exists { + directory.list(txn, vec![]).await.unwrap() + } else { + vec![] + }; + + let tuple_children = Element::Tuple( + children + .iter() + .map(|s| Element::String(Cow::Owned(s.clone()))) + .collect(), + ); + + let key_children = subspace.pack(&(String::from("children"))); + let value_children = pack(&tuple_children); + + txn.set(&key_path, &value_path); + debug!( + "[{}] {:?}={:?}", + self.directory_index, + unpack::>(&key_path).unwrap(), + unpack::>(&value_path).unwrap(), + ); + txn.set(&key_layer, &value_layer); + debug!( + "[{}] {:?}={:?}", + self.directory_index, + unpack::>(&key_layer).unwrap(), + unpack::>(&value_layer).unwrap(), + ); + txn.set(&key_exists, &value_exists); + debug!( + "[{}] {:?}={:?}", + self.directory_index, + unpack::>(&key_exists).unwrap(), + unpack::>(&value_exists).unwrap(), + ); + txn.set(&key_children, &value_children); + debug!( + "[{}] {:?}={:?}", + self.directory_index, + unpack::>(&key_children).unwrap(), + unpack::>(&value_children).unwrap(), + ); + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [byte_array]. Call .key() on the current + // subspace and store the result as [prefix]. Throw an error if the popped + // array does not start with prefix. Otherwise, remove the prefix from the + // popped array and push the result onto the stack. + DirectoryStripPrefix => match self.pop_optional_bytes().await { + None => { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from("bad input on bytes")), + ); + } + Some(raw_prefix) => { + let ssb = self.get_bytes_for_current_directory().unwrap(); + if !raw_prefix.to_vec().starts_with(&ssb) { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Version(String::from( + "String does not start with raw prefix", + )), + ); + } else { + self.push( + number, + Element::Bytes(Bytes::from(raw_prefix[ssb.len()..].to_owned())), + ); + } + } + }, } if is_db && pending { @@ -1546,6 +2557,95 @@ impl StackMachine { handle.join().expect("joined thread to not panic"); } } + + fn get_current_directory_item(&mut self) -> Option<&DirectoryStackItem> { + self.directory_stack.get(self.directory_index) + } + + fn get_current_directory(&self) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::Directory(d) => Some(Box::new(d.clone())), + DirectoryStackItem::DirectoryOutput(d) => Some(Box::new((*d).clone())), + _ => None, + }, + } + } + + fn pack_with_current_subspace(&self, v: &T) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(DirectoryOutput::DirectorySubspace(d)) => { + Some(d.pack(v)) + } + DirectoryStackItem::Subspace(d) => Some(d.pack(v)), + _ => None, + }, + } + } + + fn unpack_with_current_subspace<'de, T: TupleUnpack<'de>>( + &self, + key: &'de [u8], + ) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(d) => Some(d.unpack(key)), + DirectoryStackItem::Subspace(d) => Some(d.unpack(key)), + _ => None, + }, + } + } + + fn subspace_with_current_item(&self, t: &T) -> Option { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(d) => Some(d.subspace(t)), + DirectoryStackItem::Subspace(d) => Some(d.subspace(t)), + _ => None, + }, + } + } + + fn get_path_for_current_directory(&self) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::Directory(d) => Some(d.get_path()), + DirectoryStackItem::DirectoryOutput(d) => Some(d.get_path()), + _ => None, + }, + } + } + + fn get_layer_for_current_directory(&self) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(d) => Some(d.get_layer()), + DirectoryStackItem::Directory(_) => Some(vec![]), + _ => None, + }, + } + } + + fn get_bytes_for_current_directory(&self) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(d) => match d { + DirectoryOutput::DirectorySubspace(s) => Some(Vec::from(s.bytes())), + DirectoryOutput::DirectoryPartition(_) => None, + }, + DirectoryStackItem::Subspace(subspace) => Some(Vec::from(subspace.bytes())), + _ => None, + }, + } + } } fn main() { diff --git a/foundationdb/Cargo.toml b/foundationdb/Cargo.toml index a244ca0c..d68110e2 100644 --- a/foundationdb/Cargo.toml +++ b/foundationdb/Cargo.toml @@ -43,15 +43,17 @@ foundationdb-gen = { version = "0.5.1", path = "../foundationdb-gen", default-fe [dependencies] foundationdb-sys = { version = "0.5.1", path = "../foundationdb-sys", default-features = false } -futures = "0.3.1" +futures = "0.3.14" memchr = "2.2.1" rand = { version = "0.7.2", features = ["default", "small_rng"] } static_assertions = "1.1.0" uuid = { version = "0.8.1", optional = true } num-bigint = { version = "0.3.0", optional = true } +byteorder = "1.3.2" +async-trait = "0.1.48" +async-recursion = "0.3.2" [dev-dependencies] -byteorder = "1.3.2" lazy_static = "1.4.0" log = "0.4.8" tokio = { version = "0.2.9", features = ["rt-core", "rt-threaded", "macros"] } diff --git a/foundationdb/examples/class-scheduling.rs b/foundationdb/examples/class-scheduling.rs index c0a80cfa..838c80b5 100644 --- a/foundationdb/examples/class-scheduling.rs +++ b/foundationdb/examples/class-scheduling.rs @@ -22,21 +22,21 @@ use foundationdb::{Database, FdbError, RangeOption, TransactError, TransactOptio type Result = std::result::Result; enum Error { - FdbError(FdbError), + Fdb(FdbError), NoRemainingSeats, TooManyClasses, } impl From for Error { fn from(err: FdbError) -> Self { - Error::FdbError(err) + Error::Fdb(err) } } impl TransactError for Error { fn try_into_fdb_error(self) -> std::result::Result { match self { - Error::FdbError(err) => Ok(err), + Error::Fdb(err) => Ok(err), _ => Err(self), } } @@ -223,10 +223,10 @@ async fn switch_classes( trx: &Transaction, student_id: &str, old_class: &str, - new_class: &str, + _new_class: &str, ) -> Result<()> { - ditch_trx(trx, student_id.clone(), old_class.clone()).await; - signup_trx(trx, student_id.clone(), new_class.clone()).await?; + ditch_trx(trx, <&str>::clone(&student_id), <&str>::clone(&old_class)).await; + signup_trx(trx, <&str>::clone(&student_id), <&str>::clone(&old_class)).await?; Ok(()) } @@ -297,7 +297,7 @@ async fn simulate_students(student_id: usize, num_ops: usize) { for _ in 0..num_ops { let mut moods = Vec::::new(); - if my_classes.len() > 0 { + if !my_classes.is_empty() { moods.push(Mood::Ditch); moods.push(Mood::Switch); } @@ -306,7 +306,7 @@ async fn simulate_students(student_id: usize, num_ops: usize) { moods.push(Mood::Add); } - let mood = moods.choose(&mut rng).map(|mood| *mood).unwrap(); + let mood = moods.choose(&mut rng).copied().unwrap(); // on errors we recheck for available classes if perform_op( diff --git a/foundationdb/src/api.rs b/foundationdb/src/api.rs index 93290756..1347ec7f 100644 --- a/foundationdb/src/api.rs +++ b/foundationdb/src/api.rs @@ -13,7 +13,6 @@ //! - [API versioning](https://apple.github.io/foundationdb/api-c.html#api-versioning) //! - [Network](https://apple.github.io/foundationdb/api-c.html#network) -use std::panic; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; @@ -61,9 +60,9 @@ impl FdbApiBuilder { /// /// This function will panic if called more than once pub fn build(self) -> FdbResult { - if VERSION_SELECTED.compare_and_swap(false, true, Ordering::AcqRel) { - panic!("the fdb select api version can only be run once per process"); - } + VERSION_SELECTED + .compare_exchange_weak(false, true, Ordering::AcqRel, Ordering::Acquire) + .expect("the fdb select api version can only be run once per process"); error::eval(unsafe { fdb_sys::fdb_select_api_version_impl( self.runtime_version, diff --git a/foundationdb/src/directory/directory_layer.rs b/foundationdb/src/directory/directory_layer.rs new file mode 100644 index 00000000..43295880 --- /dev/null +++ b/foundationdb/src/directory/directory_layer.rs @@ -0,0 +1,830 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! The default Directory implementation. + +use crate::directory::directory_partition::DirectoryPartition; +use crate::directory::directory_subspace::DirectorySubspace; +use crate::directory::error::DirectoryError; +use crate::directory::node::Node; +use crate::directory::{compare_slice, strinc, Directory, DirectoryOutput}; +use crate::future::FdbSlice; +use crate::tuple::hca::HighContentionAllocator; +use crate::tuple::{Element, Subspace, TuplePack}; +use crate::RangeOption; +use crate::{FdbResult, Transaction}; +use async_recursion::async_recursion; +use async_trait::async_trait; +use byteorder::{LittleEndian, WriteBytesExt}; + +use std::cmp::Ordering; + +use std::ops::Deref; +use std::option::Option::Some; +use std::sync::Arc; + +pub(crate) const DEFAULT_SUB_DIRS: i64 = 0; +const MAJOR_VERSION: u32 = 1; +const MINOR_VERSION: u32 = 0; +const PATCH_VERSION: u32 = 0; +pub(crate) const DEFAULT_NODE_PREFIX: &[u8] = b"\xFE"; +const DEFAULT_HCA_PREFIX: &[u8] = b"hca"; +pub(crate) const PARTITION_LAYER: &[u8] = b"partition"; +pub(crate) const LAYER_SUFFIX: &[u8] = b"layer"; + +/// A DirectoryLayer defines a new root directory. +/// The node subspace and content subspace control where the directory metadata and contents, +/// respectively, are stored. The default root directory has a node subspace with raw prefix \xFE +/// and a content subspace with no prefix. +#[derive(Clone)] +pub struct DirectoryLayer { + pub(crate) inner: Arc, +} + +impl std::fmt::Debug for DirectoryLayer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.inner.fmt(f) + } +} + +#[derive(Debug)] +pub struct DirectoryLayerInner { + pub(crate) root_node: Subspace, + pub(crate) node_subspace: Subspace, + pub(crate) content_subspace: Subspace, + pub(crate) allocator: HighContentionAllocator, + pub(crate) allow_manual_prefixes: bool, + + pub(crate) path: Vec, +} + +impl Deref for DirectoryLayer { + type Target = DirectoryLayerInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl Default for DirectoryLayer { + /// The default root directory stores directory layer metadata in keys beginning with 0xFE, + ///and allocates newly created directories in (unused) prefixes starting with 0x00 through 0xFD. + ///This is appropriate for otherwise empty databases, but may conflict with other formal or informal partitionings of keyspace. + /// If you already have other content in your database, you may wish to use NewDirectoryLayer to + /// construct a non-standard root directory to control where metadata and keys are stored. + fn default() -> Self { + Self::new( + Subspace::from_bytes(DEFAULT_NODE_PREFIX), + Subspace::all(), + false, + ) + } +} + +impl DirectoryLayer { + pub fn new( + node_subspace: Subspace, + content_subspace: Subspace, + allow_manual_prefixes: bool, + ) -> Self { + let root_node = node_subspace.subspace(&node_subspace.bytes()); + + DirectoryLayer { + inner: Arc::new(DirectoryLayerInner { + root_node: root_node.clone(), + node_subspace, + content_subspace, + allocator: HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX)), + allow_manual_prefixes, + path: vec![], + }), + } + } + + pub(crate) fn new_with_path( + node_subspace: Subspace, + content_subspace: Subspace, + allow_manual_prefixes: bool, + path: Vec, + ) -> Self { + let root_node = node_subspace.subspace(&node_subspace.bytes()); + + DirectoryLayer { + inner: Arc::new(DirectoryLayerInner { + root_node: root_node.clone(), + node_subspace, + content_subspace, + allocator: HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX)), + allow_manual_prefixes, + path, + }), + } + } + + pub fn get_path(&self) -> Vec { + self.path.clone() + } + + fn node_with_optional_prefix(&self, prefix: Option) -> Option { + match prefix { + None => None, + Some(fdb_slice) => Some(self.node_with_prefix(&(&*fdb_slice))), + } + } + + fn node_with_prefix(&self, prefix: &T) -> Subspace { + self.inner.node_subspace.subspace(prefix) + } + + async fn find(&self, trx: &Transaction, path: Vec) -> Result { + let mut node = Node { + subspace: Some(self.root_node.clone()), + current_path: vec![], + target_path: path.clone(), + layer: vec![], + loaded_metadata: false, + directory_layer: self.clone(), + }; + + // walking through the provided path + for path_name in path.iter() { + node.current_path.push(path_name.clone()); + let node_subspace = match node.subspace { + // unreachable because on first iteration, it is set to root_node, + // on other iteration, `node.exists` is checking for the subspace's value + None => unreachable!("node's subspace is not set"), + Some(s) => s, + }; + let key = node_subspace.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned())); + + // finding the next node + let fdb_slice_value = trx.get(key.bytes(), false).await?; + + node = Node { + subspace: self.node_with_optional_prefix(fdb_slice_value), + current_path: node.current_path.clone(), + target_path: path.clone(), + layer: vec![], + loaded_metadata: false, + directory_layer: self.clone(), + }; + + node.load_metadata(&trx).await?; + + if !node.exists() || node.layer.eq(PARTITION_LAYER) { + return Ok(node); + } + } + + if !node.loaded_metadata { + node.load_metadata(&trx).await?; + } + + Ok(node) + } + + fn to_absolute_path(&self, sub_path: &[String]) -> Vec { + let mut path: Vec = Vec::with_capacity(self.path.len() + sub_path.len()); + + path.extend_from_slice(&self.path); + path.extend_from_slice(sub_path); + + path + } + + pub(crate) fn contents_of_node( + &self, + node: Subspace, + path: Vec, + layer: Vec, + ) -> Result { + let prefix: Vec = self.node_subspace.unpack(node.bytes())?; + + if layer.eq(PARTITION_LAYER) { + Ok(DirectoryOutput::DirectoryPartition( + DirectoryPartition::new(self.to_absolute_path(&path), prefix, self.clone()), + )) + } else { + Ok(DirectoryOutput::DirectorySubspace(DirectorySubspace::new( + self.to_absolute_path(&path), + prefix, + self, + layer, + ))) + } + } + + /// `create_or_open_internal` is the function used to open and/or create a directory. + #[async_recursion] + async fn create_or_open_internal( + &self, + trx: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + allow_create: bool, + allow_open: bool, + ) -> Result { + self.check_version(trx, allow_create).await?; + + if prefix.is_some() && !self.allow_manual_prefixes { + if self.path.is_empty() { + return Err(DirectoryError::PrefixNotAllowed); + } + + return Err(DirectoryError::CannotPrefixInPartition); + } + + if path.is_empty() { + return Err(DirectoryError::NoPathProvided); + } + + let node = self.find(trx, path.to_owned()).await?; + + if node.exists() { + if node.is_in_partition(false) { + let sub_path = node.get_partition_subpath(); + match node.get_contents()? { + DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"), + DirectoryOutput::DirectoryPartition(directory_partition) => { + let dir_space = directory_partition + .directory_subspace + .directory_layer + .create_or_open_internal( + trx, + sub_path.to_owned(), + prefix, + layer, + allow_create, + allow_open, + ) + .await?; + Ok(dir_space) + } + } + } else { + self.open_internal(layer, &node, allow_open).await + } + } else { + self.create_internal(trx, path, layer, prefix, allow_create) + .await + } + } + + async fn open_internal( + &self, + layer: Option>, + node: &Node, + allow_open: bool, + ) -> Result { + if !allow_open { + return Err(DirectoryError::DirAlreadyExists); + } + + match layer { + None => {} + Some(layer) => { + if !layer.is_empty() { + match compare_slice(&layer, &node.layer) { + Ordering::Equal => {} + _ => { + return Err(DirectoryError::IncompatibleLayer); + } + } + } + } + } + + node.get_contents() + } + + async fn create_internal( + &self, + trx: &Transaction, + path: Vec, + layer: Option>, + prefix: Option>, + allow_create: bool, + ) -> Result { + if !allow_create { + return Err(DirectoryError::DirectoryDoesNotExists); + } + + let layer = layer.unwrap_or_default(); + + self.check_version(trx, allow_create).await?; + let new_prefix = self.get_prefix(trx, prefix.clone()).await?; + + let is_prefix_free = self + .is_prefix_free(trx, new_prefix.to_owned(), prefix.is_none()) + .await?; + + if !is_prefix_free { + return Err(DirectoryError::DirectoryPrefixInUse); + } + + let parent_node = self.get_parent_node(trx, path.to_owned()).await?; + let node = self.node_with_prefix(&new_prefix); + + let key = parent_node.subspace(&(DEFAULT_SUB_DIRS, path.last().unwrap())); + let key_layer = node.pack(&LAYER_SUFFIX.to_vec()); + + trx.set(&key.bytes(), &new_prefix); + trx.set(&key_layer, &layer); + + self.contents_of_node(node, path.to_owned(), layer.to_owned()) + } + + async fn get_parent_node( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + if path.len() > 1 { + let (_, list) = path.split_last().unwrap(); + + let parent = self + .create_or_open_internal(trx, list.to_vec(), None, None, true, true) + .await?; + Ok(self.node_with_prefix(&parent.bytes().to_vec())) + } else { + Ok(self.root_node.clone()) + } + } + + async fn is_prefix_free( + &self, + trx: &Transaction, + prefix: Vec, + snapshot: bool, + ) -> Result { + if prefix.is_empty() { + return Ok(false); + } + + let node = self + .node_containing_key(trx, prefix.to_owned(), snapshot) + .await?; + + if node.is_some() { + return Ok(false); + } + + let range_option = RangeOption::from(( + self.node_subspace.pack(&prefix), + self.node_subspace.pack(&strinc(prefix)), + )); + + let result = trx.get_range(&range_option, 1, snapshot).await?; + + Ok(result.is_empty()) + } + + async fn node_containing_key( + &self, + trx: &Transaction, + key: Vec, + snapshot: bool, + ) -> Result, DirectoryError> { + if key.starts_with(self.node_subspace.bytes()) { + return Ok(Some(self.root_node.clone())); + } + + let mut key_after = key.to_vec(); + // pushing 0x00 to simulate keyAfter + key_after.push(0); + + let range_end = self.node_subspace.pack(&key_after); + + let mut range_option = RangeOption::from((self.node_subspace.range().0, range_end)); + range_option.reverse = true; + range_option.limit = Some(1); + + // checking range + let fdb_values = trx.get_range(&range_option, 1, snapshot).await?; + + match fdb_values.get(0) { + None => {} + Some(fdb_key_value) => { + let previous_prefix: Vec = + self.node_subspace.unpack(fdb_key_value.key())?; + + if let Some(Element::Bytes(b)) = previous_prefix.get(0) { + let previous_prefix = b.to_vec(); + if key.starts_with(&previous_prefix) { + return Ok(Some(self.node_with_prefix(&previous_prefix))); + }; + }; + } + } + Ok(None) + } + + async fn get_prefix( + &self, + trx: &Transaction, + prefix: Option>, + ) -> Result, DirectoryError> { + match prefix { + None => { + // no prefix provided, allocating one + let allocator = self.allocator.allocate(trx).await?; + let subspace = self.content_subspace.subspace(&allocator); + + // checking range + let result = trx + .get_range(&RangeOption::from(subspace.range()), 1, false) + .await?; + + if !result.is_empty() { + return Err(DirectoryError::PrefixNotEmpty); + } + + Ok(subspace.bytes().to_vec()) + } + Some(v) => Ok(v), + } + } + + /// `check_version` is checking the Directory's version in FDB. + async fn check_version( + &self, + trx: &Transaction, + allow_creation: bool, + ) -> Result<(), DirectoryError> { + let version = self.get_version_value(trx).await?; + match version { + None => { + if allow_creation { + self.initialize_directory(trx).await + } else { + return Ok(()); + } + } + Some(versions) => { + if versions.len() < 12 { + return Err(DirectoryError::Version( + "incorrect version length".to_string(), + )); + } + let mut arr = [0u8; 4]; + arr.copy_from_slice(&versions[0..4]); + let major: u32 = u32::from_le_bytes(arr); + + arr.copy_from_slice(&versions[4..8]); + let minor: u32 = u32::from_le_bytes(arr); + + arr.copy_from_slice(&versions[8..12]); + let patch: u32 = u32::from_le_bytes(arr); + + if major > MAJOR_VERSION { + let msg = format!("cannot load directory with version {}.{}.{} using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION); + return Err(DirectoryError::Version(msg)); + } + + if minor > MINOR_VERSION { + let msg = format!("directory with version {}.{}.{} is read-only when opened using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION); + return Err(DirectoryError::Version(msg)); + } + + Ok(()) + } + } + } + + /// `initialize_directory` is initializing the directory + async fn initialize_directory(&self, trx: &Transaction) -> Result<(), DirectoryError> { + let mut value = vec![]; + value.write_u32::(MAJOR_VERSION).unwrap(); + value.write_u32::(MINOR_VERSION).unwrap(); + value.write_u32::(PATCH_VERSION).unwrap(); + let version_subspace: &[u8] = b"version"; + let directory_version_key = self.root_node.subspace(&version_subspace); + trx.set(directory_version_key.bytes(), &value); + + Ok(()) + } + + async fn get_version_value(&self, trx: &Transaction) -> FdbResult> { + let version_subspace: &[u8] = b"version"; + let version_key = self.root_node.subspace(&version_subspace); + + trx.get(version_key.bytes(), false).await + } + + async fn exists_internal( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + self.check_version(trx, false).await?; + + let node = self.find(trx, path.to_owned()).await?; + + if !node.exists() { + return Ok(false); + } + + if node.is_in_partition(false) { + return node + .get_contents()? + .exists(trx, node.to_owned().get_partition_subpath()) + .await; + } + + Ok(true) + } + + async fn list_internal( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + self.check_version(trx, false).await?; + + let node = self.find(trx, path.to_owned()).await?; + if !node.exists() { + return Err(DirectoryError::PathDoesNotExists); + } + if node.is_in_partition(true) { + match node.get_contents()? { + DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"), + DirectoryOutput::DirectoryPartition(directory_partition) => { + return directory_partition + .directory_subspace + .directory_layer + .list(trx, node.get_partition_subpath()) + .await + } + }; + } + + Ok(node.list_sub_folders(trx).await?) + } + + async fn move_to_internal( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { + self.check_version(trx, true).await?; + + if old_path.len() <= new_path.len() + && compare_slice(&old_path[..], &new_path[..old_path.len()]) == Ordering::Equal + { + return Err(DirectoryError::CannotMoveBetweenSubdirectory); + } + + let old_node = self.find(trx, old_path.to_owned()).await?; + let new_node = self.find(trx, new_path.to_owned()).await?; + + if !old_node.exists() { + return Err(DirectoryError::PathDoesNotExists); + } + + if old_node.is_in_partition(false) || new_node.is_in_partition(false) { + if !old_node.is_in_partition(false) + || !new_node.is_in_partition(false) + || old_node.current_path.eq(&new_node.current_path) + { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + + return new_node + .get_contents()? + .move_to( + trx, + old_node.get_partition_subpath(), + new_node.get_partition_subpath(), + ) + .await; + } + + if new_node.exists() || new_path.is_empty() { + return Err(DirectoryError::DirAlreadyExists); + } + + let parent_path = match new_path.split_last() { + None => vec![], + Some((_, elements)) => elements.to_vec(), + }; + + let parent_node = self.find(trx, parent_path).await?; + if !parent_node.exists() { + return Err(DirectoryError::ParentDirDoesNotExists); + } + + let subspace_parent_node = match parent_node.subspace { + // not reachable because `self.find` is creating a node with a subspace, + None => unreachable!("node's subspace is not set"), + Some(ref s) => s.clone(), + }; + + let key = + subspace_parent_node.subspace(&(DEFAULT_SUB_DIRS, new_path.to_owned().last().unwrap())); + let value: Vec = self + .node_subspace + .unpack(old_node.subspace.clone().unwrap().bytes())?; + trx.set(&key.bytes(), &value); + + self.remove_from_parent(trx, old_path.to_owned()).await?; + + self.contents_of_node( + old_node.subspace.unwrap(), + new_path.to_owned(), + old_node.layer, + ) + } + + async fn remove_from_parent( + &self, + trx: &Transaction, + path: Vec, + ) -> Result<(), DirectoryError> { + let (last_element, parent_path) = match path.split_last() { + None => return Err(DirectoryError::BadDestinationDirectory), + Some((last, elements)) => (last.clone(), elements.to_vec()), + }; + + let parent_node = self.find(trx, parent_path).await?; + match parent_node.subspace { + None => {} + Some(subspace) => { + let key = subspace.pack(&(DEFAULT_SUB_DIRS, last_element)); + trx.clear(&key); + } + } + + Ok(()) + } + + #[async_recursion] + async fn remove_internal( + &self, + trx: &Transaction, + path: Vec, + fail_on_nonexistent: bool, + ) -> Result { + self.check_version(trx, true).await?; + + if path.is_empty() { + return Err(DirectoryError::CannotModifyRootDirectory); + } + + let node = self.find(&trx, path.to_owned()).await?; + + if !node.exists() { + return if fail_on_nonexistent { + Err(DirectoryError::DirectoryDoesNotExists) + } else { + Ok(false) + }; + } + + if node.is_in_partition(false) { + match node.get_contents()? { + DirectoryOutput::DirectorySubspace(_) => { + unreachable!("already directory partition") + } + DirectoryOutput::DirectoryPartition(d) => { + return d + .directory_subspace + .directory_layer + .remove_internal(trx, node.get_partition_subpath(), fail_on_nonexistent) + .await + } + } + } + + self.remove_recursive(trx, node.subspace.unwrap().clone()) + .await?; + self.remove_from_parent(trx, path.to_owned()).await?; + + Ok(true) + } + + #[async_recursion] + async fn remove_recursive( + &self, + trx: &Transaction, + node_sub: Subspace, + ) -> Result<(), DirectoryError> { + let sub_dir = node_sub.subspace(&(DEFAULT_SUB_DIRS)); + let (mut begin, end) = sub_dir.range(); + + loop { + let range_option = RangeOption::from((begin.as_slice(), end.as_slice())); + + let range = trx.get_range(&range_option, 1024, false).await?; + let has_more = range.more(); + + for row_key in range { + let sub_node = self.node_with_prefix(&row_key.value()); + self.remove_recursive(trx, sub_node).await?; + begin = row_key.key().pack_to_vec(); + } + + if !has_more { + break; + } + } + + let node_prefix: Vec = self.node_subspace.unpack(node_sub.bytes())?; + + trx.clear_range(&node_prefix, &strinc(node_prefix.to_owned())); + trx.clear_subspace_range(&node_sub); + + Ok(()) + } +} + +#[async_trait] +impl Directory for DirectoryLayer { + /// `create_or_open` opens the directory specified by path (relative to this + /// Directory), and returns the directory and its contents as a + /// Subspace. If the directory does not exist, it is created + /// (creating parent directories if necessary). + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.create_or_open_internal(txn, path, prefix, layer, true, true) + .await + } + + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.create_or_open_internal(txn, path, prefix, layer, true, false) + .await + } + + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result { + self.create_or_open_internal(txn, path, None, layer, false, true) + .await + } + + async fn exists(&self, trx: &Transaction, path: Vec) -> Result { + self.exists_internal(trx, path).await + } + + async fn move_directory( + &self, + _trx: &Transaction, + _new_path: Vec, + ) -> Result { + Err(DirectoryError::CannotMoveRootDirectory) + } + + /// `move_to` the directory from old_path to new_path(both relative to this + /// Directory), and returns the directory (at its new location) and its + /// contents as a Subspace. Move will return an error if a directory + /// does not exist at oldPath, a directory already exists at newPath, or the + /// parent directory of newPath does not exist. + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { + self.move_to_internal(trx, old_path, new_path).await + } + + async fn remove(&self, trx: &Transaction, path: Vec) -> Result { + self.remove_internal(trx, path.to_owned(), true).await + } + + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + self.remove_internal(trx, path.to_owned(), false).await + } + + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + self.list_internal(trx, path).await + } +} diff --git a/foundationdb/src/directory/directory_partition.rs b/foundationdb/src/directory/directory_partition.rs new file mode 100644 index 00000000..37741635 --- /dev/null +++ b/foundationdb/src/directory/directory_partition.rs @@ -0,0 +1,243 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! A resulting Subspace whose prefix is preprended to all of its descendant directories's prefixes. + +use crate::directory::directory_layer::{DirectoryLayer, DEFAULT_NODE_PREFIX, PARTITION_LAYER}; +use crate::directory::directory_subspace::DirectorySubspace; +use crate::directory::error::DirectoryError; +use crate::directory::{Directory, DirectoryOutput}; +use crate::tuple::Subspace; +use crate::Transaction; +use async_trait::async_trait; +use std::ops::Deref; +use std::sync::Arc; + +/// A `DirectoryPartition` is a DirectorySubspace whose prefix is preprended to all of its descendant +/// directories's prefixes. It cannot be used as a Subspace. Instead, you must create at +/// least one subdirectory to store content. +#[derive(Clone)] +pub struct DirectoryPartition { + pub(crate) inner: Arc, +} + +#[derive(Debug)] +pub struct DirectoryPartitionInner { + pub(crate) directory_subspace: DirectorySubspace, + pub(crate) parent_directory_layer: DirectoryLayer, +} + +impl Deref for DirectoryPartition { + type Target = DirectoryPartitionInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::fmt::Debug for DirectoryPartition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.inner.fmt(f) + } +} + +impl DirectoryPartition { + // https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectoryPartition.h#L34-L43 + pub(crate) fn new( + path: Vec, + prefix: Vec, + parent_directory_layer: DirectoryLayer, + ) -> Self { + let mut node_subspace_bytes = vec![]; + node_subspace_bytes.extend_from_slice(&prefix); + node_subspace_bytes.extend_from_slice(DEFAULT_NODE_PREFIX); + + let new_directory_layer = DirectoryLayer::new_with_path( + Subspace::from_bytes(&node_subspace_bytes), + Subspace::from_bytes(prefix.as_slice()), + false, + path.to_owned(), + ); + + DirectoryPartition { + inner: Arc::new(DirectoryPartitionInner { + directory_subspace: DirectorySubspace::new( + path, + prefix, + &new_directory_layer, + Vec::from(PARTITION_LAYER), + ), + parent_directory_layer, + }), + } + } +} + +impl DirectoryPartition { + pub fn get_path(&self) -> Vec { + self.inner.directory_subspace.get_path() + } + + fn get_directory_layer_for_path(&self, path: &[String]) -> DirectoryLayer { + if path.is_empty() { + self.parent_directory_layer.clone() + } else { + self.directory_subspace.directory_layer.clone() + } + } + + fn get_partition_subpath( + &self, + path: Vec, + directory_layer: Option, + ) -> Vec { + let mut new_path = vec![]; + + new_path.extend_from_slice( + &self.directory_subspace.get_path()[directory_layer + .unwrap_or_else(|| self.directory_subspace.directory_layer.clone()) + .path + .len()..], + ); + new_path.extend_from_slice(&path); + + new_path + } + + pub fn get_layer(&self) -> Vec { + String::from("partition").into_bytes() + } +} + +#[async_trait] +impl Directory for DirectoryPartition { + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.inner + .directory_subspace + .create_or_open(txn, path, prefix, layer) + .await + } + + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.inner + .directory_subspace + .create(txn, path, prefix, layer) + .await + } + + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result { + self.inner.directory_subspace.open(txn, path, layer).await + } + + async fn exists(&self, trx: &Transaction, path: Vec) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + + directory_layer + .exists( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn move_directory( + &self, + trx: &Transaction, + new_path: Vec, + ) -> Result { + let directory_layer = self.get_directory_layer_for_path(&[]); + let directory_layer_path = directory_layer.path.to_owned(); + + if directory_layer_path.len() > new_path.len() { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + + for (i, path) in directory_layer_path.iter().enumerate() { + match new_path.get(i) { + None => return Err(DirectoryError::CannotMoveBetweenPartition), + Some(new_path_item) => { + if !new_path_item.eq(path) { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + } + } + } + + let mut new_relative_path = vec![]; + new_relative_path.extend_from_slice(&new_path[directory_layer_path.len()..]); + + directory_layer + .move_to( + trx, + self.get_partition_subpath(vec![], Some(directory_layer.clone())), + new_relative_path.to_owned(), + ) + .await + } + + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { + self.inner + .directory_subspace + .move_to(trx, old_path, new_path) + .await + } + + async fn remove(&self, trx: &Transaction, path: Vec) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + directory_layer + .remove( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + directory_layer + .remove_if_exists( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + self.inner.directory_subspace.list(trx, path).await + } +} diff --git a/foundationdb/src/directory/directory_subspace.rs b/foundationdb/src/directory/directory_subspace.rs new file mode 100644 index 00000000..fcf63cce --- /dev/null +++ b/foundationdb/src/directory/directory_subspace.rs @@ -0,0 +1,252 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! The resulting Subspace generated with a Directory + +use crate::directory::directory_layer::DirectoryLayer; +use crate::directory::error::DirectoryError; +use crate::directory::{Directory, DirectoryOutput}; +use crate::tuple::{PackResult, Subspace, TuplePack, TupleUnpack}; +use crate::Transaction; +use async_trait::async_trait; + +/// A `DirectorySubspace` represents the contents of a directory, but it also remembers +/// the path with which it was opened and offers convenience methods to operate on the directory at that path. +/// An instance of `DirectorySubspace` can be used for all the usual subspace operations. +/// It can also be used to operate on the directory with which it was opened. +#[derive(Debug, Clone)] +pub struct DirectorySubspace { + pub(crate) directory_layer: DirectoryLayer, + subspace: Subspace, + path: Vec, + layer: Vec, +} + +impl DirectorySubspace { + pub fn new( + path: Vec, + prefix: Vec, + directory_layer: &DirectoryLayer, + layer: Vec, + ) -> Self { + DirectorySubspace { + directory_layer: directory_layer.clone(), + subspace: Subspace::from_bytes(&prefix), + path, + layer, + } + } + + // https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectorySubspace.cpp#L105 + fn get_partition_subpath( + &self, + path: Vec, + directory_layer: Option, + ) -> Vec { + let mut new_path = vec![]; + + new_path.extend_from_slice( + &self.path[directory_layer + .unwrap_or_else(|| self.directory_layer.clone()) + .path + .len()..], + ); + new_path.extend_from_slice(&path); + + new_path + } +} + +impl DirectorySubspace { + pub fn subspace(&self, t: &T) -> Subspace { + self.subspace.subspace(t) + } + + pub fn bytes(&self) -> &[u8] { + self.subspace.bytes() + } + + pub fn pack(&self, t: &T) -> Vec { + self.subspace.pack(t) + } + + pub fn unpack<'de, T: TupleUnpack<'de>>(&self, key: &'de [u8]) -> PackResult { + self.subspace.unpack(key) + } + + pub fn range(&self) -> (Vec, Vec) { + self.subspace.range() + } + + pub fn get_path(&self) -> Vec { + self.path.clone() + } + + pub fn set_path(&mut self, path: Vec) { + self.path = path; + } + + pub fn get_layer(&self) -> Vec { + self.layer.clone() + } + + pub fn is_start_of(&self, key: &[u8]) -> bool { + self.subspace.is_start_of(&key) + } + + fn get_directory_layer_for_path(&self, _: &[String]) -> DirectoryLayer { + self.directory_layer.clone() + } +} + +#[async_trait] +impl Directory for DirectorySubspace { + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.directory_layer + .create_or_open( + txn, + self.get_partition_subpath(path.to_owned(), None), + prefix, + layer, + ) + .await + } + + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.directory_layer + .create( + txn, + self.get_partition_subpath(path.to_owned(), None), + prefix, + layer, + ) + .await + } + + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result { + self.directory_layer + .open( + txn, + self.get_partition_subpath(path.to_owned(), None), + layer, + ) + .await + } + + async fn exists(&self, trx: &Transaction, path: Vec) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + + directory_layer + .exists( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn move_directory( + &self, + trx: &Transaction, + new_path: Vec, + ) -> Result { + let directory_layer = self.get_directory_layer_for_path(&[]); + let directory_layer_path = directory_layer.path.to_owned(); + + if directory_layer_path.len() > new_path.len() { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + + for (i, path) in directory_layer_path.iter().enumerate() { + match new_path.get(i) { + None => return Err(DirectoryError::CannotMoveBetweenPartition), + Some(new_path_item) => { + if !new_path_item.eq(path) { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + } + } + } + + let mut new_relative_path = vec![]; + new_relative_path.extend_from_slice(&new_path[directory_layer_path.len()..]); + + directory_layer + .move_to( + trx, + self.get_partition_subpath(vec![], Some(directory_layer.clone())), + new_relative_path.to_owned(), + ) + .await + } + + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { + self.directory_layer + .move_to( + trx, + self.get_partition_subpath(old_path, None), + self.get_partition_subpath(new_path, None), + ) + .await + } + + async fn remove(&self, trx: &Transaction, path: Vec) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + directory_layer + .remove( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + directory_layer + .remove_if_exists( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + self.directory_layer + .list(trx, self.get_partition_subpath(path.to_owned(), None)) + .await + } +} diff --git a/foundationdb/src/directory/error.rs b/foundationdb/src/directory/error.rs new file mode 100644 index 00000000..c81582c9 --- /dev/null +++ b/foundationdb/src/directory/error.rs @@ -0,0 +1,73 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! Errors that can be thrown by Directory. + +use crate::error; +use crate::tuple::hca::HcaError; +use crate::tuple::PackError; +use std::io; + +/// The enumeration holding all possible errors from a Directory. +#[derive(Debug)] +pub enum DirectoryError { + /// cannot modify the root directory + CannotModifyRootDirectory, + /// prefix is already used + DirectoryPrefixInUse, + /// Directory does not exists + DirectoryDoesNotExists, + /// missing path. + NoPathProvided, + /// tried to create an already existing path. + DirAlreadyExists, + /// missing directory. + PathDoesNotExists, + /// Parent does not exists + ParentDirDoesNotExists, + /// the layer is incompatible. + IncompatibleLayer, + /// the destination directory cannot be a subdirectory of the source directory. + BadDestinationDirectory, + /// Bad directory version. + Version(String), + /// cannot specify a prefix unless manual prefixes are enabled + PrefixNotAllowed, + /// cannot specify a prefix in a partition. + CannotPrefixInPartition, + /// the root directory cannot be moved + CannotMoveRootDirectory, + CannotMoveBetweenPartition, + /// the destination directory cannot be a subdirectory of the source directory + CannotMoveBetweenSubdirectory, + /// Prefix is not empty + PrefixNotEmpty, + IoError(io::Error), + FdbError(error::FdbError), + HcaError(HcaError), + PackError(PackError), + Other(String), +} + +impl From for DirectoryError { + fn from(err: error::FdbError) -> Self { + DirectoryError::FdbError(err) + } +} + +impl From for DirectoryError { + fn from(err: HcaError) -> Self { + DirectoryError::HcaError(err) + } +} + +impl From for DirectoryError { + fn from(err: PackError) -> Self { + DirectoryError::PackError(err) + } +} diff --git a/foundationdb/src/directory/mod.rs b/foundationdb/src/directory/mod.rs new file mode 100644 index 00000000..22c38301 --- /dev/null +++ b/foundationdb/src/directory/mod.rs @@ -0,0 +1,372 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! Directory provides a tool for managing related subspaces. +//! +//! The FoundationDB API provides directories as a tool for managing related Subspaces. +//! For general guidance on directory usage, see the discussion in the [Developer Guide](https://apple.github.io/foundationdb/developer-guide.html#directories). +//! +//! Directories are identified by hierarchical paths analogous to the paths in a Unix-like file system. +//! A path is represented as a slice of strings. Each directory has an associated subspace used to +//! store its content. The directory layer maps each path to a short prefix used for the +//! corresponding subspace. In effect, directories provide a level of indirection for access to subspaces. +//! Directory operations are transactional. +//! +//! It is a direct backport of the [Flow implementation](https://github.com/apple/foundationdb/tree/master/bindings/flow). +//! +//! Examples: +//! +//! ```rust +//! use futures::prelude::*; +//! use foundationdb::directory::Directory; +//! +//! async fn async_main() -> foundationdb::FdbResult<()> { +//! let db = foundationdb::Database::default()?; +//! +//! // creates a transaction +//! let trx = db.create_trx()?; +//! +//! // creates a directory +//! let directory = foundationdb::directory::directory_layer::DirectoryLayer::default(); +//! +//! // use the directory to create a subspace to use +//! let content_subspace = directory.create_or_open( +//! // the transaction used to read/write the directory. +//! &trx, +//! // the path used, which can view as a UNIX path like `/app/my-app`. +//! vec![String::from("my-awesome-app"), String::from("my-awesome-user")], +//! // do not use any custom prefix or layer +//! None, None, +//! ).await; +//! assert_eq!(true, content_subspace.is_ok()); +//! +//! // Don't forget to commit your transaction to persist the subspace +//! trx.commit().await?; +//! +//! Ok(()) +//! } +//! +//! // Safe because drop is called before the program exits +//! let network = unsafe { foundationdb::boot() }; +//! futures::executor::block_on(async_main()).expect("failed to run"); +//! drop(network); +//! ``` +pub mod directory_layer; +pub mod directory_partition; +pub mod directory_subspace; +pub mod error; +pub(crate) mod node; + +use crate::directory::directory_subspace::DirectorySubspace; +use crate::directory::error::DirectoryError; +use async_trait::async_trait; + +use crate::Transaction; + +use crate::directory::directory_partition::DirectoryPartition; +use crate::tuple::{PackResult, Subspace, TuplePack, TupleUnpack}; +use core::cmp; +use std::cmp::Ordering; + +/// `Directory` represents a subspace of keys in a FoundationDB database, identified by a hierarchical path. +#[async_trait] +pub trait Directory { + /// Creates or opens the subdirectory of this Directory located at path (creating parent directories, if necessary). + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result; + + /// Creates a subdirectory of this Directory located at path (creating parent directories if necessary). + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result; + + /// Opens the subdirectory of this Directory located at path. + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result; + + /// Checks if the subdirectory of this Directory located at path exists. + async fn exists(&self, trx: &Transaction, path: Vec) -> Result; + + /// Moves this Directory to the specified newAbsolutePath. + async fn move_directory( + &self, + trx: &Transaction, + new_path: Vec, + ) -> Result; + + /// Moves the subdirectory of this Directory located at oldpath to newpath. + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result; + + /// Removes the subdirectory of this Directory located at path and all of its subdirectories, as well as all of their contents. + async fn remove(&self, trx: &Transaction, path: Vec) -> Result; + + /// Removes the subdirectory of this Directory located at path (if the path exists) and all of its subdirectories, as well as all of their contents. + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result; + + /// List the subdirectories of this directory at a given subpath. + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError>; +} + +pub(crate) fn compare_slice(a: &[T], b: &[T]) -> cmp::Ordering { + for (ai, bi) in a.iter().zip(b.iter()) { + match ai.cmp(&bi) { + Ordering::Equal => continue, + ord => return ord, + } + } + + // if every single element was equal, compare length + a.len().cmp(&b.len()) +} + +/// DirectoryOutput represents the different output of a Directory. +#[derive(Clone, Debug)] +pub enum DirectoryOutput { + /// Under classic usage, you will obtain an `DirectorySubspace` + DirectorySubspace(DirectorySubspace), + /// You can open an `DirectoryPartition` by using the "partition" layer + DirectoryPartition(DirectoryPartition), +} + +// TODO: should we have a Subspace trait? +impl DirectoryOutput { + pub fn subspace(&self, t: &T) -> Subspace { + match self { + DirectoryOutput::DirectorySubspace(d) => d.subspace(t), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot open subspace in the root of a directory partition") + } + } + } + + pub fn bytes(&self) -> &[u8] { + match self { + DirectoryOutput::DirectorySubspace(d) => d.bytes(), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot get key for the root of a directory partition") + } + } + } + + pub fn pack(&self, t: &T) -> Vec { + match self { + DirectoryOutput::DirectorySubspace(d) => d.pack(t), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot pack for the root of a directory partition") + } + } + } + + pub fn unpack<'de, T: TupleUnpack<'de>>(&self, key: &'de [u8]) -> PackResult { + match self { + DirectoryOutput::DirectorySubspace(d) => d.unpack(key), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot unpack keys using the root of a directory partition") + } + } + } + + pub fn range(&self) -> (Vec, Vec) { + match self { + DirectoryOutput::DirectorySubspace(d) => d.range(), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot get range for the root of a directory partition") + } + } + } + + pub fn get_path(&self) -> Vec { + match self { + DirectoryOutput::DirectorySubspace(d) => d.get_path(), + DirectoryOutput::DirectoryPartition(d) => d.get_path(), + } + } + + pub fn get_layer(&self) -> Vec { + match self { + DirectoryOutput::DirectorySubspace(d) => d.get_layer(), + DirectoryOutput::DirectoryPartition(d) => d.get_layer(), + } + } +} + +#[async_trait] +impl Directory for DirectoryOutput { + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => { + d.create_or_open(txn, path, prefix, layer).await + } + DirectoryOutput::DirectoryPartition(d) => { + d.create_or_open(txn, path, prefix, layer).await + } + } + } + + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.create(txn, path, prefix, layer).await, + DirectoryOutput::DirectoryPartition(d) => d.create(txn, path, prefix, layer).await, + } + } + + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.open(txn, path, layer).await, + DirectoryOutput::DirectoryPartition(d) => d.open(txn, path, layer).await, + } + } + + async fn exists(&self, trx: &Transaction, path: Vec) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.exists(trx, path).await, + DirectoryOutput::DirectoryPartition(d) => d.exists(trx, path).await, + } + } + + async fn move_directory( + &self, + trx: &Transaction, + new_path: Vec, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.move_directory(trx, new_path).await, + DirectoryOutput::DirectoryPartition(d) => d.move_directory(trx, new_path).await, + } + } + + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.move_to(trx, old_path, new_path).await, + DirectoryOutput::DirectoryPartition(d) => d.move_to(trx, old_path, new_path).await, + } + } + + async fn remove(&self, trx: &Transaction, path: Vec) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.remove(trx, path).await, + DirectoryOutput::DirectoryPartition(d) => d.remove(trx, path).await, + } + } + + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.remove_if_exists(trx, path).await, + DirectoryOutput::DirectoryPartition(d) => d.remove_if_exists(trx, path).await, + } + } + + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + match self { + DirectoryOutput::DirectorySubspace(d) => d.list(trx, path).await, + DirectoryOutput::DirectoryPartition(d) => d.list(trx, path).await, + } + } +} + +// Strinc returns the first key that would sort outside the range prefixed by prefix. +pub(crate) fn strinc(key: Vec) -> Vec { + let mut key = key; + + for i in (0..key.len()).rev() { + if key[i] != 0xff { + key[i] += 1; + return key; + } else { + // stripping key from trailing 0xFF bytes + key.remove(i); + } + } + panic!("failed to strinc"); +} + +#[cfg(test)] +mod tests { + use super::*; + + // https://github.com/apple/foundationdb/blob/e34df983ee8c0db333babf36fb620318d026553d/bindings/c/test/unit/unit_tests.cpp#L95 + #[test] + fn test_strinc() { + assert_eq!(strinc(Vec::from("a".as_bytes())), Vec::from("b".as_bytes())); + assert_eq!(strinc(Vec::from("y".as_bytes())), Vec::from("z".as_bytes())); + assert_eq!( + strinc(Vec::from("!".as_bytes())), + Vec::from("\"".as_bytes()) + ); + assert_eq!(strinc(Vec::from("*".as_bytes())), Vec::from("+".as_bytes())); + assert_eq!( + strinc(Vec::from("fdb".as_bytes())), + Vec::from("fdc".as_bytes()) + ); + assert_eq!( + strinc(Vec::from("foundation database 6".as_bytes())), + Vec::from("foundation database 7".as_bytes()) + ); + + assert_eq!(strinc(vec![61u8, 62u8, 255u8]), vec![61u8, 63u8]); + // from seed 3180880087 + assert_eq!(strinc(vec![253u8, 255u8]), vec![254u8]); + assert_eq!(strinc(vec![253u8, 255u8, 255u8]), vec![254u8]); + } +} diff --git a/foundationdb/src/directory/node.rs b/foundationdb/src/directory/node.rs new file mode 100644 index 00000000..6843ffa6 --- /dev/null +++ b/foundationdb/src/directory/node.rs @@ -0,0 +1,97 @@ +use crate::directory::directory_layer::{ + DirectoryLayer, DEFAULT_SUB_DIRS, LAYER_SUFFIX, PARTITION_LAYER, +}; +use crate::directory::error::DirectoryError; +use crate::directory::DirectoryOutput; +use crate::tuple::Subspace; +use crate::RangeOption; +use crate::Transaction; + +#[derive(Debug, Clone)] +pub(crate) struct Node { + pub(crate) subspace: Option, + pub(crate) current_path: Vec, + pub(crate) target_path: Vec, + pub(crate) layer: Vec, + pub(crate) loaded_metadata: bool, + pub(crate) directory_layer: DirectoryLayer, +} + +impl Node { + // `load_metadata` is loading extra information for the node, like the layer + pub(crate) async fn load_metadata(&mut self, trx: &Transaction) -> Result<(), DirectoryError> { + if !self.exists() { + self.loaded_metadata = true; + return Ok(()); + } + + let key = self.subspace.as_ref().unwrap().pack(&LAYER_SUFFIX.to_vec()); + self.layer = match trx.get(&key, false).await { + Ok(None) => vec![], + Err(err) => return Err(DirectoryError::FdbError(err)), + Ok(Some(fdb_slice)) => fdb_slice.to_vec(), + }; + + self.loaded_metadata = true; + + Ok(()) + } + + pub(crate) fn is_in_partition(&self, include_empty_subpath: bool) -> bool { + assert!(self.loaded_metadata); + + self.exists() + && self.layer.eq(PARTITION_LAYER) + && (include_empty_subpath || self.target_path.len() > self.current_path.len()) + } + + pub(crate) fn get_partition_subpath(&self) -> Vec { + Vec::from(&self.target_path[self.current_path.len()..]) + } + + pub(crate) fn exists(&self) -> bool { + self.subspace.is_some() + } + + /// list sub-folders for a node + pub(crate) async fn list_sub_folders( + &self, + trx: &Transaction, + ) -> Result, DirectoryError> { + let mut results = vec![]; + + let range_option = RangeOption::from( + &self + .subspace + .as_ref() + .unwrap() + .to_owned() + .subspace(&(DEFAULT_SUB_DIRS)), + ); + + let fdb_values = trx.get_range(&range_option, 1_024, false).await?; + + for fdb_value in fdb_values { + let subspace = Subspace::from_bytes(fdb_value.key()); + // stripping from subspace + let sub_directory: (i64, String) = + self.subspace.as_ref().unwrap().unpack(subspace.bytes())?; + results.push(sub_directory.1); + } + Ok(results) + } + + pub(crate) fn get_contents(&self) -> Result { + assert!(self.exists()); + assert!(self.loaded_metadata); + + match &self.subspace { + None => unreachable!(), + Some(subspace) => self.directory_layer.contents_of_node( + subspace.to_owned(), + self.current_path.to_owned(), + self.layer.to_owned(), + ), + } + } +} diff --git a/foundationdb/src/future.rs b/foundationdb/src/future.rs index 80a0827f..b60647ad 100644 --- a/foundationdb/src/future.rs +++ b/foundationdb/src/future.rs @@ -29,7 +29,6 @@ use std::ops::Deref; use std::os::raw::c_char; use std::pin::Pin; use std::ptr::NonNull; -use std::rc::Rc; use std::sync::Arc; use foundationdb_sys as fdb_sys; @@ -325,7 +324,7 @@ impl IntoIterator for FdbValues { fn into_iter(self) -> Self::IntoIter { FdbValuesIter { - f: Rc::new(self._f), + f: Arc::new(self._f), keyvalues: self.keyvalues, len: self.len, pos: 0, @@ -335,11 +334,14 @@ impl IntoIterator for FdbValues { /// An iterator of keyvalues owned by a foundationDB future pub struct FdbValuesIter { - f: Rc, + f: Arc, keyvalues: *const fdb_sys::FDBKeyValue, len: i32, pos: i32, } + +unsafe impl Send for FdbValuesIter {} + impl Iterator for FdbValuesIter { type Item = FdbValue; fn next(&mut self) -> Option { @@ -404,9 +406,12 @@ impl DoubleEndedIterator for FdbValuesIter { /// Until dropped, this might prevent multiple key/values from beeing freed. /// (i.e. the future that own the data is dropped once all data it provided is dropped) pub struct FdbValue { - _f: Rc, + _f: Arc, keyvalue: *const fdb_sys::FDBKeyValue, } + +unsafe impl Send for FdbValue {} + impl Deref for FdbValue { type Target = FdbKeyValue; fn deref(&self) -> &Self::Target { diff --git a/foundationdb/src/lib.rs b/foundationdb/src/lib.rs index de15ee5b..f8e4c3af 100644 --- a/foundationdb/src/lib.rs +++ b/foundationdb/src/lib.rs @@ -100,6 +100,7 @@ pub mod api; #[cfg(any(feature = "fdb-5_1", feature = "fdb-5_2", feature = "fdb-6_0"))] pub mod cluster; mod database; +pub mod directory; mod error; pub mod future; mod keyselector; diff --git a/foundationdb/src/tuple/hca.rs b/foundationdb/src/tuple/hca.rs index 59b05e2c..a77963f7 100644 --- a/foundationdb/src/tuple/hca.rs +++ b/foundationdb/src/tuple/hca.rs @@ -91,16 +91,25 @@ impl TransactError for HcaError { /// Represents a High Contention Allocator for a given subspace #[derive(Debug)] pub struct HighContentionAllocator { + // original subspace kept to implement Clone + subspace: Subspace, counters: Subspace, recent: Subspace, allocation_mutex: Mutex<()>, } +impl Clone for HighContentionAllocator { + fn clone(&self) -> Self { + HighContentionAllocator::new(self.subspace.to_owned()) + } +} + impl HighContentionAllocator { /// Constructs an allocator that will use the input subspace for assigning values. /// The given subspace should not be used by anything other than the allocator pub fn new(subspace: Subspace) -> HighContentionAllocator { HighContentionAllocator { + subspace: subspace.clone(), counters: subspace.subspace(&0i64), recent: subspace.subspace(&1i64), allocation_mutex: Mutex::new(()), diff --git a/foundationdb/src/tuple/mod.rs b/foundationdb/src/tuple/mod.rs index 0c66a927..5a4816be 100644 --- a/foundationdb/src/tuple/mod.rs +++ b/foundationdb/src/tuple/mod.rs @@ -5,7 +5,7 @@ mod element; pub mod hca; mod pack; -mod subspace; +pub mod subspace; mod versionstamp; use std::borrow::Cow; @@ -274,11 +274,11 @@ mod tests { // versionstamp test_serde( - Versionstamp::complete(b"\xaa\xbb\xcc\xdd\xee\xff\x00\x01\x02\x03".clone(), 0), + Versionstamp::complete(*b"\xaa\xbb\xcc\xdd\xee\xff\x00\x01\x02\x03", 0), b"\x33\xaa\xbb\xcc\xdd\xee\xff\x00\x01\x02\x03\x00\x00", ); test_serde( - Versionstamp::complete(b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a".clone(), 657), + Versionstamp::complete(*b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a", 657), b"\x33\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x02\x91", ); @@ -349,67 +349,55 @@ mod tests { test_serde(i64::min_value(), b"\x0C\x7f\xff\xff\xff\xff\xff\xff\xff"); test_serde(9252427359321063944i128, b"\x1c\x80g9\xa9np\x02\x08"); - assert!( - match unpack::(b"\x1c\x80g9\xa9np\x02\x08").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x1c\x80g9\xa9np\x02\x08").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( -9252427359321063944i128, b"\x0c\x7f\x98\xc6V\x91\x8f\xfd\xf7", ); - assert!( - match unpack::(b"\x0c\x7f\x98\xc6V\x91\x8f\xfd\xf7").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x0c\x7f\x98\xc6V\x91\x8f\xfd\xf7").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( u64::max_value() as i128, b"\x1c\xff\xff\xff\xff\xff\xff\xff\xff", ); - assert!( - match unpack::(b"\x1c\xff\xff\xff\xff\xff\xff\xff\xff").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x1c\xff\xff\xff\xff\xff\xff\xff\xff").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( -(u64::max_value() as i128), b"\x0c\x00\x00\x00\x00\x00\x00\x00\x00", ); - assert!( - match unpack::(b"\x0c\x00\x00\x00\x00\x00\x00\x00\x00").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x0c\x00\x00\x00\x00\x00\x00\x00\x00").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( (i64::max_value() as i128) + 1, b"\x1c\x80\x00\x00\x00\x00\x00\x00\x00", ); - assert!( - match unpack::(b"\x1c\x80\x00\x00\x00\x00\x00\x00\x00").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x1c\x80\x00\x00\x00\x00\x00\x00\x00").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( (i64::min_value() as i128) - 1, b"\x0c\x7f\xff\xff\xff\xff\xff\xff\xfe", ); - assert!( - match unpack::(b"\x0c\x7f\xff\xff\xff\xff\xff\xff\xfe").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x0c\x7f\xff\xff\xff\xff\xff\xff\xfe").unwrap_err(), + PackError::UnsupportedIntLength + )); } #[cfg(feature = "num-bigint")] @@ -627,21 +615,21 @@ mod tests { test_serde(Element::Int(-1), &[0x13, 254]); test_serde( Element::Versionstamp(Versionstamp::complete( - b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a".clone(), + *b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a", 657, )), b"\x33\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x02\x91", ); test_serde( (Element::Versionstamp(Versionstamp::complete( - b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a".clone(), + *b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a", 657, )),), b"\x33\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x02\x91", ); test_serde( (Element::Versionstamp(Versionstamp::complete( - b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a".clone(), + *b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a", 657, )),), b"\x33\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x02\x91", diff --git a/foundationdb/src/tuple/pack.rs b/foundationdb/src/tuple/pack.rs index c6975fc1..19a92d1c 100644 --- a/foundationdb/src/tuple/pack.rs +++ b/foundationdb/src/tuple/pack.rs @@ -179,7 +179,7 @@ fn write_bytes(w: &mut W, v: &[u8]) -> io::Result(input: &'de [u8]) -> PackResult<(&'de [u8], Cow<'de, [u8]>)> { +fn parse_slice(input: &[u8]) -> PackResult<(&[u8], Cow<[u8]>)> { let mut bytes = Vec::new(); let mut pos = 0; for idx in memchr_iter(NIL, input) { @@ -203,7 +203,7 @@ fn parse_slice<'de>(input: &'de [u8]) -> PackResult<(&'de [u8], Cow<'de, [u8]>)> Err(PackError::MissingBytes) } -fn parse_string<'de>(input: &'de [u8]) -> PackResult<(&'de [u8], Cow<'de, str>)> { +fn parse_string(input: &[u8]) -> PackResult<(&[u8], Cow)> { let (input, slice) = parse_slice(input)?; Ok(( input, diff --git a/foundationdb/tests/directory.rs b/foundationdb/tests/directory.rs new file mode 100644 index 00000000..1b845dea --- /dev/null +++ b/foundationdb/tests/directory.rs @@ -0,0 +1,75 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use foundationdb::directory::directory_layer::DirectoryLayer; + +use foundationdb::directory::Directory; + +use foundationdb::*; + +mod common; + +#[test] +// testing basic features of the Directory, everything is tracked using with the BindingTester. +fn test_directory() { + let _guard = unsafe { foundationdb::boot() }; + let db = futures::executor::block_on(common::database()).expect("cannot open fdb"); + + eprintln!("clearing all keys"); + let trx = db.create_trx().expect("cannot create txn"); + trx.clear_range(b"", b"\xff"); + futures::executor::block_on(trx.commit()).expect("could not clear keys"); + + eprintln!("creating directories"); + let directory = DirectoryLayer::default(); + + futures::executor::block_on(test_create_then_open_then_delete( + &db, + &directory, + vec![String::from("application")], + )) + .expect("failed to run"); + + futures::executor::block_on(test_create_then_open_then_delete( + &db, + &directory, + vec![String::from("1"), String::from("2")], + )) + .expect("failed to run"); +} + +async fn test_create_then_open_then_delete( + db: &Database, + directory: &DirectoryLayer, + paths: Vec, +) -> FdbResult<()> { + let trx = db.create_trx()?; + + eprintln!("creating {:?}", &paths); + let create_output = directory.create(&trx, paths.to_owned(), None, None).await; + assert!( + create_output.is_ok(), + "cannot create: {:?}", + create_output.err().unwrap() + ); + trx.commit().await.expect("cannot commit"); + let trx = db.create_trx()?; + + eprintln!("opening {:?}", &paths); + let open_output = directory.open(&trx, paths.to_owned(), None).await; + assert!( + open_output.is_ok(), + "cannot create: {:?}", + open_output.err().unwrap() + ); + + assert_eq!(create_output.unwrap().bytes(), open_output.unwrap().bytes()); + trx.commit().await.expect("cannot commit"); + + // removing folder + Ok(()) +} diff --git a/foundationdb/tests/hca.rs b/foundationdb/tests/hca.rs index bf37b7c1..b273910d 100644 --- a/foundationdb/tests/hca.rs +++ b/foundationdb/tests/hca.rs @@ -39,9 +39,9 @@ async fn test_hca_many_sequential_allocations_async() -> FdbResult<()> { let mut all_ints = Vec::new(); for _ in 0..N { - let mut tx = db.create_trx()?; + let tx = db.create_trx()?; - let next_int: i64 = hca.allocate(&mut tx).await.unwrap(); + let next_int: i64 = hca.allocate(&tx).await.unwrap(); all_ints.push(next_int); tx.commit().await?; @@ -84,8 +84,8 @@ async fn test_hca_concurrent_allocations_async() -> FdbResult<()> { Ok(()) } -fn check_hca_result_uniqueness(results: &Vec) { - let result_set: HashSet = HashSet::from_iter(results.clone()); +fn check_hca_result_uniqueness(results: &[i64]) { + let result_set: HashSet = HashSet::from_iter(results.to_owned()); if results.len() != result_set.len() { panic!( diff --git a/foundationdb/tests/range.rs b/foundationdb/tests/range.rs index 884ddafe..b74453a9 100644 --- a/foundationdb/tests/range.rs +++ b/foundationdb/tests/range.rs @@ -20,6 +20,7 @@ fn test_range() { futures::executor::block_on(test_get_ranges_async()).expect("failed to run"); } +#[allow(clippy::needless_collect)] async fn test_get_range_async() -> FdbResult<()> { const N: usize = 10000; @@ -51,8 +52,8 @@ async fn test_get_range_async() -> FdbResult<()> { let len = range.len(); let mut i = 0; for kv in &range { - assert!(kv.key().len() > 0); - assert!(kv.value().len() > 0); + assert!(!kv.key().is_empty()); + assert!(!kv.value().is_empty()); i += 1; } assert_eq!(i, len); diff --git a/foundationdb/tests/tokio.rs b/foundationdb/tests/tokio.rs index 0f547c37..551f95ba 100644 --- a/foundationdb/tests/tokio.rs +++ b/foundationdb/tests/tokio.rs @@ -22,7 +22,7 @@ async fn do_transact() { .expect("failed to open fdb"), ); - let adb = db.clone(); + let adb = db; tokio::spawn(async move { async fn txnfn(_txn: &Transaction) -> FdbResult<()> { Ok(()) @@ -45,7 +45,7 @@ async fn do_trx() { .expect("failed to open fdb"), ); - let adb = db.clone(); + let adb = db; tokio::spawn(async move { adb.create_trx() .expect("failed to create trx") diff --git a/foundationdb/tests/watch.rs b/foundationdb/tests/watch.rs index d44a94dd..c4a982f4 100644 --- a/foundationdb/tests/watch.rs +++ b/foundationdb/tests/watch.rs @@ -17,7 +17,7 @@ fn test_watch() { } async fn test_watch_async() -> FdbResult<()> { - const KEY: &'static [u8] = b"test-watch"; + const KEY: &[u8] = b"test-watch"; let db = common::database().await?; @@ -40,7 +40,7 @@ async fn test_watch_async() -> FdbResult<()> { } async fn test_watch_without_commit_async() -> FdbResult<()> { - const KEY: &'static [u8] = b"test-watch-2"; + const KEY: &[u8] = b"test-watch-2"; let db = common::database().await?; diff --git a/scripts/run_bindingtester.sh b/scripts/run_bindingtester.sh index 04a02c01..ab729890 100755 --- a/scripts/run_bindingtester.sh +++ b/scripts/run_bindingtester.sh @@ -32,7 +32,8 @@ esac ## Run the test echo "testers['rust'] = Tester('rust', '${bindingtester}', 2040, 23, MAX_API_VERSION, types=ALL_TYPES) " >> ./bindings/bindingtester/known_testers.py - ./bindings/bindingtester/bindingtester.py --test-name scripted rust - ./bindings/bindingtester/bindingtester.py --num-ops 1000 --api-version 610 --test-name api --compare python rust - ./bindings/bindingtester/bindingtester.py --num-ops 1000 --api-version 610 --test-name api --concurrency 5 rust + python2 ./bindings/bindingtester/bindingtester.py --test-name scripted rust + python2 ./bindings/bindingtester/bindingtester.py --num-ops 1000 --api-version 610 --test-name api --compare python rust + python2 ./bindings/bindingtester/bindingtester.py --num-ops 1000 --api-version 610 --test-name api --concurrency 5 rust + python2 ./bindings/bindingtester/bindingtester.py --num-ops 10000 --api-version 610 --test-name directory --concurrency 1 rust --no-directory-snapshot-ops --compare )