Skip to content

Commit

Permalink
Support RocksDB 7.x BackupEngineOptions (rust-rocksdb#700)
Browse files Browse the repository at this point in the history
  • Loading branch information
exabytes18 authored Feb 9, 2023
1 parent 3805d1f commit 2823760
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 158 deletions.
74 changes: 53 additions & 21 deletions src/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
// limitations under the License.
//

use crate::env::Env;
use crate::{db::DBInner, ffi, ffi_util::to_cpath, DBCommon, Error, ThreadMode};

use libc::{c_int, c_uchar};
use libc::c_uchar;
use std::ffi::CString;
use std::path::Path;

/// Represents information of a backup including timestamp of the backup
Expand All @@ -35,31 +37,36 @@ pub struct BackupEngineInfo {

pub struct BackupEngine {
inner: *mut ffi::rocksdb_backup_engine_t,
_outlive: Env,
}

pub struct BackupEngineOptions {
inner: *mut ffi::rocksdb_options_t,
inner: *mut ffi::rocksdb_backup_engine_options_t,
}

pub struct RestoreOptions {
inner: *mut ffi::rocksdb_restore_options_t,
}

impl BackupEngine {
/// Open a backup engine with the specified options.
pub fn open<P: AsRef<Path>>(opts: &BackupEngineOptions, path: P) -> Result<Self, Error> {
let cpath = to_cpath(path)?;

/// Open a backup engine with the specified options and RocksDB Env.
pub fn open(opts: &BackupEngineOptions, env: &Env) -> Result<Self, Error> {
let be: *mut ffi::rocksdb_backup_engine_t;
unsafe {
be = ffi_try!(ffi::rocksdb_backup_engine_open(opts.inner, cpath.as_ptr()));
be = ffi_try!(ffi::rocksdb_backup_engine_open_opts(
opts.inner,
env.0.inner
));
}

if be.is_null() {
return Err(Error::new("Could not initialize backup engine.".to_owned()));
}

Ok(Self { inner: be })
Ok(Self {
inner: be,
_outlive: env.clone(),
})
}

/// Captures the state of the database in the latest backup.
Expand Down Expand Up @@ -217,27 +224,52 @@ impl BackupEngine {
}

impl BackupEngineOptions {
//
}
/// Initializes BackupEngineOptions with the directory to be used for storing/accessing the
/// backup files.
pub fn new<P: AsRef<Path>>(backup_dir: P) -> Result<Self, Error> {
let backup_dir = backup_dir.as_ref();
let c_backup_dir = if let Ok(c) = CString::new(backup_dir.to_string_lossy().as_bytes()) {
c
} else {
return Err(Error::new(
"Failed to convert backup_dir to CString \
when constructing BackupEngineOptions"
.to_owned(),
));
};

impl RestoreOptions {
pub fn set_keep_log_files(&mut self, keep_log_files: bool) {
unsafe {
ffi::rocksdb_restore_options_set_keep_log_files(
let opts = ffi::rocksdb_backup_engine_options_create(c_backup_dir.as_ptr());
assert!(!opts.is_null(), "Could not create RocksDB backup options");

Ok(Self { inner: opts })
}
}

/// Sets the number of operations (such as file copies or file checksums) that RocksDB may
/// perform in parallel when executing a backup or restore.
///
/// Default: 1
pub fn set_max_background_operations(&mut self, max_background_operations: i32) {
unsafe {
ffi::rocksdb_backup_engine_options_set_max_background_operations(
self.inner,
c_int::from(keep_log_files),
max_background_operations,
);
}
}
}

impl Default for BackupEngineOptions {
fn default() -> Self {
impl RestoreOptions {
/// Sets `keep_log_files`. If true, restore won't overwrite the existing log files in wal_dir.
/// It will also move all log files from archive directory to wal_dir. Use this option in
/// combination with BackupEngineOptions::backup_log_files = false for persisting in-memory
/// databases.
///
/// Default: false
pub fn set_keep_log_files(&mut self, keep_log_files: bool) {
unsafe {
let opts = ffi::rocksdb_options_create();
assert!(!opts.is_null(), "Could not create RocksDB backup options");

Self { inner: opts }
ffi::rocksdb_restore_options_set_keep_log_files(self.inner, i32::from(keep_log_files));
}
}
}
Expand All @@ -264,7 +296,7 @@ impl Drop for BackupEngine {
impl Drop for BackupEngineOptions {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_options_destroy(self.inner);
ffi::rocksdb_backup_engine_options_destroy(self.inner);
}
}
}
Expand Down
124 changes: 1 addition & 123 deletions src/db_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
compaction_filter_factory::{self, CompactionFilterFactory},
comparator::{self, ComparatorCallback, CompareFn},
db::DBAccess,
env::Env,
ffi,
ffi_util::{from_cstr, to_cpath, CStrLike},
merge_operator::{
Expand Down Expand Up @@ -82,127 +83,6 @@ impl Cache {
}
}

/// An Env is an interface used by the rocksdb implementation to access
/// operating system functionality like the filesystem etc. Callers
/// may wish to provide a custom Env object when opening a database to
/// get fine gain control; e.g., to rate limit file system operations.
///
/// All Env implementations are safe for concurrent access from
/// multiple threads without any external synchronization.
///
/// Note: currently, C API behinds C++ API for various settings.
/// See also: `rocksdb/include/env.h`
#[derive(Clone)]
pub struct Env(Arc<EnvWrapper>);

pub(crate) struct EnvWrapper {
inner: *mut ffi::rocksdb_env_t,
}

impl Drop for EnvWrapper {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_env_destroy(self.inner);
}
}
}

impl Env {
/// Returns default env
pub fn new() -> Result<Self, Error> {
let env = unsafe { ffi::rocksdb_create_default_env() };
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Self(Arc::new(EnvWrapper { inner: env })))
}
}

/// Returns a new environment that stores its data in memory and delegates
/// all non-file-storage tasks to base_env.
pub fn mem_env() -> Result<Self, Error> {
let env = unsafe { ffi::rocksdb_create_mem_env() };
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Self(Arc::new(EnvWrapper { inner: env })))
}
}

/// Sets the number of background worker threads of a specific thread pool for this environment.
/// `LOW` is the default pool.
///
/// Default: 1
pub fn set_background_threads(&mut self, num_threads: c_int) {
unsafe {
ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads);
}
}

/// Sets the size of the high priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_high_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n);
}
}

