Skip to content

Commit

Permalink
sst_importer: download SST from external storage + key rewrite (tikv#…
Browse files Browse the repository at this point in the history
…5581)

* sst_importer: download SST from external storage + key rewrite

Signed-off-by: kennytm <[email protected]>

* sst_importer: update kvproto; respect required range; add some logs

Signed-off-by: kennytm <[email protected]>

* sst_importer: fix clippy warning

Signed-off-by: kennytm <[email protected]>

* sst_importer: do check invalid prefix; use data_key for SST range

Signed-off-by: kennytm <[email protected]>

* sst_importer: use origin key for rewrite rules

Signed-off-by: kennytm <[email protected]>

* sst_importer: fix clippy warning

Signed-off-by: kennytm <[email protected]>

* sst_importer: make download range use post-rewrite keys; return new range

Signed-off-by: kennytm <[email protected]>
  • Loading branch information
kennytm authored and overvenus committed Oct 21, 2019
1 parent 64ede94 commit eaeb39a
Show file tree
Hide file tree
Showing 10 changed files with 842 additions and 16 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/engine/src/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

mod db;
mod sst;
pub use sst::{SstWriter, SstWriterBuilder};
pub use sst::{SstReader, SstWriter, SstWriterBuilder};

pub mod util;

Expand Down
27 changes: 25 additions & 2 deletions components/engine/src/rocks/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
use std::sync::Arc;

use super::util::get_fastest_supported_compression_type;
use super::{ColumnFamilyOptions, DBCompressionType, Env, EnvOptions, ExternalSstFileInfo, DB};
use super::{
ColumnFamilyOptions, DBCompressionType, DBIterator, Env, EnvOptions, ExternalSstFileInfo, DB,
};
use crate::{CfName, CF_DEFAULT};
use rocksdb::SstFileWriter;
use rocksdb::{SstFileReader, SstFileWriter};

/// A builder builds a SstWriter.
pub struct SstWriterBuilder {
Expand Down Expand Up @@ -134,6 +136,27 @@ impl SstWriter {
}
}

/// SstReader is used to read an SST file.
pub struct SstReader {
reader: SstFileReader,
}

