From 1710120e4549e04ba3baa6a1ee5a5a801fa45a72 Mon Sep 17 00:00:00 2001 From: Larry <26318510+larry0x@users.noreply.github.com> Date: Fri, 26 Jul 2024 22:00:24 +0100 Subject: [PATCH] Improvements to user defined timestamp (#905) --- src/comparator.rs | 213 ++++++++++++---------------------- src/db_options.rs | 122 +++++++++++++------ tests/test_comparator.rs | 43 ++++--- tests/test_db.rs | 17 ++- tests/test_sst_file_writer.rs | 34 +++--- tests/util/mod.rs | 118 ++++++++++++++++++- 6 files changed, 336 insertions(+), 211 deletions(-) diff --git a/src/comparator.rs b/src/comparator.rs index 0fa507d90..3e3b40f99 100644 --- a/src/comparator.rs +++ b/src/comparator.rs @@ -15,164 +15,103 @@ use libc::{c_char, c_int, c_uchar, c_void, size_t}; use std::cmp::Ordering; -use std::convert::TryInto; use std::ffi::CString; -use std::mem::size_of; use std::slice; pub type CompareFn = dyn Fn(&[u8], &[u8]) -> Ordering; -// Use u64 as the timestamp. This is based on two reasons: -// 1. Follows the logic of [BytewiseComparatorWithU64Ts](https://github.com/facebook/rocksdb/blob/3db030d7ee1b887ce818ec6f6a8d10949f9e9a22/util/comparator.cc#L238) -// 2. u64 is the return type of [Duration::as_secs()](https://doc.rust-lang.org/nightly/std/time/struct.Duration.html#method.as_secs) -fn strip_timestamp_from_user_key(user_key: &[u8], ts_sz: usize) -> &[u8] { - &user_key[..user_key.len() - ts_sz] -} +pub type CompareTsFn = dyn Fn(&[u8], &[u8]) -> Ordering; -fn extract_timestamp_from_user_key(user_key: &[u8], ts_sz: usize) -> &[u8] { - &user_key[user_key.len() - ts_sz..] 
-} +pub type CompareWithoutTsFn = dyn Fn(&[u8], bool, &[u8], bool) -> Ordering; -// Caller should ensure the pointer is valid and has at least 8 bytes, -// As the slice::from_raw_parts does in compare_ts_callback -#[inline] -fn decode_timestamp(ptr: &[u8]) -> u64 { - u64::from_be_bytes(ptr[..8].try_into().unwrap()) +pub struct ComparatorCallback { + pub name: CString, + pub compare_fn: Box, } -fn compare_ts(a: &[u8], b: &[u8]) -> c_int { - let a = decode_timestamp(a); - let b = decode_timestamp(b); - match a.cmp(&b) { - Ordering::Less => -1, - Ordering::Equal => 0, - Ordering::Greater => 1, +impl ComparatorCallback { + pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) { + drop(Box::from_raw(raw_cb as *mut Self)); } -} -pub struct ComparatorCallback { - pub name: CString, - pub f: Box, -} + pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { + let cb: &mut Self = &mut *(raw_cb as *mut Self); + let ptr = cb.name.as_ptr(); + ptr as *const c_char + } -pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) { - drop(Box::from_raw(raw_cb as *mut ComparatorCallback)); + pub unsafe extern "C" fn compare_callback( + raw_cb: *mut c_void, + a_raw: *const c_char, + a_len: size_t, + b_raw: *const c_char, + b_len: size_t, + ) -> c_int { + let cb: &mut Self = &mut *(raw_cb as *mut Self); + let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len); + let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len); + (cb.compare_fn)(a, b) as c_int + } } -pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { - let cb: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback); - let ptr = cb.name.as_ptr(); - ptr as *const c_char +pub struct ComparatorWithTsCallback { + pub name: CString, + pub compare_fn: Box, + pub compare_ts_fn: Box, + pub compare_without_ts_fn: Box, } -pub unsafe extern "C" fn compare_callback( - raw_cb: *mut c_void, - a_raw: *const c_char, - a_len: size_t, - b_raw: 
*const c_char, - b_len: size_t, -) -> c_int { - let cb: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback); - let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len); - let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len); - match (cb.f)(a, b) { - Ordering::Less => -1, - Ordering::Equal => 0, - Ordering::Greater => 1, +impl ComparatorWithTsCallback { + pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) { + drop(Box::from_raw(raw_cb as *mut Self)); } -} -/// For two events e1 and e2 whose timestamps are t1 and t2 respectively, -/// Returns value: -/// < 0 iff t1 < t2 -/// == 0 iff t1 == t2 -/// > 0 iff t1 > t2 -/// Note that an all-zero byte array will be the smallest (oldest) timestamp -/// of the same length, and a byte array with all bits 1 will be the largest. -pub unsafe extern "C" fn compare_ts_callback( - raw_cb: *mut c_void, - a_ts: *const c_char, - a_ts_len: size_t, - b_ts: *const c_char, - b_ts_len: size_t, -) -> c_int { - let _: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback); - assert_eq!(a_ts_len, size_of::()); - assert_eq!(b_ts_len, size_of::()); - let a: &[u8] = slice::from_raw_parts(a_ts as *const u8, a_ts_len); - let b: &[u8] = slice::from_raw_parts(b_ts as *const u8, b_ts_len); - compare_ts(a, b) -} + pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { + let cb: &mut Self = &mut *(raw_cb as *mut Self); + let ptr = cb.name.as_ptr(); + ptr as *const c_char + } -/// Three-way comparison. Returns value: -/// < 0 iff "a" < "b", -/// == 0 iff "a" == "b", -/// > 0 iff "a" > "b" -/// Note this callback also compares timestamp. -/// For the same user key with different timestamps, larger (newer) -/// timestamp comes first. 
-pub unsafe extern "C" fn compare_with_ts_callback( - raw_cb: *mut c_void, - a_raw: *const c_char, - a_len: size_t, - b_raw: *const c_char, - b_len: size_t, -) -> c_int { - let cb: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback); - let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len); - let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len); - let ts_sz = size_of::(); - let a_key = strip_timestamp_from_user_key(a, ts_sz); - let b_key = strip_timestamp_from_user_key(b, ts_sz); + pub unsafe extern "C" fn compare_callback( + raw_cb: *mut c_void, + a_raw: *const c_char, + a_len: size_t, + b_raw: *const c_char, + b_len: size_t, + ) -> c_int { + let cb: &mut Self = &mut *(raw_cb as *mut Self); + let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len); + let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len); + (cb.compare_fn)(a, b) as c_int + } - let res = match (cb.f)(a_key, b_key) { - Ordering::Less => -1, - Ordering::Equal => 0, - Ordering::Greater => 1, - }; - if res != 0 { - return res; + pub unsafe extern "C" fn compare_ts_callback( + raw_cb: *mut c_void, + a_ts_raw: *const c_char, + a_ts_len: size_t, + b_ts_raw: *const c_char, + b_ts_len: size_t, + ) -> c_int { + let cb: &mut Self = &mut *(raw_cb as *mut Self); + let a_ts: &[u8] = slice::from_raw_parts(a_ts_raw as *const u8, a_ts_len); + let b_ts: &[u8] = slice::from_raw_parts(b_ts_raw as *const u8, b_ts_len); + (cb.compare_ts_fn)(a_ts, b_ts) as c_int } - let a_ts = extract_timestamp_from_user_key(a, ts_sz); - let b_ts = extract_timestamp_from_user_key(b, ts_sz); - -compare_ts(a_ts, b_ts) -} -/// Three-way comparison. Returns value: -/// < 0 iff "a" < "b", -/// == 0 iff "a" == "b", -/// > 0 iff "a" > "b" -/// Note this callback ignores timestamp during comparison. 
-pub unsafe extern "C" fn compare_without_ts_callback( - raw_cb: *mut c_void, - a_raw: *const c_char, - a_len: size_t, - a_has_ts: c_uchar, - b_raw: *const c_char, - b_len: size_t, - b_has_ts: c_uchar, -) -> c_int { - let cb: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback); - let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len); - let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len); - let ts_sz = size_of::(); - let a_has_ts = a_has_ts != 0; - let b_has_ts = b_has_ts != 0; - assert!(!a_has_ts || a.len() >= ts_sz); - assert!(!b_has_ts || b.len() >= ts_sz); - let lhs = if a_has_ts { - strip_timestamp_from_user_key(a, ts_sz) - } else { - a - }; - let rhs = if b_has_ts { - strip_timestamp_from_user_key(b, ts_sz) - } else { - b - }; - match (cb.f)(lhs, rhs) { - Ordering::Less => -1, - Ordering::Equal => 0, - Ordering::Greater => 1, + pub unsafe extern "C" fn compare_without_ts_callback( + raw_cb: *mut c_void, + a_raw: *const c_char, + a_len: size_t, + a_has_ts_raw: c_uchar, + b_raw: *const c_char, + b_len: size_t, + b_has_ts_raw: c_uchar, + ) -> c_int { + let cb: &mut Self = &mut *(raw_cb as *mut Self); + let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len); + let a_has_ts = a_has_ts_raw != 0; + let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len); + let b_has_ts = b_has_ts_raw != 0; + (cb.compare_without_ts_fn)(a, a_has_ts, b, b_has_ts) as c_int } } diff --git a/src/db_options.rs b/src/db_options.rs index dc246139d..97c31a0c3 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::ffi::CStr; -use std::mem::size_of; use std::path::Path; use std::ptr::{null_mut, NonNull}; use std::slice; @@ -25,7 +24,9 @@ use crate::statistics::{Histogram, HistogramData, StatsLevel}; use crate::{ compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn}, compaction_filter_factory::{self, CompactionFilterFactory}, - comparator::{self, ComparatorCallback, CompareFn}, + comparator::{ + ComparatorCallback, ComparatorWithTsCallback, CompareFn, CompareTsFn, CompareWithoutTsFn, + }, db::DBAccess, env::Env, ffi, @@ -342,6 +343,12 @@ pub struct BlockBasedOptions { pub struct ReadOptions { pub(crate) inner: *mut ffi::rocksdb_readoptions_t, + // The `ReadOptions` owns a copy of the timestamp and iteration bounds. + // This is necessary to ensure the pointers we pass over the FFI live as + // long as the `ReadOptions`. This way, when performing the read operation, + // the pointers are guaranteed to be valid. + timestamp: Option>, + iter_start_ts: Option>, iterate_upper_bound: Option>, iterate_lower_bound: Option>, } @@ -1562,15 +1569,15 @@ impl Options { pub fn set_comparator(&mut self, name: impl CStrLike, compare_fn: Box) { let cb = Box::new(ComparatorCallback { name: name.into_c_string().unwrap(), - f: compare_fn, + compare_fn, }); unsafe { let cmp = ffi::rocksdb_comparator_create( Box::into_raw(cb).cast::(), - Some(comparator::destructor_callback), - Some(comparator::compare_callback), - Some(comparator::name_callback), + Some(ComparatorCallback::destructor_callback), + Some(ComparatorCallback::compare_callback), + Some(ComparatorCallback::name_callback), ); ffi::rocksdb_options_set_comparator(self.inner, cmp); } @@ -1583,21 +1590,30 @@ impl Options { /// The client must ensure that the comparator supplied here has the same /// name and orders keys *exactly* the same as the comparator provided to /// previous open calls on the same DB. 
- pub fn set_comparator_with_ts(&mut self, name: impl CStrLike, compare_fn: Box) { - let cb = Box::new(ComparatorCallback { + pub fn set_comparator_with_ts( + &mut self, + name: impl CStrLike, + timestamp_size: usize, + compare_fn: Box, + compare_ts_fn: Box, + compare_without_ts_fn: Box, + ) { + let cb = Box::new(ComparatorWithTsCallback { name: name.into_c_string().unwrap(), - f: compare_fn, + compare_fn, + compare_ts_fn, + compare_without_ts_fn, }); - let ts_size = size_of::(); + unsafe { let cmp = ffi::rocksdb_comparator_with_ts_create( Box::into_raw(cb).cast::(), - Some(comparator::destructor_callback), - Some(comparator::compare_with_ts_callback), - Some(comparator::compare_ts_callback), - Some(comparator::compare_without_ts_callback), - Some(comparator::name_callback), - ts_size, + Some(ComparatorWithTsCallback::destructor_callback), + Some(ComparatorWithTsCallback::compare_callback), + Some(ComparatorWithTsCallback::compare_ts_callback), + Some(ComparatorWithTsCallback::compare_without_ts_callback), + Some(ComparatorWithTsCallback::name_callback), + timestamp_size, ); ffi::rocksdb_options_set_comparator(self.inner, cmp); } @@ -3872,26 +3888,43 @@ impl ReadOptions { /// only the most recent version visible to timestamp is returned. /// The user-specified timestamp feature is still under active development, /// and the API is subject to change. - pub fn set_timestamp>(&mut self, ts: S) { - let ts = ts.as_ref(); + pub fn set_timestamp>>(&mut self, ts: S) { + self.set_timestamp_impl(Some(ts.into())); + } + + fn set_timestamp_impl(&mut self, ts: Option>) { + let (ptr, len) = if let Some(ref ts) = ts { + (ts.as_ptr() as *const c_char, ts.len()) + } else if self.timestamp.is_some() { + // The stored timestamp is a `Some` but we're updating it to a `None`. + // This means to cancel a previously set timestamp. + // To do this, use a null pointer and zero length. 
+ (std::ptr::null(), 0) + } else { + return; + }; + self.timestamp = ts; unsafe { - ffi::rocksdb_readoptions_set_timestamp( - self.inner, - ts.as_ptr() as *const c_char, - ts.len() as size_t, - ); + ffi::rocksdb_readoptions_set_timestamp(self.inner, ptr, len); } } /// See `set_timestamp` - pub fn set_iter_start_ts>(&mut self, ts: S) { - let ts = ts.as_ref(); + pub fn set_iter_start_ts>>(&mut self, ts: S) { + self.set_iter_start_ts_impl(Some(ts.into())); + } + + fn set_iter_start_ts_impl(&mut self, ts: Option>) { + let (ptr, len) = if let Some(ref ts) = ts { + (ts.as_ptr() as *const c_char, ts.len()) + } else if self.iter_start_ts.is_some() { + (std::ptr::null(), 0) + } else { + return; + }; + self.iter_start_ts = ts; unsafe { - ffi::rocksdb_readoptions_set_iter_start_ts( - self.inner, - ts.as_ptr() as *const c_char, - ts.len() as size_t, - ); + ffi::rocksdb_readoptions_set_iter_start_ts(self.inner, ptr, len); } } } @@ -3901,6 +3934,8 @@ impl Default for ReadOptions { unsafe { Self { inner: ffi::rocksdb_readoptions_create(), + timestamp: None, + iter_start_ts: None, iterate_upper_bound: None, iterate_lower_bound: None, } @@ -4258,6 +4293,7 @@ pub enum BottommostLevelCompaction { pub struct CompactOptions { pub(crate) inner: *mut ffi::rocksdb_compactoptions_t, + full_history_ts_low: Option>, } impl Default for CompactOptions { @@ -4265,7 +4301,10 @@ impl Default for CompactOptions { let opts = unsafe { ffi::rocksdb_compactoptions_create() }; assert!(!opts.is_null(), "Could not create RocksDB Compact Options"); - Self { inner: opts } + Self { + inner: opts, + full_history_ts_low: None, + } } } @@ -4317,14 +4356,21 @@ impl CompactOptions { /// Set user-defined timestamp low bound, the data with older timestamp than /// low bound maybe GCed by compaction. 
Default: nullptr - pub fn set_full_history_ts_low>(&mut self, ts: S) { - let ts = ts.as_ref(); + pub fn set_full_history_ts_low>>(&mut self, ts: S) { + self.set_full_history_ts_low_impl(Some(ts.into())); + } + + fn set_full_history_ts_low_impl(&mut self, ts: Option>) { + let (ptr, len) = if let Some(ref ts) = ts { + (ts.as_ptr() as *mut c_char, ts.len()) + } else if self.full_history_ts_low.is_some() { + (std::ptr::null::>() as *mut c_char, 0) + } else { + return; + }; + self.full_history_ts_low = ts; unsafe { - ffi::rocksdb_compactoptions_set_full_history_ts_low( - self.inner, - ts.as_ptr() as *mut c_char, - ts.len() as size_t, - ); + ffi::rocksdb_compactoptions_set_full_history_ts_low(self.inner, ptr, len); } } } diff --git a/tests/test_comparator.rs b/tests/test_comparator.rs index 001effc69..ad9410ffb 100644 --- a/tests/test_comparator.rs +++ b/tests/test_comparator.rs @@ -1,6 +1,9 @@ +mod util; + use rocksdb::{CompactOptions, Options, ReadOptions, DB}; use std::cmp::Ordering; use std::iter::FromIterator; +use util::{U64Comparator, U64Timestamp}; /// This function is for ensuring test of backwards compatibility pub fn rocks_old_compare(one: &[u8], two: &[u8]) -> Ordering { @@ -72,11 +75,6 @@ fn test_comparator() { assert_eq!(vec!["b-key", "a-key"], res_closure_reverse); } -#[inline] -pub fn encode_timestamp(ts: u64) -> [u8; 8] { - ts.to_be_bytes() -} - #[test] fn test_comparator_with_ts() { let path = "_path_for_rocksdb_storage_with_ts"; @@ -86,17 +84,22 @@ fn test_comparator_with_ts() { let mut db_opts = Options::default(); db_opts.create_missing_column_families(true); db_opts.create_if_missing(true); - let compare_fn = move |one: &[u8], two: &[u8]| one.cmp(two); - db_opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + db_opts.set_comparator_with_ts( + U64Comparator::NAME, + U64Timestamp::SIZE, + Box::new(U64Comparator::compare), + Box::new(U64Comparator::compare_ts), + Box::new(U64Comparator::compare_without_ts), + ); let db = DB::open(&db_opts, 
path).unwrap(); let key = b"hello"; let val1 = b"world0"; let val2 = b"world1"; - let ts = &encode_timestamp(1); - let ts2 = &encode_timestamp(2); - let ts3 = &encode_timestamp(3); + let ts = U64Timestamp::new(1); + let ts2 = U64Timestamp::new(2); + let ts3 = U64Timestamp::new(3); let mut opts = ReadOptions::default(); opts.set_timestamp(ts); @@ -171,8 +174,13 @@ fn test_comparator_with_column_family_with_ts() { db_opts.create_if_missing(true); let mut cf_opts = Options::default(); - let compare_fn = move |one: &[u8], two: &[u8]| one.cmp(two); - cf_opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + cf_opts.set_comparator_with_ts( + U64Comparator::NAME, + U64Timestamp::SIZE, + Box::new(U64Comparator::compare), + Box::new(U64Comparator::compare_ts), + Box::new(U64Comparator::compare_without_ts), + ); let cfs = vec![("cf", cf_opts)]; @@ -183,9 +191,9 @@ fn test_comparator_with_column_family_with_ts() { let val1 = b"world0"; let val2 = b"world1"; - let ts = &encode_timestamp(1); - let ts2 = &encode_timestamp(2); - let ts3 = &encode_timestamp(3); + let ts = U64Timestamp::new(1); + let ts2 = U64Timestamp::new(2); + let ts3 = U64Timestamp::new(3); let mut opts = ReadOptions::default(); opts.set_timestamp(ts); @@ -237,6 +245,11 @@ fn test_comparator_with_column_family_with_ts() { db.compact_range_cf_opt(&cf, None::<&[u8]>, None::<&[u8]>, &compact_opts); db.flush().unwrap(); + // Attempt to read `full_history_ts_low`. + // It should match the value we set earlier (`ts2`). 
+ let full_history_ts_low = db.get_full_history_ts_low(&cf).unwrap(); + assert_eq!(U64Timestamp::from(full_history_ts_low.as_slice()), ts2); + let mut opts = ReadOptions::default(); opts.set_timestamp(ts3); let value = db.get_cf_opt(&cf, key, &opts).unwrap(); diff --git a/tests/test_db.rs b/tests/test_db.rs index 10e836b83..6af67fd21 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -28,7 +28,7 @@ use rocksdb::{ UniversalCompactOptions, UniversalCompactionStopStyle, WaitForCompactOptions, WriteBatch, DB, DEFAULT_COLUMN_FAMILY_NAME, }; -use util::{assert_iter, pair, DBPath}; +use util::{assert_iter, pair, DBPath, U64Comparator, U64Timestamp}; #[test] fn external() { @@ -1581,18 +1581,23 @@ fn test_full_history_ts_low() { opts.create_missing_column_families(true); let mut cf_opts = Options::default(); - let compare_fn = move |one: &[u8], two: &[u8]| one.cmp(two); - cf_opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + cf_opts.set_comparator_with_ts( + U64Comparator::NAME, + U64Timestamp::SIZE, + Box::new(U64Comparator::compare), + Box::new(U64Comparator::compare_ts), + Box::new(U64Comparator::compare_without_ts), + ); let cfs = vec![("cf", cf_opts)]; let db = DB::open_cf_with_opts(&opts, &path, cfs).unwrap(); let cf = db.cf_handle("cf").unwrap(); - let ts = &1u64.to_be_bytes(); + let ts = U64Timestamp::new(1); db.increase_full_history_ts_low(&cf, ts).unwrap(); - let ret = db.get_full_history_ts_low(&cf); - assert_eq!(ts, ret.unwrap().as_slice()); + let ret = U64Timestamp::from(db.get_full_history_ts_low(&cf).unwrap().as_slice()); + assert_eq!(ts, ret); let _ = DB::destroy(&Options::default(), &path); } diff --git a/tests/test_sst_file_writer.rs b/tests/test_sst_file_writer.rs index 39a7f9c07..7f939d66a 100644 --- a/tests/test_sst_file_writer.rs +++ b/tests/test_sst_file_writer.rs @@ -17,7 +17,7 @@ mod util; use pretty_assertions::assert_eq; use rocksdb::{Error, Options, ReadOptions, SstFileWriter, DB}; -use util::DBPath; +use util::{DBPath, 
U64Comparator, U64Timestamp}; #[test] fn sst_file_writer_works() { @@ -51,11 +51,6 @@ fn sst_file_writer_works() { } } -#[inline] -pub fn encode_timestamp(ts: u64) -> [u8; 8] { - ts.to_be_bytes() -} - #[test] fn sst_file_writer_with_ts_works() { let db_path = DBPath::new("_rust_rocksdb_sstfilewritertest_with_ts"); @@ -64,22 +59,27 @@ fn sst_file_writer_with_ts_works() { .tempdir() .expect("Failed to create temporary path for file writer."); let writer_path = dir.path().join("filewriter"); - let compare_fn = move |one: &[u8], two: &[u8]| one.cmp(two); - let ts = &encode_timestamp(1); - let ts2 = &encode_timestamp(2); - let ts3 = &encode_timestamp(3); + let ts = U64Timestamp::new(1); + let ts2 = U64Timestamp::new(2); + let ts3 = U64Timestamp::new(3); { let mut opts = Options::default(); - opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + opts.set_comparator_with_ts( + U64Comparator::NAME, + U64Timestamp::SIZE, + Box::new(U64Comparator::compare), + Box::new(U64Comparator::compare_ts), + Box::new(U64Comparator::compare_without_ts), + ); + let mut writer = SstFileWriter::create(&opts); writer.open(&writer_path).unwrap(); writer.put_with_ts(b"k1", ts, b"v1").unwrap(); - writer.put_with_ts(b"k2", ts2, b"v2").unwrap(); - writer.put_with_ts(b"k3", ts2, b"v3").unwrap(); writer.finish().unwrap(); + assert!(writer.file_size() > 0); } @@ -89,7 +89,13 @@ fn sst_file_writer_with_ts_works() { let mut db_opts = Options::default(); db_opts.create_missing_column_families(true); db_opts.create_if_missing(true); - db_opts.set_comparator_with_ts("cname", Box::new(compare_fn)); + db_opts.set_comparator_with_ts( + U64Comparator::NAME, + U64Timestamp::SIZE, + Box::new(U64Comparator::compare), + Box::new(U64Comparator::compare_ts), + Box::new(U64Comparator::compare_without_ts), + ); let db = DB::open(&db_opts, &db_path).unwrap(); db.ingest_external_file(vec![&writer_path]).unwrap(); diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 6a077e8a5..0e0302827 100644 --- 
a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -1,6 +1,10 @@ #![allow(dead_code)] -use std::path::{Path, PathBuf}; +use std::{ + cmp::Ordering, + convert::TryInto, + path::{Path, PathBuf}, +}; use rocksdb::{Error, Options, DB}; @@ -58,3 +62,115 @@ pub fn assert_iter_reversed(iter: impl Iterator>, wan got.reverse(); assert_eq!(got.as_slice(), want); } + +/// A timestamp type we use in testing [user-defined timestamp](https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp). +/// This is a `u64` in little endian encoding. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct U64Timestamp([u8; Self::SIZE]); + +impl U64Timestamp { + pub const SIZE: usize = 8; + + pub fn new(ts: u64) -> Self { + Self(ts.to_le_bytes()) + } +} + +impl From<&[u8]> for U64Timestamp { + fn from(slice: &[u8]) -> Self { + assert_eq!( + slice.len(), + Self::SIZE, + "incorrect timestamp length: {}, should be {}", + slice.len(), + Self::SIZE + ); + Self(slice.try_into().unwrap()) + } +} + +impl From for Vec { + fn from(ts: U64Timestamp) -> Self { + ts.0.to_vec() + } +} + +impl AsRef<[u8]> for U64Timestamp { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl PartialOrd for U64Timestamp { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for U64Timestamp { + fn cmp(&self, other: &Self) -> Ordering { + let lhs = u64::from_le_bytes(self.0); + let rhs = u64::from_le_bytes(other.0); + lhs.cmp(&rhs) + } +} + +/// A comparator for use in column families with [user-defined timestamp](https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp) +/// enabled. This comparator assumes `u64` timestamp in little endian encoding. +/// This is the same behavior as RocksDB's built-in comparator. 
+/// +/// Adapted from C++ and Golang implementations from: +/// - [rocksdb](https://github.com/facebook/rocksdb/blob/v9.4.0/test_util/testutil.cc#L112) +/// - [gorocksdb](https://github.com/linxGnu/grocksdb/blob/v1.9.2/db_ts_test.go#L167) +/// - [SeiDB](https://github.com/sei-protocol/sei-db/blob/v0.0.41/ss/rocksdb/comparator.go) +pub struct U64Comparator; + +impl U64Comparator { + pub const NAME: &'static str = "rust-rocksdb.U64Comparator"; + + pub fn compare(a: &[u8], b: &[u8]) -> Ordering { + // First, compare the keys without timestamps. If the keys are different, + // then we don't have to consider the timestamps at all. + let ord = Self::compare_without_ts(a, true, b, true); + if ord != Ordering::Equal { + return ord; + } + + // The keys are the same, so now we compare the timestamps. + // The larger (i.e. newer) key should come first, hence the `reverse`. + Self::compare_ts( + extract_timestamp_from_user_key(a), + extract_timestamp_from_user_key(b), + ) + .reverse() + } + + pub fn compare_ts(bz1: &[u8], bz2: &[u8]) -> Ordering { + let ts1 = U64Timestamp::from(bz1); + let ts2 = U64Timestamp::from(bz2); + ts1.cmp(&ts2) + } + + pub fn compare_without_ts( + mut a: &[u8], + a_has_ts: bool, + mut b: &[u8], + b_has_ts: bool, + ) -> Ordering { + if a_has_ts { + a = strip_timestamp_from_user_key(a); + } + if b_has_ts { + b = strip_timestamp_from_user_key(b); + } + a.cmp(b) + } +} + +fn extract_timestamp_from_user_key(key: &[u8]) -> &[u8] { + &key[(key.len() - U64Timestamp::SIZE)..] +} + +fn strip_timestamp_from_user_key(key: &[u8]) -> &[u8] { + &key[..(key.len() - U64Timestamp::SIZE)] +}