From b8f79d937cb9dcac3a897119bc5f10bbd4f3f589 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Mon, 15 Jul 2024 16:38:50 +0800 Subject: [PATCH] Support user defined timestamp in rust bindings (#901) --- src/comparator.rs | 127 ++++++++- src/db.rs | 244 ++++++++++++++++++ src/db_iterator.rs | 2 +- src/db_options.rs | 74 ++++++ src/sst_file_writer.rs | 47 +++- src/write_batch.rs | 50 ++++ ...tionfilter.rs => test_compactionfilter.rs} | 0 tests/test_comparator.rs | 182 ++++++++++++- tests/test_db.rs | 28 ++ tests/test_sst_file_writer.rs | 68 ++++- 10 files changed, 815 insertions(+), 7 deletions(-) rename tests/{test_compationfilter.rs => test_compactionfilter.rs} (100%) diff --git a/src/comparator.rs b/src/comparator.rs index 9cfc89549..0fa507d90 100644 --- a/src/comparator.rs +++ b/src/comparator.rs @@ -13,13 +13,43 @@ // limitations under the License. // -use libc::{c_char, c_int, c_void, size_t}; +use libc::{c_char, c_int, c_uchar, c_void, size_t}; use std::cmp::Ordering; +use std::convert::TryInto; use std::ffi::CString; +use std::mem::size_of; use std::slice; pub type CompareFn = dyn Fn(&[u8], &[u8]) -> Ordering; +// Use u64 as the timestamp. This is based on two reasons: +// 1. Follows the logic of [BytewiseComparatorWithU64Ts](https://github.com/facebook/rocksdb/blob/3db030d7ee1b887ce818ec6f6a8d10949f9e9a22/util/comparator.cc#L238) +// 2. u64 is the return type of [Duration::as_secs()](https://doc.rust-lang.org/nightly/std/time/struct.Duration.html#method.as_secs) +fn strip_timestamp_from_user_key(user_key: &[u8], ts_sz: usize) -> &[u8] { + &user_key[..user_key.len() - ts_sz] +} + +fn extract_timestamp_from_user_key(user_key: &[u8], ts_sz: usize) -> &[u8] { + &user_key[user_key.len() - ts_sz..] 
+} + +// Caller should ensure the slice holds at least 8 bytes, +// as compare_ts_callback asserts before building it with slice::from_raw_parts +#[inline] +fn decode_timestamp(ptr: &[u8]) -> u64 { + u64::from_be_bytes(ptr[..8].try_into().unwrap()) +} + +fn compare_ts(a: &[u8], b: &[u8]) -> c_int { + let a = decode_timestamp(a); + let b = decode_timestamp(b); + match a.cmp(&b) { + Ordering::Less => -1, + Ordering::Equal => 0, + Ordering::Greater => 1, + } +} + pub struct ComparatorCallback { pub name: CString, pub f: Box, @@ -51,3 +81,98 @@ pub unsafe extern "C" fn compare_callback( Ordering::Greater => 1, } } + +/// For two events e1 and e2 whose timestamps are t1 and t2 respectively, +/// Returns value: +/// < 0 iff t1 < t2 +/// == 0 iff t1 == t2 +/// > 0 iff t1 > t2 +/// Note that an all-zero byte array will be the smallest (oldest) timestamp +/// of the same length, and a byte array with all bits 1 will be the largest. +pub unsafe extern "C" fn compare_ts_callback( + raw_cb: *mut c_void, + a_ts: *const c_char, + a_ts_len: size_t, + b_ts: *const c_char, + b_ts_len: size_t, +) -> c_int { + let _: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback); + assert_eq!(a_ts_len, size_of::()); + assert_eq!(b_ts_len, size_of::()); + let a: &[u8] = slice::from_raw_parts(a_ts as *const u8, a_ts_len); + let b: &[u8] = slice::from_raw_parts(b_ts as *const u8, b_ts_len); + compare_ts(a, b) +} + +/// Three-way comparison. Returns value: +/// < 0 iff "a" < "b", +/// == 0 iff "a" == "b", +/// > 0 iff "a" > "b" +/// Note this callback also compares timestamp. +/// For the same user key with different timestamps, larger (newer) +/// timestamp comes first. 
+pub unsafe extern "C" fn compare_with_ts_callback( + raw_cb: *mut c_void, + a_raw: *const c_char, + a_len: size_t, + b_raw: *const c_char, + b_len: size_t, +) -> c_int { + let cb: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback); + let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len); + let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len); + let ts_sz = size_of::(); + let a_key = strip_timestamp_from_user_key(a, ts_sz); + let b_key = strip_timestamp_from_user_key(b, ts_sz); + + let res = match (cb.f)(a_key, b_key) { + Ordering::Less => -1, + Ordering::Equal => 0, + Ordering::Greater => 1, + }; + if res != 0 { + return res; + } + let a_ts = extract_timestamp_from_user_key(a, ts_sz); + let b_ts = extract_timestamp_from_user_key(b, ts_sz); + -compare_ts(a_ts, b_ts) +} + +/// Three-way comparison. Returns value: +/// < 0 iff "a" < "b", +/// == 0 iff "a" == "b", +/// > 0 iff "a" > "b" +/// Note this callback ignores timestamp during comparison. +pub unsafe extern "C" fn compare_without_ts_callback( + raw_cb: *mut c_void, + a_raw: *const c_char, + a_len: size_t, + a_has_ts: c_uchar, + b_raw: *const c_char, + b_len: size_t, + b_has_ts: c_uchar, +) -> c_int { + let cb: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback); + let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len); + let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len); + let ts_sz = size_of::(); + let a_has_ts = a_has_ts != 0; + let b_has_ts = b_has_ts != 0; + assert!(!a_has_ts || a.len() >= ts_sz); + assert!(!b_has_ts || b.len() >= ts_sz); + let lhs = if a_has_ts { + strip_timestamp_from_user_key(a, ts_sz) + } else { + a + }; + let rhs = if b_has_ts { + strip_timestamp_from_user_key(b, ts_sz) + } else { + b + }; + match (cb.f)(lhs, rhs) { + Ordering::Less => -1, + Ordering::Equal => 0, + Ordering::Greater => 1, + } +} diff --git a/src/db.rs b/src/db.rs index 7f0a6de53..ac181a0ac 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1544,6 
+1544,80 @@ impl DBCommon { } } + /// Set the database entry for "key" to "value" with WriteOptions. + /// If "key" already exists, it will coexist with previous entry. + /// `Get` with a timestamp ts specified in ReadOptions will return + /// the most recent key/value whose timestamp is smaller than or equal to ts. + /// Takes an additional argument `ts` as the timestamp. + /// Note: the DB must be opened with user defined timestamp enabled. + pub fn put_with_ts_opt( + &self, + key: K, + ts: S, + value: V, + writeopts: &WriteOptions, + ) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + S: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + let ts = ts.as_ref(); + unsafe { + ffi_try!(ffi::rocksdb_put_with_ts( + self.inner.inner(), + writeopts.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + )); + Ok(()) + } + } + + /// Put with timestamp in a specific column family with WriteOptions. + /// If "key" already exists, it will coexist with previous entry. + /// `Get` with a timestamp ts specified in ReadOptions will return + /// the most recent key/value whose timestamp is smaller than or equal to ts. + /// Takes an additional argument `ts` as the timestamp. + /// Note: the DB must be opened with user defined timestamp enabled. 
+ pub fn put_cf_with_ts_opt( + &self, + cf: &impl AsColumnFamilyRef, + key: K, + ts: S, + value: V, + writeopts: &WriteOptions, + ) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + S: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + let ts = ts.as_ref(); + unsafe { + ffi_try!(ffi::rocksdb_put_cf_with_ts( + self.inner.inner(), + writeopts.inner, + cf.inner(), + key.as_ptr() as *const c_char, + key.len() as size_t, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + )); + Ok(()) + } + } + pub fn merge_opt(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error> where K: AsRef<[u8]>, @@ -1631,6 +1705,64 @@ impl DBCommon { } } + /// Remove the database entry (if any) for "key" with WriteOptions. + /// Takes an additional argument `ts` as the timestamp. + /// Note: the DB must be opened with user defined timestamp enabled. + pub fn delete_with_ts_opt( + &self, + key: K, + ts: S, + writeopts: &WriteOptions, + ) -> Result<(), Error> + where + K: AsRef<[u8]>, + S: AsRef<[u8]>, + { + let key = key.as_ref(); + let ts = ts.as_ref(); + unsafe { + ffi_try!(ffi::rocksdb_delete_with_ts( + self.inner.inner(), + writeopts.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + )); + Ok(()) + } + } + + /// Delete with timestamp in a specific column family with WriteOptions. + /// Takes an additional argument `ts` as the timestamp. + /// Note: the DB must be opened with user defined timestamp enabled. 
+ pub fn delete_cf_with_ts_opt( + &self, + cf: &impl AsColumnFamilyRef, + key: K, + ts: S, + writeopts: &WriteOptions, + ) -> Result<(), Error> + where + K: AsRef<[u8]>, + S: AsRef<[u8]>, + { + let key = key.as_ref(); + let ts = ts.as_ref(); + unsafe { + ffi_try!(ffi::rocksdb_delete_cf_with_ts( + self.inner.inner(), + writeopts.inner, + cf.inner(), + key.as_ptr() as *const c_char, + key.len() as size_t, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + )); + Ok(()) + } + } + pub fn put(&self, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, @@ -1647,6 +1779,53 @@ impl DBCommon { self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default()) } + /// Set the database entry for "key" to "value". + /// If "key" already exists, it will coexist with previous entry. + /// `Get` with a timestamp ts specified in ReadOptions will return + /// the most recent key/value whose timestamp is smaller than or equal to ts. + /// Takes an additional argument `ts` as the timestamp. + /// Note: the DB must be opened with user defined timestamp enabled. + pub fn put_with_ts(&self, key: K, ts: S, value: V) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + S: AsRef<[u8]>, + { + self.put_with_ts_opt( + key.as_ref(), + ts.as_ref(), + value.as_ref(), + &WriteOptions::default(), + ) + } + + /// Put with timestamp in a specific column family. + /// If "key" already exists, it will coexist with previous entry. + /// `Get` with a timestamp ts specified in ReadOptions will return + /// the most recent key/value whose timestamp is smaller than or equal to ts. + /// Takes an additional argument `ts` as the timestamp. + /// Note: the DB must be opened with user defined timestamp enabled. 
+ pub fn put_cf_with_ts( + &self, + cf: &impl AsColumnFamilyRef, + key: K, + ts: S, + value: V, + ) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + S: AsRef<[u8]>, + { + self.put_cf_with_ts_opt( + cf, + key.as_ref(), + ts.as_ref(), + value.as_ref(), + &WriteOptions::default(), + ) + } + pub fn merge(&self, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, @@ -1675,6 +1854,29 @@ impl DBCommon { self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default()) } + /// Remove the database entry (if any) for "key". + /// Takes an additional argument `ts` as the timestamp. + /// Note: the DB must be opened with user defined timestamp enabled. + pub fn delete_with_ts, S: AsRef<[u8]>>( + &self, + key: K, + ts: S, + ) -> Result<(), Error> { + self.delete_with_ts_opt(key.as_ref(), ts.as_ref(), &WriteOptions::default()) + } + + /// Delete with timestamp in a specific column family. + /// Takes an additional argument `ts` as the timestamp. + /// Note: the DB must be opened with user defined timestamp enabled. + pub fn delete_cf_with_ts, S: AsRef<[u8]>>( + &self, + cf: &impl AsColumnFamilyRef, + key: K, + ts: S, + ) -> Result<(), Error> { + self.delete_cf_with_ts_opt(cf, key.as_ref(), ts.as_ref(), &WriteOptions::default()) + } + /// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage. pub fn compact_range, E: AsRef<[u8]>>(&self, start: Option, end: Option) { unsafe { @@ -2198,6 +2400,48 @@ impl DBCommon { drop(cf); Ok(()) } + + /// Increase the full_history_ts of column family. The new ts_low value should + /// be newer than current full_history_ts value. + /// If another thread updates full_history_ts_low concurrently to a higher + /// timestamp than the requested ts_low, a try again error will be returned. 
+ pub fn increase_full_history_ts_low>( + &self, + cf: &impl AsColumnFamilyRef, + ts: S, + ) -> Result<(), Error> { + let ts = ts.as_ref(); + unsafe { + ffi_try!(ffi::rocksdb_increase_full_history_ts_low( + self.inner.inner(), + cf.inner(), + ts.as_ptr() as *const c_char, + ts.len() as size_t, + )); + Ok(()) + } + } + + /// Get current full_history_ts value. + pub fn get_full_history_ts_low(&self, cf: &impl AsColumnFamilyRef) -> Result, Error> { + unsafe { + let mut ts_lowlen = 0; + let ts = ffi_try!(ffi::rocksdb_get_full_history_ts_low( + self.inner.inner(), + cf.inner(), + &mut ts_lowlen, + )); + + if ts.is_null() { + Err(Error::new("Could not get full_history_ts_low".to_owned())) + } else { + let mut vec = vec![0; ts_lowlen]; + ptr::copy_nonoverlapping(ts as *mut u8, vec.as_mut_ptr(), ts_lowlen); + ffi::rocksdb_free(ts as *mut c_void); + Ok(vec) + } + } + } } impl DBCommon { diff --git a/src/db_iterator.rs b/src/db_iterator.rs index 846cfcf25..a68939c34 100644 --- a/src/db_iterator.rs +++ b/src/db_iterator.rs @@ -28,7 +28,7 @@ pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>; /// This iterator is different to the standard ``DBIteratorWithThreadMode`` as it aims Into /// replicate the underlying iterator API within RocksDB itself. This should /// give access to more performance and flexibility but departs from the -/// widely recognised Rust idioms. +/// widely recognized Rust idioms. /// /// ``` /// use rocksdb::{DB, Options}; diff --git a/src/db_options.rs b/src/db_options.rs index efddc69bf..dc246139d 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::ffi::CStr; +use std::mem::size_of; use std::path::Path; use std::ptr::{null_mut, NonNull}; use std::slice; @@ -1575,6 +1576,33 @@ impl Options { } } + /// Sets a timestamp-aware comparator, used to define the order of keys in the table, + /// taking timestamp into consideration. 
+ /// Find more information on timestamp-aware comparators [here](https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp) + /// + /// The client must ensure that the comparator supplied here has the same + /// name and orders keys *exactly* the same as the comparator provided to + /// previous open calls on the same DB. + pub fn set_comparator_with_ts(&mut self, name: impl CStrLike, compare_fn: Box) { + let cb = Box::new(ComparatorCallback { + name: name.into_c_string().unwrap(), + f: compare_fn, + }); + let ts_size = size_of::(); + unsafe { + let cmp = ffi::rocksdb_comparator_with_ts_create( + Box::into_raw(cb).cast::(), + Some(comparator::destructor_callback), + Some(comparator::compare_with_ts_callback), + Some(comparator::compare_ts_callback), + Some(comparator::compare_without_ts_callback), + Some(comparator::name_callback), + ts_size, + ); + ffi::rocksdb_options_set_comparator(self.inner, cmp); + } + } + pub fn set_prefix_extractor(&mut self, prefix_extractor: SliceTransform) { unsafe { ffi::rocksdb_options_set_prefix_extractor(self.inner, prefix_extractor.inner); @@ -3833,6 +3861,39 @@ impl ReadOptions { ffi::rocksdb_readoptions_set_async_io(self.inner, c_uchar::from(v)); } } + + /// Timestamp of operation. Read should return the latest data visible to the + /// specified timestamp. All timestamps of the same database must be of the + /// same length and format. The user is responsible for providing a customized + /// compare function via Comparator to order tuples. + /// For iterator, iter_start_ts is the lower bound (older) and timestamp + /// serves as the upper bound. Versions of the same record that fall in + /// the timestamp range will be returned. If iter_start_ts is nullptr, + /// only the most recent version visible to timestamp is returned. + /// The user-specified timestamp feature is still under active development, + /// and the API is subject to change. 
+ pub fn set_timestamp>(&mut self, ts: S) { + let ts = ts.as_ref(); + unsafe { + ffi::rocksdb_readoptions_set_timestamp( + self.inner, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + ); + } + } + + /// See `set_timestamp` + pub fn set_iter_start_ts>(&mut self, ts: S) { + let ts = ts.as_ref(); + unsafe { + ffi::rocksdb_readoptions_set_iter_start_ts( + self.inner, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + ); + } + } } impl Default for ReadOptions { @@ -4253,6 +4314,19 @@ impl CompactOptions { ffi::rocksdb_compactoptions_set_target_level(self.inner, lvl); } } + + /// Set user-defined timestamp low bound; data with a timestamp older than + /// the low bound may be GCed by compaction. Default: nullptr + pub fn set_full_history_ts_low>(&mut self, ts: S) { + let ts = ts.as_ref(); + unsafe { + ffi::rocksdb_compactoptions_set_full_history_ts_low( + self.inner, + ts.as_ptr() as *mut c_char, + ts.len() as size_t, + ); + } + } } pub struct WaitForCompactOptions { diff --git a/src/sst_file_writer.rs b/src/sst_file_writer.rs index 81a2f0962..f2b5a180c 100644 --- a/src/sst_file_writer.rs +++ b/src/sst_file_writer.rs @@ -109,7 +109,6 @@ impl<'a> SstFileWriter<'a> { { let key = key.as_ref(); let value = value.as_ref(); - unsafe { ffi_try!(ffi::rocksdb_sstfilewriter_put( self.inner, @@ -122,6 +121,31 @@ impl<'a> SstFileWriter<'a> { } } + /// Adds a Put key with value to currently opened file + /// REQUIRES: key is after any previously added key according to comparator. 
+ pub fn put_with_ts(&mut self, key: K, ts: S, value: V) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + S: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + let ts = ts.as_ref(); + unsafe { + ffi_try!(ffi::rocksdb_sstfilewriter_put_with_ts( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + )); + Ok(()) + } + } + /// Adds a Merge key with value to currently opened file /// REQUIRES: key is after any previously added key according to comparator. pub fn merge(&mut self, key: K, value: V) -> Result<(), Error> @@ -158,6 +182,27 @@ impl<'a> SstFileWriter<'a> { Ok(()) } } + + /// Adds a deletion key to currently opened file + /// REQUIRES: key is after any previously added key according to comparator. + pub fn delete_with_ts, S: AsRef<[u8]>>( + &mut self, + key: K, + ts: S, + ) -> Result<(), Error> { + let key = key.as_ref(); + let ts = ts.as_ref(); + unsafe { + ffi_try!(ffi::rocksdb_sstfilewriter_delete_with_ts( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + )); + Ok(()) + } + } } impl<'a> Drop for SstFileWriter<'a> { diff --git a/src/write_batch.rs b/src/write_batch.rs index 2aff80997..e35411e3b 100644 --- a/src/write_batch.rs +++ b/src/write_batch.rs @@ -168,6 +168,7 @@ impl WriteBatchWithTransaction { } } + /// Insert a value into the specific column family of the database under the given key. pub fn put_cf(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V) where K: AsRef<[u8]>, @@ -188,6 +189,31 @@ impl WriteBatchWithTransaction { } } + /// Insert a value into the specific column family of the database + /// under the given key with timestamp. 
+ pub fn put_cf_with_ts(&mut self, cf: &impl AsColumnFamilyRef, key: K, ts: S, value: V) + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + S: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + let ts = ts.as_ref(); + unsafe { + ffi::rocksdb_writebatch_put_cf_with_ts( + self.inner, + cf.inner(), + key.as_ptr() as *const c_char, + key.len() as size_t, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + ); + } + } + pub fn merge(&mut self, key: K, value: V) where K: AsRef<[u8]>, @@ -240,6 +266,8 @@ impl WriteBatchWithTransaction { } } + /// Removes the database entry in the specific column family for key. + /// Does nothing if the key was not found. pub fn delete_cf>(&mut self, cf: &impl AsColumnFamilyRef, key: K) { let key = key.as_ref(); @@ -253,6 +281,28 @@ impl WriteBatchWithTransaction { } } + /// Removes the database entry in the specific column family with timestamp for key. + /// Does nothing if the key was not found. + pub fn delete_cf_with_ts, S: AsRef<[u8]>>( + &mut self, + cf: &impl AsColumnFamilyRef, + key: K, + ts: S, + ) { + let key = key.as_ref(); + let ts = ts.as_ref(); + unsafe { + ffi::rocksdb_writebatch_delete_cf_with_ts( + self.inner, + cf.inner(), + key.as_ptr() as *const c_char, + key.len() as size_t, + ts.as_ptr() as *const c_char, + ts.len() as size_t, + ); + } + } + /// Clear all updates buffered in this batch. 
pub fn clear(&mut self) { unsafe { diff --git a/tests/test_compationfilter.rs b/tests/test_compactionfilter.rs similarity index 100% rename from tests/test_compationfilter.rs rename to tests/test_compactionfilter.rs diff --git a/tests/test_comparator.rs b/tests/test_comparator.rs index 7550cdbf4..001effc69 100644 --- a/tests/test_comparator.rs +++ b/tests/test_comparator.rs @@ -1,4 +1,4 @@ -use rocksdb::{Options, DB}; +use rocksdb::{CompactOptions, Options, ReadOptions, DB}; use std::cmp::Ordering; use std::iter::FromIterator; @@ -39,13 +39,12 @@ pub fn write_to_db_with_comparator(compare_fn: Box) -> Vec { #[test] /// First verify that using a function as a comparator works as expected -/// This should verify backwards compatablity +/// This should verify backwards compatibility /// Then run a test with a clojure where an x-variable is passed /// Keep in mind that this variable must be moved to the clojure /// Then run a test with a reverse sorting clojure and make sure the order is reverted fn test_comparator() { let local_compare = move |one: &[u8], two: &[u8]| one.cmp(two); - let x = 0; let local_compare_reverse = move |one: &[u8], two: &[u8]| { println!( @@ -72,3 +71,180 @@ fn test_comparator() { ); assert_eq!(vec!["b-key", "a-key"], res_closure_reverse); } + +#[inline] +pub fn encode_timestamp(ts: u64) -> [u8; 8] { + ts.to_be_bytes() +} + +#[test] +fn test_comparator_with_ts() { + let path = "_path_for_rocksdb_storage_with_ts"; + let _ = DB::destroy(&Options::default(), path); + + { + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + let compare_fn = move |one: &[u8], two: &[u8]| one.cmp(two); + db_opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + let db = DB::open(&db_opts, path).unwrap(); + + let key = b"hello"; + let val1 = b"world0"; + let val2 = b"world1"; + + let ts = &encode_timestamp(1); + let ts2 = &encode_timestamp(2); + let ts3 = &encode_timestamp(3); + + let mut 
opts = ReadOptions::default(); + opts.set_timestamp(ts); + + // basic put and get + db.put_with_ts(key, ts, val1).unwrap(); + let value = db.get_opt(key, &opts).unwrap(); + assert_eq!(value.unwrap().as_slice(), val1); + + // update + db.put_with_ts(key, ts2, val2).unwrap(); + opts.set_timestamp(ts2); + let value = db.get_opt(key, &opts).unwrap(); + assert_eq!(value.unwrap().as_slice(), val2); + + // delete + db.delete_with_ts(key, ts3).unwrap(); + opts.set_timestamp(ts3); + let value = db.get_opt(key, &opts).unwrap(); + assert!(value.is_none()); + + // ts2 should read deleted data + opts.set_timestamp(ts2); + let value = db.get_opt(key, &opts).unwrap(); + assert_eq!(value.unwrap().as_slice(), val2); + + // ts1 should read old data + opts.set_timestamp(ts); + let value = db.get_opt(key, &opts).unwrap(); + assert_eq!(value.unwrap().as_slice(), val1); + + // test iterator with ts + opts.set_timestamp(ts2); + let mut iter = db.raw_iterator_opt(opts); + iter.seek_to_first(); + let mut result_vec = Vec::new(); + while iter.valid() { + let key = iter.key().unwrap(); + // maybe not best way to copy? 
+ let key_str = key.iter().map(|b| *b as char).collect::>(); + result_vec.push(String::from_iter(key_str)); + iter.next(); + } + assert_eq!(result_vec, ["hello"]); + + // test full_history_ts_low works + let mut compact_opts = CompactOptions::default(); + compact_opts.set_full_history_ts_low(ts2); + db.compact_range_opt(None::<&[u8]>, None::<&[u8]>, &compact_opts); + db.flush().unwrap(); + + let mut opts = ReadOptions::default(); + opts.set_timestamp(ts3); + let value = db.get_opt(key, &opts).unwrap(); + assert_eq!(value, None); + // cannot read with timestamp older than full_history_ts_low + opts.set_timestamp(ts); + assert!(db.get_opt(key, &opts).is_err()); + } + + let _ = DB::destroy(&Options::default(), path); +} + +#[test] +fn test_comparator_with_column_family_with_ts() { + let path = "_path_for_rocksdb_storage_with_column_family_with_ts"; + let _ = DB::destroy(&Options::default(), path); + + { + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + + let mut cf_opts = Options::default(); + let compare_fn = move |one: &[u8], two: &[u8]| one.cmp(two); + cf_opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + + let cfs = vec![("cf", cf_opts)]; + + let db = DB::open_cf_with_opts(&db_opts, path, cfs).unwrap(); + let cf = db.cf_handle("cf").unwrap(); + + let key = b"hello"; + let val1 = b"world0"; + let val2 = b"world1"; + + let ts = &encode_timestamp(1); + let ts2 = &encode_timestamp(2); + let ts3 = &encode_timestamp(3); + + let mut opts = ReadOptions::default(); + opts.set_timestamp(ts); + + // basic put and get + db.put_cf_with_ts(&cf, key, ts, val1).unwrap(); + let value = db.get_cf_opt(&cf, key, &opts).unwrap(); + assert_eq!(value.unwrap().as_slice(), val1); + + // update + db.put_cf_with_ts(&cf, key, ts2, val2).unwrap(); + opts.set_timestamp(ts2); + let value = db.get_cf_opt(&cf, key, &opts).unwrap(); + assert_eq!(value.unwrap().as_slice(), val2); + + // delete + 
db.delete_cf_with_ts(&cf, key, ts3).unwrap(); + opts.set_timestamp(ts3); + let value = db.get_cf_opt(&cf, key, &opts).unwrap(); + assert!(value.is_none()); + + // ts2 should read deleted data + opts.set_timestamp(ts2); + let value = db.get_cf_opt(&cf, key, &opts).unwrap(); + assert_eq!(value.unwrap().as_slice(), val2); + + // ts1 should read old data + opts.set_timestamp(ts); + let value = db.get_cf_opt(&cf, key, &opts).unwrap(); + assert_eq!(value.unwrap().as_slice(), val1); + + // test iterator with ts + opts.set_timestamp(ts2); + let mut iter = db.raw_iterator_cf_opt(&cf, opts); + iter.seek_to_first(); + let mut result_vec = Vec::new(); + while iter.valid() { + let key = iter.key().unwrap(); + // maybe not best way to copy? + let key_str = key.iter().map(|b| *b as char).collect::>(); + result_vec.push(String::from_iter(key_str)); + iter.next(); + } + assert_eq!(result_vec, ["hello"]); + + // test full_history_ts_low works + let mut compact_opts = CompactOptions::default(); + compact_opts.set_full_history_ts_low(ts2); + db.compact_range_cf_opt(&cf, None::<&[u8]>, None::<&[u8]>, &compact_opts); + db.flush().unwrap(); + + let mut opts = ReadOptions::default(); + opts.set_timestamp(ts3); + let value = db.get_cf_opt(&cf, key, &opts).unwrap(); + assert_eq!(value, None); + // cannot read with timestamp older than full_history_ts_low + opts.set_timestamp(ts); + assert!(db.get_cf_opt(&cf, key, &opts).is_err()); + } + + let _ = DB::destroy(&Options::default(), path); +} diff --git a/tests/test_db.rs b/tests/test_db.rs index 62a5847dd..10e836b83 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -1569,3 +1569,31 @@ fn test_atomic_flush_cfs() { ); } } + +#[test] +fn test_full_history_ts_low() { + let path = DBPath::new("_rust_full_history_ts_low"); + let _ = DB::destroy(&Options::default(), &path); + + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let mut cf_opts = Options::default(); + let 
compare_fn = move |one: &[u8], two: &[u8]| one.cmp(two); + cf_opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + + let cfs = vec![("cf", cf_opts)]; + + let db = DB::open_cf_with_opts(&opts, &path, cfs).unwrap(); + let cf = db.cf_handle("cf").unwrap(); + + let ts = &1u64.to_be_bytes(); + db.increase_full_history_ts_low(&cf, ts).unwrap(); + let ret = db.get_full_history_ts_low(&cf); + assert_eq!(ts, ret.unwrap().as_slice()); + + let _ = DB::destroy(&Options::default(), &path); + } +} diff --git a/tests/test_sst_file_writer.rs b/tests/test_sst_file_writer.rs index 5376f5ab7..39a7f9c07 100644 --- a/tests/test_sst_file_writer.rs +++ b/tests/test_sst_file_writer.rs @@ -16,7 +16,7 @@ mod util; use pretty_assertions::assert_eq; -use rocksdb::{Error, Options, SstFileWriter, DB}; +use rocksdb::{Error, Options, ReadOptions, SstFileWriter, DB}; use util::DBPath; #[test] @@ -50,3 +50,69 @@ fn sst_file_writer_works() { assert!(db.get(b"k3").unwrap().is_none()); } } + +#[inline] +pub fn encode_timestamp(ts: u64) -> [u8; 8] { + ts.to_be_bytes() +} + +#[test] +fn sst_file_writer_with_ts_works() { + let db_path = DBPath::new("_rust_rocksdb_sstfilewritertest_with_ts"); + let dir = tempfile::Builder::new() + .prefix("_rust_rocksdb_sstfilewritertest_with_ts") + .tempdir() + .expect("Failed to create temporary path for file writer."); + let writer_path = dir.path().join("filewriter"); + let compare_fn = move |one: &[u8], two: &[u8]| one.cmp(two); + + let ts = &encode_timestamp(1); + let ts2 = &encode_timestamp(2); + let ts3 = &encode_timestamp(3); + { + let mut opts = Options::default(); + opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + let mut writer = SstFileWriter::create(&opts); + writer.open(&writer_path).unwrap(); + writer.put_with_ts(b"k1", ts, b"v1").unwrap(); + + writer.put_with_ts(b"k2", ts2, b"v2").unwrap(); + + writer.put_with_ts(b"k3", ts2, b"v3").unwrap(); + writer.finish().unwrap(); + assert!(writer.file_size() > 0); + } + + { + let _ = 
DB::destroy(&Options::default(), &db_path); + + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + db_opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + + let db = DB::open(&db_opts, &db_path).unwrap(); + db.ingest_external_file(vec![&writer_path]).unwrap(); + db.delete_with_ts(b"k3", ts3).unwrap(); + + let mut opts = ReadOptions::default(); + opts.set_timestamp(ts); + + let r: Result>, Error> = db.get_opt(b"k1", &opts); + assert_eq!(r.unwrap().unwrap(), b"v1"); + + // at ts1 k2 should be invisible + assert!(db.get_opt(b"k2", &opts).unwrap().is_none()); + + // at ts2 k2 and k3 should be visible + opts.set_timestamp(ts2); + let r: Result>, Error> = db.get_opt(b"k2", &opts); + assert_eq!(r.unwrap().unwrap(), b"v2"); + let r = db.get_opt(b"k3", &opts); + assert_eq!(r.unwrap().unwrap(), b"v3"); + + // at ts3 the k3 should be deleted + opts.set_timestamp(ts3); + assert!(db.get_opt(b"k3", &opts).unwrap().is_none()); + } +}