Skip to content

Commit

Permalink
Support user defined timestamp in rust bindings (rust-rocksdb#901)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 authored Jul 15, 2024
1 parent 1c36046 commit b8f79d9
Show file tree
Hide file tree
Showing 10 changed files with 815 additions and 7 deletions.
127 changes: 126 additions & 1 deletion src/comparator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,43 @@
// limitations under the License.
//

use libc::{c_char, c_int, c_void, size_t};
use libc::{c_char, c_int, c_uchar, c_void, size_t};
use std::cmp::Ordering;
use std::convert::TryInto;
use std::ffi::CString;
use std::mem::size_of;
use std::slice;

pub type CompareFn = dyn Fn(&[u8], &[u8]) -> Ordering;

// Use u64 as the timestamp. This is based on two reasons:
// 1. Follows the logic of [BytewiseComparatorWithU64Ts](https://github.com/facebook/rocksdb/blob/3db030d7ee1b887ce818ec6f6a8d10949f9e9a22/util/comparator.cc#L238)
// 2. u64 is the return type of [Duration::as_secs()](https://doc.rust-lang.org/nightly/std/time/struct.Duration.html#method.as_secs)
fn strip_timestamp_from_user_key(user_key: &[u8], ts_sz: usize) -> &[u8] {
&user_key[..user_key.len() - ts_sz]
}

fn extract_timestamp_from_user_key(user_key: &[u8], ts_sz: usize) -> &[u8] {
&user_key[user_key.len() - ts_sz..]
}

// Caller should ensure the pointer is valid and has at least 8 bytes,
// As the slice::from_raw_parts does in compare_ts_callback
#[inline]
fn decode_timestamp(ptr: &[u8]) -> u64 {
u64::from_be_bytes(ptr[..8].try_into().unwrap())
}

fn compare_ts(a: &[u8], b: &[u8]) -> c_int {
let a = decode_timestamp(a);
let b = decode_timestamp(b);
match a.cmp(&b) {
Ordering::Less => -1,
Ordering::Equal => 0,
Ordering::Greater => 1,
}
}

pub struct ComparatorCallback {
pub name: CString,
pub f: Box<CompareFn>,
Expand Down Expand Up @@ -51,3 +81,98 @@ pub unsafe extern "C" fn compare_callback(
Ordering::Greater => 1,
}
}

/// For two events e1 and e2 whose timestamps are t1 and t2 respectively,
/// Returns value:
/// < 0 iff t1 < t2
/// == 0 iff t1 == t2
/// > 0 iff t1 > t2
/// Note that an all-zero byte array will be the smallest (oldest) timestamp
/// of the same length, and a byte array with all bits 1 will be the largest.
pub unsafe extern "C" fn compare_ts_callback(
raw_cb: *mut c_void,
a_ts: *const c_char,
a_ts_len: size_t,
b_ts: *const c_char,
b_ts_len: size_t,
) -> c_int {
let _: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback);
assert_eq!(a_ts_len, size_of::<u64>());
assert_eq!(b_ts_len, size_of::<u64>());
let a: &[u8] = slice::from_raw_parts(a_ts as *const u8, a_ts_len);
let b: &[u8] = slice::from_raw_parts(b_ts as *const u8, b_ts_len);
compare_ts(a, b)
}

/// Three-way comparison. Returns value:
/// < 0 iff "a" < "b",
/// == 0 iff "a" == "b",
/// > 0 iff "a" > "b"
/// Note this callback also compares timestamp.
/// For the same user key with different timestamps, larger (newer)
/// timestamp comes first.
pub unsafe extern "C" fn compare_with_ts_callback(
raw_cb: *mut c_void,
a_raw: *const c_char,
a_len: size_t,
b_raw: *const c_char,
b_len: size_t,
) -> c_int {
let cb: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback);
let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len);
let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len);
let ts_sz = size_of::<u64>();
let a_key = strip_timestamp_from_user_key(a, ts_sz);
let b_key = strip_timestamp_from_user_key(b, ts_sz);

let res = match (cb.f)(a_key, b_key) {
Ordering::Less => -1,
Ordering::Equal => 0,
Ordering::Greater => 1,
};
if res != 0 {
return res;
}
let a_ts = extract_timestamp_from_user_key(a, ts_sz);
let b_ts = extract_timestamp_from_user_key(b, ts_sz);
-compare_ts(a_ts, b_ts)
}

/// Three-way comparison. Returns value:
/// < 0 iff "a" < "b",
/// == 0 iff "a" == "b",
/// > 0 iff "a" > "b"
/// Note this callback ignores timestamp during comparison.
pub unsafe extern "C" fn compare_without_ts_callback(
raw_cb: *mut c_void,
a_raw: *const c_char,
a_len: size_t,
a_has_ts: c_uchar,
b_raw: *const c_char,
b_len: size_t,
b_has_ts: c_uchar,
) -> c_int {
let cb: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback);
let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len);
let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len);
let ts_sz = size_of::<u64>();
let a_has_ts = a_has_ts != 0;
let b_has_ts = b_has_ts != 0;
assert!(!a_has_ts || a.len() >= ts_sz);
assert!(!b_has_ts || b.len() >= ts_sz);
let lhs = if a_has_ts {
strip_timestamp_from_user_key(a, ts_sz)
} else {
a
};
let rhs = if b_has_ts {
strip_timestamp_from_user_key(b, ts_sz)
} else {
b
};
match (cb.f)(lhs, rhs) {
Ordering::Less => -1,
Ordering::Equal => 0,
Ordering::Greater => 1,
}
}
Loading

0 comments on commit b8f79d9

Please sign in to comment.