From eaeb39a2c85684de08c48cf4b9426b3faf4defe6 Mon Sep 17 00:00:00 2001 From: kennytm Date: Mon, 21 Oct 2019 11:50:28 +0800 Subject: [PATCH] sst_importer: download SST from external storage + key rewrite (#5581) * sst_importer: download SST from external storage + key rewrite Signed-off-by: kennytm * sst_importer: update kvproto; respect required range; add some logs Signed-off-by: kennytm * sst_importer: fix clippy warning Signed-off-by: kennytm * sst_importer: do check invalid prefix; use data_key for SST range Signed-off-by: kennytm * sst_importer: use origin key for rewrite rules Signed-off-by: kennytm * sst_importer: fix clippy warning Signed-off-by: kennytm * sst_importer: make download range use post-rewrite keys; return new range Signed-off-by: kennytm --- Cargo.lock | 4 +- components/engine/src/rocks/mod.rs | 2 +- components/engine/src/rocks/sst.rs | 27 +- components/keys/src/lib.rs | 40 ++ components/keys/src/rewrite.rs | 103 ++++ components/sst_importer/Cargo.toml | 3 +- components/sst_importer/src/errors.rs | 12 + components/sst_importer/src/sst_importer.rs | 551 +++++++++++++++++++- src/import/sst_service.rs | 35 ++ tests/integrations/import/sst_service.rs | 81 ++- 10 files changed, 842 insertions(+), 16 deletions(-) create mode 100644 components/keys/src/rewrite.rs diff --git a/Cargo.lock b/Cargo.lock index 9810f7a3dde..342cf97aee8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1326,7 +1326,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git#7a367e846b9f04a59a6d8cee4112de37851074dc" +source = "git+https://github.com/pingcap/kvproto.git#199062ebc51c64d90f8241b6a51c38c1b9d052da" dependencies = [ "futures", "grpcio", @@ -2702,8 +2702,10 @@ version = "0.1.0" dependencies = [ "crc", "engine", + "external_storage", "futures", "grpcio", + "hex", "keys", "kvproto", "lazy_static", diff --git a/components/engine/src/rocks/mod.rs b/components/engine/src/rocks/mod.rs index 
a2cb061ea40..70eeb0dc036 100644 --- a/components/engine/src/rocks/mod.rs +++ b/components/engine/src/rocks/mod.rs @@ -2,7 +2,7 @@ mod db; mod sst; -pub use sst::{SstWriter, SstWriterBuilder}; +pub use sst::{SstReader, SstWriter, SstWriterBuilder}; pub mod util; diff --git a/components/engine/src/rocks/sst.rs b/components/engine/src/rocks/sst.rs index 40e99a52bb9..9481c0a4caf 100644 --- a/components/engine/src/rocks/sst.rs +++ b/components/engine/src/rocks/sst.rs @@ -3,9 +3,11 @@ use std::sync::Arc; use super::util::get_fastest_supported_compression_type; -use super::{ColumnFamilyOptions, DBCompressionType, Env, EnvOptions, ExternalSstFileInfo, DB}; +use super::{ + ColumnFamilyOptions, DBCompressionType, DBIterator, Env, EnvOptions, ExternalSstFileInfo, DB, +}; use crate::{CfName, CF_DEFAULT}; -use rocksdb::SstFileWriter; +use rocksdb::{SstFileReader, SstFileWriter}; /// A builder builds a SstWriter. pub struct SstWriterBuilder { @@ -134,6 +136,27 @@ impl SstWriter { } } +/// SstReader is used to read an SST file. 
+pub struct SstReader { + reader: SstFileReader, +} + +impl SstReader { + pub fn open(path: &str) -> Result { + let mut reader = SstFileReader::new(ColumnFamilyOptions::new()); + reader.open(path)?; + Ok(SstReader { reader }) + } + + pub fn verify_checksum(&self) -> Result<(), String> { + self.reader.verify_checksum() + } + + pub fn iter(&self) -> DBIterator<&SstFileReader> { + self.reader.iter() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/components/keys/src/lib.rs b/components/keys/src/lib.rs index 001fd3291a6..abae4bb3046 100644 --- a/components/keys/src/lib.rs +++ b/components/keys/src/lib.rs @@ -12,6 +12,7 @@ use byteorder::{BigEndian, ByteOrder}; use kvproto::metapb::Region; use std::mem; +pub mod rewrite; mod types; pub use types::{Key, KvPair, Value}; @@ -231,6 +232,45 @@ pub fn data_end_key(region_end_key: &[u8]) -> Vec { } } +pub fn origin_end_key(key: &[u8]) -> &[u8] { + if key == DATA_MAX_KEY { + b"" + } else { + origin_key(key) + } +} + +pub(crate) fn next_key_no_alloc(key: &[u8]) -> Option<(&[u8], u8)> { + let pos = key.iter().rposition(|b| *b != 0xff)?; + Some((&key[..pos], key[pos] + 1)) +} + +/// Computes the next key of the given key. +/// +/// If the key has no successor key (e.g. the input is "\xff\xff"), the result +/// would be an empty vector. 
+/// +/// # Examples +/// +/// ``` +/// assert_eq!(next_key(b"123"), b"124"); +/// assert_eq!(next_key(b"12\xff"), b"13"); +/// assert_eq!(next_key(b"\xff\xff"), b""); +/// assert_eq!(next_key(b"\xff\xfe"), b"\xff\xff"); +/// assert_eq!(next_key(b"T"), b"U"); +/// assert_eq!(next_key(b""), b""); +/// ``` +pub fn next_key(key: &[u8]) -> Vec { + if let Some((s, e)) = next_key_no_alloc(key) { + let mut res = Vec::with_capacity(s.len() + 1); + res.extend_from_slice(s); + res.push(e); + res + } else { + Vec::new() + } +} + #[derive(Debug, Display, Fail)] pub enum Error { #[display(fmt = "{} is not a valid raft log key", "hex::encode_upper(_0)")] diff --git a/components/keys/src/rewrite.rs b/components/keys/src/rewrite.rs new file mode 100644 index 00000000000..bb30ed1104a --- /dev/null +++ b/components/keys/src/rewrite.rs @@ -0,0 +1,103 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +//! Key rewriting + +/// An error indicating the key cannot be rewritten because it does not start +/// with the given prefix. +#[derive(PartialEq, Eq, Debug, Clone)] +pub struct WrongPrefix; + +/// Rewrites the prefix of a byte array. +pub fn rewrite_prefix( + old_prefix: &[u8], + new_prefix: &[u8], + src: &[u8], +) -> Result, WrongPrefix> { + if !src.starts_with(old_prefix) { + return Err(WrongPrefix); + } + let mut result = Vec::with_capacity(src.len() - old_prefix.len() + new_prefix.len()); + result.extend_from_slice(new_prefix); + result.extend_from_slice(&src[old_prefix.len()..]); + Ok(result) +} + +/// Rewrites the prefix of a byte array used as the end key. +/// +/// Besides values supported by `rewrite_prefix`, if the src is exactly the +/// successor of `old_prefix`, this method will also return a Ok with the +/// successor of `new_prefix`. 
+pub fn rewrite_prefix_of_end_key( + old_prefix: &[u8], + new_prefix: &[u8], + src: &[u8], +) -> Result, WrongPrefix> { + if let dest @ Ok(_) = rewrite_prefix(old_prefix, new_prefix, src) { + return dest; + } + + if super::next_key_no_alloc(old_prefix) != src.split_last().map(|(&e, s)| (s, e)) { + // src is not the successor of old_prefix + return Err(WrongPrefix); + } + + Ok(super::next_key(new_prefix)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_rewrite_prefix() { + assert_eq!( + rewrite_prefix(b"t123", b"t456", b"t123789"), + Ok(b"t456789".to_vec()), + ); + assert_eq!( + rewrite_prefix(b"", b"t654", b"321"), + Ok(b"t654321".to_vec()), + ); + assert_eq!( + rewrite_prefix(b"t234", b"t567", b"t567890"), + Err(WrongPrefix), + ); + assert_eq!(rewrite_prefix(b"t123", b"t567", b"t124"), Err(WrongPrefix),); + } + + #[test] + fn test_rewrite_prefix_of_end_key() { + assert_eq!( + rewrite_prefix_of_end_key(b"t123", b"t456", b"t123789"), + Ok(b"t456789".to_vec()), + ); + assert_eq!( + rewrite_prefix_of_end_key(b"", b"t654", b"321"), + Ok(b"t654321".to_vec()), + ); + assert_eq!( + rewrite_prefix_of_end_key(b"t234", b"t567", b"t567890"), + Err(WrongPrefix), + ); + assert_eq!( + rewrite_prefix_of_end_key(b"t123", b"t567", b"t124"), + Ok(b"t568".to_vec()), + ); + assert_eq!( + rewrite_prefix_of_end_key(b"t135\xff\xff", b"t248\xff\xff\xff", b"t136"), + Ok(b"t249".to_vec()), + ); + assert_eq!( + rewrite_prefix_of_end_key(b"t147", b"t258", b"t148\xff\xff"), + Err(WrongPrefix), + ); + assert_eq!( + rewrite_prefix_of_end_key(b"\xff\xff", b"\xff\xfe", b""), + Ok(b"\xff\xff".to_vec()), + ); + assert_eq!( + rewrite_prefix_of_end_key(b"\xff\xfe", b"\xff\xff", b"\xff\xff"), + Ok(b"".to_vec()), + ); + } +} diff --git a/components/sst_importer/Cargo.toml b/components/sst_importer/Cargo.toml index 8108caa9dd8..5101b08bd3a 100644 --- a/components/sst_importer/Cargo.toml +++ b/components/sst_importer/Cargo.toml @@ -16,10 +16,11 @@ uuid = { version = "0.7", features = 
[ "serde", "v4" ] } slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "0e23a5baff302a9d7bccd85f8f31e43339c2f2c1" } kvproto = { git = "https://github.com/pingcap/kvproto.git" } - engine = { path = "../engine" } keys = { path = "../keys" } tikv_alloc = { path = "../tikv_alloc", default-features = false } +external_storage = { path = "../external_storage" } +hex = "0.3" [dependencies.prometheus] git = "https://github.com/pingcap/rust-prometheus.git" diff --git a/components/sst_importer/src/errors.rs b/components/sst_importer/src/errors.rs index 6485e4e3566..41016c07d40 100644 --- a/components/sst_importer/src/errors.rs +++ b/components/sst_importer/src/errors.rs @@ -60,6 +60,18 @@ quick_error! { display("Invalid SST path {:?}", path) } InvalidChunk {} + CannotReadExternalStorage(url: String, name: String, err: IoError) { + cause(err) + display("Cannot read {}/{}", url, name) + } + WrongKeyPrefix(what: &'static str, key: Vec, prefix: Vec) { + display("\ + {} has wrong prefix: key {} does not start with {}", + what, + hex::encode_upper(&key), + hex::encode_upper(&prefix), + ) + } } } diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs index 129fabd39f2..1756e2f7ba1 100644 --- a/components/sst_importer/src/sst_importer.rs +++ b/components/sst_importer/src/sst_importer.rs @@ -4,13 +4,16 @@ use std::fmt; use std::fs::{self, File, OpenOptions}; use std::io::Write; use std::path::{Path, PathBuf}; +use std::sync::Arc; use crc::crc32::{self, Hasher32}; use kvproto::import_sstpb::*; use uuid::{Builder as UuidBuilder, Uuid}; +use engine::rocks::util::io_limiter::{IOLimiter, LimitReader}; use engine::rocks::util::{get_cf_handle, prepare_sst_for_ingestion, validate_sst_for_ingestion}; -use engine::rocks::{IngestExternalFileOptions, DB}; +use engine::rocks::{IngestExternalFileOptions, SeekKey, SstReader, 
SstWriterBuilder, DB}; +use external_storage::create_storage; use super::{Error, Result}; @@ -65,6 +68,216 @@ impl SSTImporter { } } + // Downloads an SST file from an external storage. + // + // This method is blocking. It performs the following transformations before + // writing to disk: + // + // 1. only KV pairs in the half-inclusive range (`[start, end)`) are used. + // (set the range to `["", "")` to import everything). + // 2. keys are rewritten according to the given rewrite rule. + // + // Both the range and rewrite keys are specified using origin keys. However, + // the SST itself should be data keys (contain the `z` prefix). The range + // should be specified using keys after rewriting, to be consistent with the + // region info in PD. + pub fn download( + &self, + meta: &SstMeta, + url: &str, + name: &str, + rewrite_rule: &RewriteRule, + speed_limit: u64, + ) -> Result> { + debug!("download start"; + "meta" => ?meta, + "url" => url, + "name" => name, + "rewrite_rule" => ?rewrite_rule, + "speed_limit" => speed_limit, + ); + match self.do_download(meta, url, name, rewrite_rule, speed_limit) { + Ok(r) => { + info!("download"; "meta" => ?meta, "range" => ?r); + Ok(r) + } + Err(e) => { + error!("download failed"; "meta" => ?meta, "err" => %e); + Err(e) + } + } + } + + fn do_download( + &self, + meta: &SstMeta, + url: &str, + name: &str, + rewrite_rule: &RewriteRule, + speed_limit: u64, + ) -> Result> { + let path = self.dir.join(meta)?; + + // open the external storage and limit the read speed. 
+ let limiter = if speed_limit > 0 { + Some(Arc::new(IOLimiter::new(speed_limit))) + } else { + None + }; + + // prepare to download the file from the external_storage + let ext_storage = create_storage(url)?; + let mut ext_reader = ext_storage + .read(name) + .map_err(|e| Error::CannotReadExternalStorage(url.to_owned(), name.to_owned(), e))?; + let mut ext_reader = LimitReader::new(limiter, &mut ext_reader); + + // do the I/O copy from external_storage to the local file. + { + let mut file_writer = File::create(&path.temp)?; + let file_length = std::io::copy(&mut ext_reader, &mut file_writer)?; + if meta.length != 0 && meta.length != file_length { + let reason = format!("length {}, expect {}", file_length, meta.length); + return Err(Error::FileCorrupted(path.temp, reason)); + } + file_writer.sync_data()?; + } + + // now validate the SST file. + let path_str = path.temp.to_str().unwrap(); + let sst_reader = SstReader::open(path_str)?; + sst_reader.verify_checksum()?; + + debug!("downloaded file and verified"; + "meta" => ?meta, + "url" => url, + "name" => name, + "path" => path_str, + ); + + // undo key rewrite so we could compare with the keys inside SST + let old_prefix = rewrite_rule.get_old_key_prefix(); + let new_prefix = rewrite_rule.get_new_key_prefix(); + + let range_start = meta.get_range().get_start(); + let range_end = meta.get_range().get_end(); + + let range_start = keys::rewrite::rewrite_prefix(new_prefix, old_prefix, range_start) + .or_else(|_| { + if range_start.is_empty() { + Ok(if old_prefix.is_empty() { + new_prefix.to_vec() + } else { + Vec::new() + }) + } else { + Err(Error::WrongKeyPrefix( + "SST start range", + range_start.to_vec(), + new_prefix.to_vec(), + )) + } + })?; + let range_end = keys::rewrite::rewrite_prefix_of_end_key(new_prefix, old_prefix, range_end) + .or_else(|_| { + if range_end.is_empty() { + Ok(if old_prefix.is_empty() { + keys::next_key(new_prefix) + } else { + Vec::new() + }) + } else { + Err(Error::WrongKeyPrefix( + "SST 
end range", + range_end.to_vec(), + new_prefix.to_vec(), + )) + } + })?; + + // read and first and last keys from the SST, determine if we could + // simply move the entire SST instead of iterating and generate a new one. + let mut iter = sst_reader.iter(); + let direct_retval = (|| { + if rewrite_rule.old_key_prefix != rewrite_rule.new_key_prefix { + // must iterate if we perform key rewrite + return None; + } + if !iter.seek(SeekKey::Start) { + // the SST is empty, so no need to iterate at all (should be impossible?) + return Some(meta.get_range().clone()); + } + let start_key = keys::origin_key(iter.key()); + if *start_key < *range_start { + // SST's start is before the range to consume, so needs to iterate to skip over + return None; + } + let start_key = start_key.to_vec(); + + // seek to end and fetch the last (inclusive) key of the SST. + iter.seek(SeekKey::End); + let last_key = keys::origin_end_key(iter.key()); + if !range_end.is_empty() && *last_key >= *range_end { + // SST's end is after the range to consume + return None; + } + + // range contained the entire SST, no need to iterate, just moving the file is ok + let mut range = Range::default(); + range.set_start(start_key); + range.set_end(keys::next_key(last_key)); + Some(range) + })(); + + if let Some(range) = direct_retval { + // TODO: what about encrypted SSTs? + fs::rename(&path.temp, &path.save)?; + return Ok(Some(range)); + } + + // perform iteration and key rewrite. 
+ let mut sst_writer = SstWriterBuilder::new().build(path.save.to_str().unwrap())?; + let mut key = keys::data_key(new_prefix); + let new_prefix_data_key_len = key.len(); + let mut first_key = None; + + iter.seek(SeekKey::Key(&keys::data_key(&range_start))); + while iter.valid() { + let old_key = keys::origin_key(iter.key()); + if !range_end.is_empty() && *old_key >= *range_end { + break; + } + if !old_key.starts_with(old_prefix) { + return Err(Error::WrongKeyPrefix( + "Key in SST", + old_key.to_vec(), + old_prefix.to_vec(), + )); + } + + key.truncate(new_prefix_data_key_len); + key.extend_from_slice(&old_key[old_prefix.len()..]); + sst_writer.put(&key, iter.value())?; + iter.next(); + if first_key.is_none() { + first_key = Some(keys::origin_key(&key).to_vec()); + } + } + + let _ = fs::remove_file(&path.temp); + + if let Some(start_key) = first_key { + sst_writer.finish()?; + let mut final_range = Range::default(); + final_range.set_start(start_key); + final_range.set_end(keys::next_key(keys::origin_key(&key))); + Ok(Some(final_range)) + } else { + // nothing is written: prevents finishing the SST at all. + Ok(None) + } + } + pub fn list_ssts(&self) -> Result> { self.dir.list_ssts() } @@ -145,7 +358,14 @@ impl ImportDir { let path = self.join(meta)?; let cf = meta.get_cf_name(); prepare_sst_for_ingestion(&path.save, &path.clone)?; - validate_sst_for_ingestion(db, cf, &path.clone, meta.get_length(), meta.get_crc32())?; + let length = meta.get_length(); + let crc32 = meta.get_crc32(); + if length != 0 || crc32 != 0 { + // we only validate if the length and CRC32 are explicitly provided. 
+ validate_sst_for_ingestion(db, cf, &path.clone, length, crc32)?; + } else { + debug!("skipping SST validation since length and crc32 are both 0"); + } let handle = get_cf_handle(db, cf)?; let mut opts = IngestExternalFileOptions::new(); @@ -448,4 +668,331 @@ mod tests { let new_meta = path_to_sst_meta(path).unwrap(); assert_eq!(meta, new_meta); } + + fn create_sample_external_sst_file() -> Result<(tempfile::TempDir, SstMeta)> { + let ext_sst_dir = tempfile::tempdir()?; + let mut sst_writer = SstWriterBuilder::new() + .build(ext_sst_dir.path().join("sample.sst").to_str().unwrap())?; + sst_writer.put(b"zt123_r01", b"abc")?; + sst_writer.put(b"zt123_r04", b"xyz")?; + sst_writer.put(b"zt123_r07", b"pqrst")?; + // sst_writer.delete(b"t123_r10")?; // FIXME: can't handle DELETE ops yet. + sst_writer.put(b"zt123_r13", b"www")?; + let sst_info = sst_writer.finish()?; + + // make up the SST meta for downloading. + let mut meta = SstMeta::default(); + let uuid = Uuid::new_v4(); + meta.set_uuid(uuid.as_bytes().to_vec()); + meta.set_cf_name("default".to_owned()); + meta.set_length(sst_info.file_size()); + meta.set_region_id(4); + meta.mut_region_epoch().set_conf_ver(5); + meta.mut_region_epoch().set_version(6); + + Ok((ext_sst_dir, meta)) + } + + fn new_rewrite_rule(old_key_prefix: &[u8], new_key_prefix: &[u8]) -> RewriteRule { + let mut rule = RewriteRule::new(); + rule.set_old_key_prefix(old_key_prefix.to_vec()); + rule.set_new_key_prefix(new_key_prefix.to_vec()); + rule + } + + #[test] + fn test_download_sst_no_key_rewrite() { + // creates a sample SST file. + let (ext_sst_dir, meta) = create_sample_external_sst_file().unwrap(); + + // performs the download. 
+ let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir).unwrap(); + + let range = importer + .download( + &meta, + &format!("local://{}", ext_sst_dir.path().display()), + "sample.sst", + &RewriteRule::default(), + 0, + ) + .unwrap() + .unwrap(); + + dbg!(&range); + + assert_eq!(range.get_start(), b"t123_r01"); + assert_eq!(range.get_end(), b"t123_r14"); + + // verifies that the file is saved to the correct place. + let sst_file_path = importer.dir.join(&meta).unwrap().save; + let sst_file_metadata = sst_file_path.metadata().unwrap(); + assert!(sst_file_metadata.is_file()); + assert_eq!(sst_file_metadata.len(), meta.get_length()); + + // verifies the SST content is correct. + let sst_reader = SstReader::open(sst_file_path.to_str().unwrap()).unwrap(); + sst_reader.verify_checksum().unwrap(); + let mut iter = sst_reader.iter(); + iter.seek(SeekKey::Start); + assert_eq!( + iter.collect::>(), + vec![ + (b"zt123_r01".to_vec(), b"abc".to_vec()), + (b"zt123_r04".to_vec(), b"xyz".to_vec()), + (b"zt123_r07".to_vec(), b"pqrst".to_vec()), + (b"zt123_r13".to_vec(), b"www".to_vec()), + ] + ); + } + + #[test] + fn test_download_sst_with_key_rewrite() { + // creates a sample SST file. + let (ext_sst_dir, meta) = create_sample_external_sst_file().unwrap(); + + // performs the download. + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir).unwrap(); + + let range = importer + .download( + &meta, + &format!("local://{}", ext_sst_dir.path().display()), + "sample.sst", + &new_rewrite_rule(b"t123", b"t567"), + 0, + ) + .unwrap() + .unwrap(); + + assert_eq!(range.get_start(), b"t567_r01"); + assert_eq!(range.get_end(), b"t567_r14"); + + // verifies that the file is saved to the correct place. 
+ // (the file size may be changed, so not going to check the file size) + let sst_file_path = importer.dir.join(&meta).unwrap().save; + assert!(sst_file_path.is_file()); + + // verifies the SST content is correct. + let sst_reader = SstReader::open(sst_file_path.to_str().unwrap()).unwrap(); + sst_reader.verify_checksum().unwrap(); + let mut iter = sst_reader.iter(); + iter.seek(SeekKey::Start); + assert_eq!( + iter.collect::>(), + vec![ + (b"zt567_r01".to_vec(), b"abc".to_vec()), + (b"zt567_r04".to_vec(), b"xyz".to_vec()), + (b"zt567_r07".to_vec(), b"pqrst".to_vec()), + (b"zt567_r13".to_vec(), b"www".to_vec()), + ] + ); + } + + #[test] + fn test_download_sst_then_ingest() { + // creates a sample SST file. + let (ext_sst_dir, mut meta) = create_sample_external_sst_file().unwrap(); + + // performs the download. + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir).unwrap(); + + let range = importer + .download( + &meta, + &format!("local://{}", ext_sst_dir.path().display()), + "sample.sst", + &new_rewrite_rule(b"t123", b"t9102"), + 0, + ) + .unwrap() + .unwrap(); + + assert_eq!(range.get_start(), b"t9102_r01"); + assert_eq!(range.get_end(), b"t9102_r14"); + + // performs the ingest + let ingest_dir = tempfile::tempdir().unwrap(); + let db = new_engine( + ingest_dir.path().to_str().unwrap(), + None, + &["default"], + None, + ) + .unwrap(); + + meta.set_length(0); // disable validation. + meta.set_crc32(0); + importer.ingest(&meta, &db).unwrap(); + + // verifies the DB content is correct. 
+ let mut iter = db.iter(); + iter.seek(SeekKey::Start); + assert_eq!( + iter.collect::>(), + vec![ + (b"zt9102_r01".to_vec(), b"abc".to_vec()), + (b"zt9102_r04".to_vec(), b"xyz".to_vec()), + (b"zt9102_r07".to_vec(), b"pqrst".to_vec()), + (b"zt9102_r13".to_vec(), b"www".to_vec()), + ] + ); + } + + #[test] + fn test_download_sst_partial_range() { + let (ext_sst_dir, mut meta) = create_sample_external_sst_file().unwrap(); + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir).unwrap(); + + // note: the range doesn't contain the DATA_PREFIX 'z'. + meta.mut_range().set_start(b"t123_r02".to_vec()); + meta.mut_range().set_end(b"t123_r13".to_vec()); + + let range = importer + .download( + &meta, + &format!("local://{}", ext_sst_dir.path().display()), + "sample.sst", + &RewriteRule::default(), + 0, + ) + .unwrap() + .unwrap(); + + assert_eq!(range.get_start(), b"t123_r04"); + assert_eq!(range.get_end(), b"t123_r08"); + + // verifies that the file is saved to the correct place. + // (the file size is changed, so not going to check the file size) + let sst_file_path = importer.dir.join(&meta).unwrap().save; + assert!(sst_file_path.is_file()); + + // verifies the SST content is correct. 
+ let sst_reader = SstReader::open(sst_file_path.to_str().unwrap()).unwrap(); + sst_reader.verify_checksum().unwrap(); + let mut iter = sst_reader.iter(); + iter.seek(SeekKey::Start); + assert_eq!( + iter.collect::>(), + vec![ + (b"zt123_r04".to_vec(), b"xyz".to_vec()), + (b"zt123_r07".to_vec(), b"pqrst".to_vec()), + ] + ); + } + + #[test] + fn test_download_sst_partial_range_with_key_rewrite() { + let (ext_sst_dir, mut meta) = create_sample_external_sst_file().unwrap(); + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir).unwrap(); + + meta.mut_range().set_start(b"t5_r02".to_vec()); + meta.mut_range().set_end(b"t5_r13".to_vec()); + + let range = importer + .download( + &meta, + &format!("local://{}", ext_sst_dir.path().display()), + "sample.sst", + &new_rewrite_rule(b"t123", b"t5"), + 0, + ) + .unwrap() + .unwrap(); + + assert_eq!(range.get_start(), b"t5_r04"); + assert_eq!(range.get_end(), b"t5_r08"); + + // verifies that the file is saved to the correct place. + let sst_file_path = importer.dir.join(&meta).unwrap().save; + assert!(sst_file_path.is_file()); + + // verifies the SST content is correct. 
+ let sst_reader = SstReader::open(sst_file_path.to_str().unwrap()).unwrap(); + sst_reader.verify_checksum().unwrap(); + let mut iter = sst_reader.iter(); + iter.seek(SeekKey::Start); + assert_eq!( + iter.collect::>(), + vec![ + (b"zt5_r04".to_vec(), b"xyz".to_vec()), + (b"zt5_r07".to_vec(), b"pqrst".to_vec()), + ] + ); + } + + #[test] + fn test_download_sst_invalid() { + let ext_sst_dir = tempfile::tempdir().unwrap(); + fs::write(ext_sst_dir.path().join("sample.sst"), b"not an SST file").unwrap(); + + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir).unwrap(); + + let mut meta = SstMeta::new(); + meta.set_uuid(vec![0u8; 16]); + + let result = importer.download( + &meta, + &format!("local://{}", ext_sst_dir.path().display()), + "sample.sst", + &RewriteRule::default(), + 0, + ); + match &result { + Err(Error::RocksDB(msg)) if msg.starts_with("Corruption:") => {} + _ => panic!("unexpected download result: {:?}", result), + } + } + + #[test] + fn test_download_sst_empty() { + let (ext_sst_dir, mut meta) = create_sample_external_sst_file().unwrap(); + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir).unwrap(); + + meta.mut_range().set_start(vec![b'x']); + meta.mut_range().set_end(vec![b'y']); + + let result = importer.download( + &meta, + &format!("local://{}", ext_sst_dir.path().display()), + "sample.sst", + &RewriteRule::default(), + 0, + ); + + match result { + Ok(None) => {} + _ => panic!("unexpected download result: {:?}", result), + } + } + + #[test] + fn test_download_sst_wrong_key_prefix() { + let (ext_sst_dir, meta) = create_sample_external_sst_file().unwrap(); + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir).unwrap(); + + let result = importer.download( + &meta, + &format!("local://{}", ext_sst_dir.path().display()), + "sample.sst", + &new_rewrite_rule(b"xxx", b"yyy"), + 0, + ); + + match &result { + 
Err(Error::WrongKeyPrefix(_, key, prefix)) => { + assert_eq!(key, b"t123_r01"); + assert_eq!(prefix, b"xxx"); + } + _ => panic!("unexpected download result: {:?}", result), + } + } } diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index bbc107a4589..93e876792cd 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -143,6 +143,41 @@ impl ImportSst for ImportSSTService { ) } + /// Downloads the file and performs key-rewrite for later ingesting. + fn download( + &mut self, + ctx: RpcContext<'_>, + req: DownloadRequest, + sink: UnarySink, + ) { + let label = "download"; + let timer = Instant::now_coarse(); + let importer = Arc::clone(&self.importer); + + ctx.spawn(self.threads.spawn_fn(move || { + let res = importer.download( + req.get_sst(), + req.get_url(), + req.get_name(), + req.get_rewrite_rule(), + req.get_speed_limit(), + ); + + future::result(res) + .map_err(Error::from) + .map(|range| { + let mut resp = DownloadResponse::default(); + if let Some(r) = range { + resp.set_range(r); + } else { + resp.set_is_empty(true); + } + resp + }) + .then(move |res| send_rpc_response!(res, sink, label, timer)) + })); + } + /// Ingest the file by sending a raft command to raftstore. 
/// /// If the ingestion fails because the region is not found or the epoch does diff --git a/tests/integrations/import/sst_service.rs b/tests/integrations/import/sst_service.rs index b41c5850733..a7ab4a7d08e 100644 --- a/tests/integrations/import/sst_service.rs +++ b/tests/integrations/import/sst_service.rs @@ -109,20 +109,71 @@ fn test_ingest_sst() { assert!(!resp.has_error()); // Check ingested kvs - for i in sst_range.0..sst_range.1 { - let mut m = RawGetRequest::default(); - m.set_context(ctx.clone()); - m.set_key(vec![i]); - let resp = tikv.raw_get(&m).unwrap(); - assert!(resp.get_error().is_empty()); - assert!(!resp.has_region_error()); - assert_eq!(resp.get_value(), &[i]); - } + check_ingested_kvs(&tikv, &ctx, sst_range); // Upload the same file again to check if the ingested file has been deleted. send_upload_sst(&import, &meta, &data).unwrap(); } +#[test] +fn test_download_sst() { + use grpcio::{Error, RpcStatus}; + + let (_cluster, ctx, tikv, import) = new_cluster_and_tikv_import_client(); + let temp_dir = Builder::new() + .prefix("test_download_sst") + .tempdir() + .unwrap(); + + let sst_path = temp_dir.path().join("test.sst"); + let sst_range = (0, 100); + let (mut meta, _) = gen_sst_file(sst_path, sst_range); + meta.set_region_id(ctx.get_region_id()); + meta.set_region_epoch(ctx.get_region_epoch().clone()); + + // Checks that downloading a non-existing storage returns error. + let mut download = DownloadRequest::default(); + download.set_sst(meta.clone()); + download.set_url(format!("local://{}", temp_dir.path().display())); + download.set_name("missing.sst".to_owned()); + + let result = import.download(&download); + match &result { + Err(Error::RpcFailure(RpcStatus { + details: Some(msg), .. 
+ })) if msg.contains("CannotReadExternalStorage") => {} + _ => panic!("unexpected download reply: {:?}", result), + } + + // Checks that downloading an empty SST returns OK (but cannot be ingested) + download.set_name("test.sst".to_owned()); + download.mut_sst().mut_range().set_start(vec![sst_range.1]); + download + .mut_sst() + .mut_range() + .set_end(vec![sst_range.1 + 1]); + let result = import.download(&download).unwrap(); + assert!(result.get_is_empty()); + + // Now perform a proper download. + download.mut_sst().mut_range().set_start(Vec::new()); + download.mut_sst().mut_range().set_end(Vec::new()); + let result = import.download(&download).unwrap(); + assert!(!result.get_is_empty()); + assert_eq!(result.get_range().get_start(), &[sst_range.0]); + assert_eq!(result.get_range().get_end(), &[sst_range.1]); + + // Do an ingest and verify the result is correct. + + let mut ingest = IngestRequest::default(); + ingest.set_context(ctx.clone()); + ingest.set_sst(meta); + let resp = import.ingest(&ingest).unwrap(); + assert!(!resp.has_error()); + + check_ingested_kvs(&tikv, &ctx, sst_range); +} + #[test] fn test_cleanup_sst() { let (mut cluster, ctx, _, import) = new_cluster_and_tikv_import_client(); @@ -191,6 +242,18 @@ fn send_upload_sst( stream.forward(tx).and_then(|_| rx).wait() } +fn check_ingested_kvs(tikv: &TikvClient, ctx: &Context, sst_range: (u8, u8)) { + for i in sst_range.0..sst_range.1 { + let mut m = RawGetRequest::default(); + m.set_context(ctx.clone()); + m.set_key(vec![i]); + let resp = tikv.raw_get(&m).unwrap(); + assert!(resp.get_error().is_empty()); + assert!(!resp.has_region_error()); + assert_eq!(resp.get_value(), &[i]); + } +} + fn check_sst_deleted(client: &ImportSstClient, meta: &SstMeta, data: &[u8]) { for _ in 0..10 { if send_upload_sst(client, meta, data).is_ok() {