impl SstReader {
pub fn open(path: &str) -> Result<Self, String> {
let mut reader = SstFileReader::new(ColumnFamilyOptions::new());
reader.open(path)?;
Ok(SstReader { reader })
}

pub fn verify_checksum(&self) -> Result<(), String> {
self.reader.verify_checksum()
}

pub fn iter(&self) -> DBIterator<&SstFileReader> {
self.reader.iter()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
40 changes: 40 additions & 0 deletions components/keys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use byteorder::{BigEndian, ByteOrder};
use kvproto::metapb::Region;
use std::mem;

pub mod rewrite;
mod types;

pub use types::{Key, KvPair, Value};
Expand Down Expand Up @@ -231,6 +232,45 @@ pub fn data_end_key(region_end_key: &[u8]) -> Vec<u8> {
}
}

pub fn origin_end_key(key: &[u8]) -> &[u8] {
if key == DATA_MAX_KEY {
b""
} else {
origin_key(key)
}
}

pub(crate) fn next_key_no_alloc(key: &[u8]) -> Option<(&[u8], u8)> {
let pos = key.iter().rposition(|b| *b != 0xff)?;
Some((&key[..pos], key[pos] + 1))
}

/// Computes the next key of the given key.
///
/// If the key has no successor key (e.g. the input is "\xff\xff"), the result
/// would be an empty vector.
///
/// # Examples
///
/// ```
/// assert_eq!(next_key(b"123"), b"124");
/// assert_eq!(next_key(b"12\xff"), b"13");
/// assert_eq!(next_key(b"\xff\xff"), b"");
/// assert_eq!(next_key(b"\xff\xfe"), b"\xff\xff");
/// assert_eq!(next_key(b"T"), b"U");
/// assert_eq!(next_key(b""), b"");
/// ```
pub fn next_key(key: &[u8]) -> Vec<u8> {
if let Some((s, e)) = next_key_no_alloc(key) {
let mut res = Vec::with_capacity(s.len() + 1);
res.extend_from_slice(s);
res.push(e);
res
} else {
Vec::new()
}
}

#[derive(Debug, Display, Fail)]
pub enum Error {
#[display(fmt = "{} is not a valid raft log key", "hex::encode_upper(_0)")]
Expand Down
103 changes: 103 additions & 0 deletions components/keys/src/rewrite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

//! Key rewriting
/// An error indicating the key cannot be rewritten because it does not start
/// with the given prefix.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct WrongPrefix;

/// Rewrites the prefix of a byte array.
pub fn rewrite_prefix(
old_prefix: &[u8],
new_prefix: &[u8],
src: &[u8],
) -> Result<Vec<u8>, WrongPrefix> {
if !src.starts_with(old_prefix) {
return Err(WrongPrefix);
}
let mut result = Vec::with_capacity(src.len() - old_prefix.len() + new_prefix.len());
result.extend_from_slice(new_prefix);
result.extend_from_slice(&src[old_prefix.len()..]);
Ok(result)
}

/// Rewrites the prefix of a byte array used as the end key.
///
/// Besides values supported by `rewrite_prefix`, if the src is exactly the
/// successor of `old_prefix`, this method will also return a Ok with the
/// successor of `new_prefix`.
pub fn rewrite_prefix_of_end_key(
old_prefix: &[u8],
new_prefix: &[u8],
src: &[u8],
) -> Result<Vec<u8>, WrongPrefix> {
if let dest @ Ok(_) = rewrite_prefix(old_prefix, new_prefix, src) {
return dest;
}

if super::next_key_no_alloc(old_prefix) != src.split_last().map(|(&e, s)| (s, e)) {
// src is not the sucessor of old_prefix
return Err(WrongPrefix);
}

Ok(super::next_key(new_prefix))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_rewrite_prefix() {
assert_eq!(
rewrite_prefix(b"t123", b"t456", b"t123789"),
Ok(b"t456789".to_vec()),
);
assert_eq!(
rewrite_prefix(b"", b"t654", b"321"),
Ok(b"t654321".to_vec()),
);
assert_eq!(
rewrite_prefix(b"t234", b"t567", b"t567890"),
Err(WrongPrefix),
);
assert_eq!(rewrite_prefix(b"t123", b"t567", b"t124"), Err(WrongPrefix),);
}

#[test]
fn test_rewrite_prefix_of_end_key() {
assert_eq!(
rewrite_prefix_of_end_key(b"t123", b"t456", b"t123789"),
Ok(b"t456789".to_vec()),
);
assert_eq!(
rewrite_prefix_of_end_key(b"", b"t654", b"321"),
Ok(b"t654321".to_vec()),
);
assert_eq!(
rewrite_prefix_of_end_key(b"t234", b"t567", b"t567890"),
Err(WrongPrefix),
);
assert_eq!(
rewrite_prefix_of_end_key(b"t123", b"t567", b"t124"),
Ok(b"t568".to_vec()),
);
assert_eq!(
rewrite_prefix_of_end_key(b"t135\xff\xff", b"t248\xff\xff\xff", b"t136"),
Ok(b"t249".to_vec()),
);
assert_eq!(
rewrite_prefix_of_end_key(b"t147", b"t258", b"t148\xff\xff"),
Err(WrongPrefix),
);
assert_eq!(
rewrite_prefix_of_end_key(b"\xff\xff", b"\xff\xfe", b""),
Ok(b"\xff\xff".to_vec()),
);
assert_eq!(
rewrite_prefix_of_end_key(b"\xff\xfe", b"\xff\xff", b"\xff\xff"),
Ok(b"".to_vec()),
);
}
}
3 changes: 2 additions & 1 deletion components/sst_importer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ uuid = { version = "0.7", features = [ "serde", "v4" ] }
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "0e23a5baff302a9d7bccd85f8f31e43339c2f2c1" }
kvproto = { git = "https://github.com/pingcap/kvproto.git" }

engine = { path = "../engine" }
keys = { path = "../keys" }
tikv_alloc = { path = "../tikv_alloc", default-features = false }
external_storage = { path = "../external_storage" }
hex = "0.3"

[dependencies.prometheus]
git = "https://github.com/pingcap/rust-prometheus.git"
Expand Down
12 changes: 12 additions & 0 deletions components/sst_importer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ quick_error! {
display("Invalid SST path {:?}", path)
}
InvalidChunk {}
CannotReadExternalStorage(url: String, name: String, err: IoError) {
cause(err)
display("Cannot read {}/{}", url, name)
}
WrongKeyPrefix(what: &'static str, key: Vec<u8>, prefix: Vec<u8>) {
display("\
{} has wrong prefix: key {} does not start with {}",
what,
hex::encode_upper(&key),
hex::encode_upper(&prefix),
)
}
}
}

Expand Down
Loading

0 comments on commit eaeb39a

Please sign in to comment.