/// Sets the size of the low priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_low_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n);
}
}

/// Sets the size of the bottom priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_bottom_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n);
}
}

/// Wait for all threads started by StartThread to terminate.
pub fn join_all_threads(&mut self) {
unsafe {
ffi::rocksdb_env_join_all_threads(self.0.inner);
}
}

/// Lowering IO priority for threads from the specified pool.
pub fn lower_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner);
}
}

/// Lowering IO priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner);
}
}

/// Lowering CPU priority for threads from the specified pool.
pub fn lower_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner);
}
}

/// Lowering CPU priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner);
}
}

fn clone(&self) -> Self {
Self(self.0.clone())
}
}

#[derive(Default)]
pub(crate) struct OptionsMustOutliveDB {
env: Option<Env>,
Expand Down Expand Up @@ -385,7 +265,6 @@ unsafe impl Send for CuckooTableOptions {}
unsafe impl Send for ReadOptions {}
unsafe impl Send for IngestExternalFileOptions {}
unsafe impl Send for CacheWrapper {}
unsafe impl Send for EnvWrapper {}

// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
Expand All @@ -396,7 +275,6 @@ unsafe impl Sync for CuckooTableOptions {}
unsafe impl Sync for ReadOptions {}
unsafe impl Sync for IngestExternalFileOptions {}
unsafe impl Sync for CacheWrapper {}
unsafe impl Sync for EnvWrapper {}

impl Drop for Options {
fn drop(&mut self) {
Expand Down
Loading

0 comments on commit 2823760

Please sign in to comment.