From 6d0edd7e690d376e651aecf6f0bbefdeec22c390 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Tue, 12 Oct 2021 02:19:26 +0800 Subject: [PATCH] storage: Add config storage.api_version and add DataEncode to kvdb (#11021) * storage: Add config storage.api_version and add DataEncode to kvdb Signed-off-by: Andy Lok * Fix TiDB data range Signed-off-by: Andy Lok * Add test Signed-off-by: Andy Lok * fix text Signed-off-by: Andy Lok * fix ci Signed-off-by: Andy Lok * edit kvproto Signed-off-by: Andy Lok * fix proto Signed-off-by: Andy Lok * fix test Signed-off-by: Andy Lok * address comment Signed-off-by: Andy Lok * fix test Signed-off-by: Andy Lok * address comment Signed-off-by: Andy Lok * fix test Signed-off-by: Andy Lok * fix test Signed-off-by: Andy Lok Co-authored-by: Ti Chi Robot --- Cargo.lock | 2 +- components/backup/src/errors.rs | 16 +- components/cdc/src/errors.rs | 20 +- .../external_storage/export/src/export.rs | 1 + components/keys/src/lib.rs | 4 + components/resolved_ts/src/errors.rs | 14 +- components/test_storage/src/assert_storage.rs | 6 +- src/coprocessor/error.rs | 4 +- src/coprocessor_v2/raw_storage_impl.rs | 14 +- src/storage/config.rs | 45 ++++- src/storage/errors.rs | 24 ++- src/storage/mod.rs | 179 ++++++++++++++++-- src/storage/mvcc/mod.rs | 6 +- tests/failpoints/cases/test_storage.rs | 2 +- tests/integrations/config/mod.rs | 2 + 15 files changed, 273 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca44c5825da..85ecfab60b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2128,7 +2128,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git#df38c15b57b3049293c71ca828a42d83faffe221" +source = "git+https://github.com/pingcap/kvproto.git#d6dd964d6b8cfcb28d1ffa07d9002ba32f0b0418" dependencies = [ "futures 0.3.15", "grpcio", diff --git a/components/backup/src/errors.rs b/components/backup/src/errors.rs index ecaef0a7cc8..49337d242a4 100644 --- a/components/backup/src/errors.rs +++ b/components/backup/src/errors.rs @@ -8,7 +8,7 @@ use kvproto::brpb::Error as ErrorPb; use kvproto::errorpb::{Error as RegionError, ServerIsBusy}; use kvproto::kvrpcpb::KeyError; use thiserror::Error; -use tikv::storage::kv::{Error as EngineError, ErrorInner as EngineErrorInner}; +use tikv::storage::kv::{Error as KvError, ErrorInner as EngineErrorInner}; use tikv::storage::mvcc::{Error as MvccError, ErrorInner as MvccErrorInner}; use tikv::storage::txn::{Error as TxnError, ErrorInner as TxnErrorInner}; @@ -26,13 +26,13 @@ impl From for ErrorPb { err.mut_cluster_id_error().set_current(current); err.mut_cluster_id_error().set_request(request); } - Error::Engine(EngineError(box EngineErrorInner::Request(e))) - | Error::Txn(TxnError(box TxnErrorInner::Engine(EngineError( + Error::Kv(KvError(box EngineErrorInner::Request(e))) + | Error::Txn(TxnError(box TxnErrorInner::Engine(KvError( box EngineErrorInner::Request(e), )))) - | Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError( - box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(e))), - )))) => { + | Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(box MvccErrorInner::Kv( + KvError(box EngineErrorInner::Request(e)), + ))))) => { if e.has_not_leader() { BACKUP_RANGE_ERROR_VEC .with_label_values(&["not_leader"]) @@ -75,7 +75,7 @@ impl From for ErrorPb { e.set_locked(info); err.set_kv_error(e); } - timeout @ Error::Engine(EngineError(box EngineErrorInner::Timeout(_))) => { + timeout @ Error::Kv(KvError(box EngineErrorInner::Timeout(_))) => { BACKUP_RANGE_ERROR_VEC.with_label_values(&["timeout"]).inc(); let mut busy = ServerIsBusy::default(); let reason = format!("{}", timeout); @@ -104,7 +104,7 @@ pub enum Error { #[error("IO error {0}")] Io(#[from] IoError), #[error("Engine error {0}")] - Engine(#[from] EngineError), + Kv(#[from] KvError), #[error("Engine error {0}")] EngineTrait(#[from] EngineTraitError), #[error("Transaction error {0}")] diff --git a/components/cdc/src/errors.rs b/components/cdc/src/errors.rs index ce757e26f95..f484b7339b6 100644 --- a/components/cdc/src/errors.rs +++ b/components/cdc/src/errors.rs @@ -6,7 +6,7 @@ use std::{error, result}; use engine_traits::Error as EngineTraitsError; use kvproto::errorpb; use thiserror::Error; -use tikv::storage::kv::{Error as EngineError, ErrorInner as EngineErrorInner}; +use tikv::storage::kv::{Error as KvError, ErrorInner as EngineErrorInner}; use tikv::storage::mvcc::{Error as MvccError, ErrorInner as MvccErrorInner}; use tikv::storage::txn::{Error as TxnError, ErrorInner as TxnErrorInner}; use txn_types::Error as TxnTypesError; @@ -23,7 +23,7 @@ pub enum Error { #[error("IO error {0}")] Io(#[from] IoError), #[error("Engine error {0}")] - Engine(#[from] EngineError), + Kv(#[from] KvError), #[error("Transaction error {0}")] Txn(#[from] TxnError), #[error("Mvcc error {0}")] @@ -63,12 +63,12 @@ impl Error { pub fn has_region_error(&self) -> bool { matches!( self, - Error::Engine(EngineError(box EngineErrorInner::Request(_))) - | Error::Txn(TxnError(box TxnErrorInner::Engine(EngineError( + Error::Kv(KvError(box EngineErrorInner::Request(_))) + | Error::Txn(TxnError(box TxnErrorInner::Engine(KvError( box EngineErrorInner::Request(_), )))) | Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError( - box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(_))), + box MvccErrorInner::Kv(KvError(box EngineErrorInner::Request(_))), )))) | Error::Request(_) ) @@ -76,13 +76,13 @@ impl Error { pub fn extract_region_error(self) -> errorpb::Error { match self { - Error::Engine(EngineError(box EngineErrorInner::Request(e))) - | Error::Txn(TxnError(box TxnErrorInner::Engine(EngineError( + Error::Kv(KvError(box EngineErrorInner::Request(e))) + | Error::Txn(TxnError(box TxnErrorInner::Engine(KvError( box EngineErrorInner::Request(e), )))) - | Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError( - box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(e))), - )))) + | Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(box MvccErrorInner::Kv( + KvError(box EngineErrorInner::Request(e)), + ))))) | Error::Request(box e) => e, // TODO: it should be None, add more cdc errors. other => { diff --git a/components/external_storage/export/src/export.rs b/components/external_storage/export/src/export.rs index 136d0cc828b..06d2ce80517 100644 --- a/components/external_storage/export/src/export.rs +++ b/components/external_storage/export/src/export.rs @@ -170,6 +170,7 @@ fn create_backend_inner(backend: &Backend) -> io::Result unimplemented!(), #[cfg(not(any(feature = "cloud-gcp", feature = "cloud-aws")))] _ => return Err(bad_backend(backend.clone())), }; diff --git a/components/keys/src/lib.rs b/components/keys/src/lib.rs index a1253ef64b0..1755f811e6e 100644 --- a/components/keys/src/lib.rs +++ b/components/keys/src/lib.rs @@ -27,10 +27,14 @@ pub const DATA_PREFIX: u8 = b'z'; pub const DATA_PREFIX_KEY: &[u8] = &[DATA_PREFIX]; pub const DATA_MIN_KEY: &[u8] = &[DATA_PREFIX]; pub const DATA_MAX_KEY: &[u8] = &[DATA_PREFIX + 1]; +pub const DATA_TIDB_RANGES: &[(&[u8], &[u8])] = &[(&[b'm'], &[b'm' + 1]), (&[b't'], &[b't' + 1])]; +pub const DATA_TIDB_RANGES_COMPLEMENT: &[(&[u8], &[u8])] = + &[(&[], &[b'm']), (&[b'm' + 1], &[b't']), (&[b't' + 1], &[])]; // Following keys are all local keys, so the first byte must be 0x01. pub const STORE_IDENT_KEY: &[u8] = &[LOCAL_PREFIX, 0x01]; pub const PREPARE_BOOTSTRAP_KEY: &[u8] = &[LOCAL_PREFIX, 0x02]; +pub const DATA_ENCODE_KEY: &[u8] = &[LOCAL_PREFIX, 0x04]; // We save two types region data in DB, for raft and other meta data. // When the store starts, we should iterate all region meta data to // construct peer, no need to travel large raft data, so we separate them diff --git a/components/resolved_ts/src/errors.rs b/components/resolved_ts/src/errors.rs index 0eff2f028fc..4f299b4d110 100644 --- a/components/resolved_ts/src/errors.rs +++ b/components/resolved_ts/src/errors.rs @@ -6,7 +6,7 @@ use engine_traits::Error as EngineTraitsError; use kvproto::errorpb::Error as ErrorHeader; use raftstore::Error as RaftstoreError; use thiserror::Error; -use tikv::storage::kv::{Error as EngineError, ErrorInner as EngineErrorInner}; +use tikv::storage::kv::{Error as KvError, ErrorInner as EngineErrorInner}; use tikv::storage::mvcc::{Error as MvccError, ErrorInner as MvccErrorInner}; use tikv::storage::txn::{Error as TxnError, ErrorInner as TxnErrorInner}; use txn_types::Error as TxnTypesError; @@ -16,7 +16,7 @@ pub enum Error { #[error("IO error {0}")] Io(#[from] IoError), #[error("Engine error {0}")] - Engine(#[from] EngineError), + Kv(#[from] KvError), #[error("Transaction error {0}")] Txn(#[from] TxnError), #[error("Mvcc error {0}")] @@ -40,13 +40,13 @@ impl Error { pub fn extract_error_header(self) -> ErrorHeader { match self { - Error::Engine(EngineError(box EngineErrorInner::Request(e))) - | Error::Txn(TxnError(box TxnErrorInner::Engine(EngineError( + Error::Kv(KvError(box EngineErrorInner::Request(e))) + | Error::Txn(TxnError(box TxnErrorInner::Engine(KvError( box EngineErrorInner::Request(e), )))) - | Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError( - box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(e))), - )))) + | Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(box MvccErrorInner::Kv( + KvError(box EngineErrorInner::Request(e)), + ))))) | Error::Request(box e) => e, other => { let mut e = ErrorHeader::default(); diff --git a/components/test_storage/src/assert_storage.rs b/components/test_storage/src/assert_storage.rs index da38ba881a2..6733f055085 100644 --- a/components/test_storage/src/assert_storage.rs +++ b/components/test_storage/src/assert_storage.rs @@ -274,14 +274,12 @@ impl AssertionStorage { fn expect_not_leader_or_stale_command(&self, err: storage::Error) { match err { StorageError(box StorageErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc( - MvccError(box MvccErrorInner::Engine(KvError(box KvErrorInner::Request(ref e)))), + MvccError(box MvccErrorInner::Kv(KvError(box KvErrorInner::Request(ref e)))), )))) | StorageError(box StorageErrorInner::Txn(TxnError(box TxnErrorInner::Engine( KvError(box KvErrorInner::Request(ref e)), )))) - | StorageError(box StorageErrorInner::Engine(KvError(box KvErrorInner::Request( - ref e, - )))) => { + | StorageError(box StorageErrorInner::Kv(KvError(box KvErrorInner::Request(ref e)))) => { assert!( e.has_not_leader() | e.has_stale_command(), "invalid error {:?}", diff --git a/src/coprocessor/error.rs b/src/coprocessor/error.rs index 92f7a302225..1d05bbb87e9 100644 --- a/src/coprocessor/error.rs +++ b/src/coprocessor/error.rs @@ -78,7 +78,7 @@ impl From for Error { fn from(err: MvccError) -> Self { match err { MvccError(box MvccErrorInner::KeyIsLocked(info)) => Error::Locked(info), - MvccError(box MvccErrorInner::Engine(engine_error)) => Error::from(engine_error), + MvccError(box MvccErrorInner::Kv(kv_error)) => Error::from(kv_error), e => Error::Other(e.to_string()), } } @@ -88,7 +88,7 @@ impl From for Error { fn from(err: storage::txn::Error) -> Self { match err { TxnError(box TxnErrorInner::Mvcc(mvcc_error)) => Error::from(mvcc_error), - TxnError(box TxnErrorInner::Engine(engine_error)) => Error::from(engine_error), + TxnError(box TxnErrorInner::Engine(kv_error)) => Error::from(kv_error), e => Error::Other(e.to_string()), } } diff --git a/src/coprocessor_v2/raw_storage_impl.rs b/src/coprocessor_v2/raw_storage_impl.rs index a867707e113..c61fe1f892b 100644 --- a/src/coprocessor_v2/raw_storage_impl.rs +++ b/src/coprocessor_v2/raw_storage_impl.rs @@ -8,7 +8,7 @@ use std::ops::Range; use tikv_util::future::paired_future_callback; use crate::storage::errors::extract_kv_pairs; -use crate::storage::kv::{Error as EngineError, ErrorInner as EngineErrorInner}; +use crate::storage::kv::{Error as KvError, ErrorInner as KvErrorInner}; use crate::storage::{self, lock_manager::LockManager, Engine, Storage}; /// Implementation of the [`RawStorage`] trait. @@ -173,9 +173,9 @@ impl From for PluginErrorShim { fn from(error: storage::errors::Error) -> Self { let inner = match *error.0 { // Key not in region - storage::errors::ErrorInner::Engine(EngineError(box EngineErrorInner::Request( - ref req_err, - ))) if req_err.has_key_not_in_region() => { + storage::errors::ErrorInner::Kv(KvError(box KvErrorInner::Request(ref req_err))) + if req_err.has_key_not_in_region() => + { let key_err = req_err.get_key_not_in_region(); PluginError::KeyNotInRegion { key: key_err.get_key().to_owned(), @@ -185,9 +185,9 @@ impl From for PluginErrorShim { } } // Timeout - storage::errors::ErrorInner::Engine(EngineError(box EngineErrorInner::Timeout( - duration, - ))) => PluginError::Timeout(duration), + storage::errors::ErrorInner::Kv(KvError(box KvErrorInner::Timeout(duration))) => { + PluginError::Timeout(duration) + } // Other errors are passed as-is inside their `Result` so we get a `&Result` when using `Any::downcast_ref`. _ => PluginError::Other(Box::new(storage::Result::<()>::Err(error))), }; diff --git a/src/storage/config.rs b/src/storage/config.rs index 9b1ac08dde5..e6e5a56308f 100644 --- a/src/storage/config.rs +++ b/src/storage/config.rs @@ -9,6 +9,7 @@ use crate::storage::txn::flow_controller::FlowController; use engine_rocks::raw::{Cache, LRUCacheOptions, MemoryAllocator}; use engine_traits::{ColumnFamilyOptions, KvEngine, CF_DEFAULT}; use file_system::{get_io_rate_limiter, IOPriority, IORateLimitMode, IORateLimiter, IOType}; +use kvproto::kvrpcpb::ApiVersion; use libc::c_int; use online_config::{ConfigChange, ConfigManager, ConfigValue, OnlineConfig, Result as CfgResult}; use std::error::Error; @@ -58,6 +59,9 @@ pub struct Config { pub enable_ttl: bool, /// Interval to check TTL for all SSTs, pub ttl_check_poll_interval: ReadableDuration, + #[online_config(skip)] + #[serde(with = "api_version_serde")] + pub api_version: ApiVersion, #[online_config(submodule)] pub flow_control: FlowControlConfig, #[online_config(submodule)] @@ -83,6 +87,7 @@ impl Default for Config { flow_control: FlowControlConfig::default(), block_cache: BlockCacheConfig::default(), io_rate_limit: IORateLimitConfig::default(), + api_version: ApiVersion::V1, } } } @@ -100,7 +105,45 @@ impl Config { ); self.scheduler_concurrency = MAX_SCHED_CONCURRENCY; } - self.io_rate_limit.validate() + self.io_rate_limit.validate()?; + if self.api_version == ApiVersion::V2 && !self.enable_ttl { + warn!("storage.enable_ttl is deprecated in API V2 since API V2 forces to enable TTL."); + self.enable_ttl = true; + }; + + Ok(()) + } +} + +mod api_version_serde { + use kvproto::kvrpcpb::ApiVersion; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(value: &ApiVersion, serializer: S) -> Result + where + S: Serializer, + { + Ok(match value { + ApiVersion::V1 => serializer.serialize_u32(1)?, + ApiVersion::V2 => serializer.serialize_u32(2)?, + }) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = u32::deserialize(deserializer)?; + Ok(match value { + 1 => ApiVersion::V1, + 2 => ApiVersion::V2, + _ => { + return Err(serde::de::Error::custom(format!( + "unknown storage.api_version: {}", + value + ))); + } + }) } } diff --git a/src/storage/errors.rs b/src/storage/errors.rs index 38536c9debc..c40526333b0 100644 --- a/src/storage/errors.rs +++ b/src/storage/errors.rs @@ -13,7 +13,7 @@ use tikv_util::deadline::DeadlineError; use txn_types::{KvPair, TimeStamp}; use crate::storage::{ - kv::{self, Error as EngineError, ErrorInner as EngineErrorInner}, + kv::{self, Error as KvError, ErrorInner as KvErrorInner}, mvcc::{Error as MvccError, ErrorInner as MvccErrorInner}, txn::{self, Error as TxnError, ErrorInner as TxnErrorInner}, Result, @@ -24,11 +24,14 @@ use crate::storage::{ /// handling functionality in a single place instead of being spread out. pub enum ErrorInner { #[error("{0}")] - Engine(#[from] kv::Error), + Kv(#[from] kv::Error), #[error("{0}")] Txn(#[from] txn::Error), + #[error("{0}")] + Engine(#[from] engine_traits::Error), + #[error("storage is closed.")] Closed, @@ -89,8 +92,9 @@ impl> From for Error { impl ErrorCodeExt for Error { fn error_code(&self) -> ErrorCode { match self.0.as_ref() { - ErrorInner::Engine(e) => e.error_code(), + ErrorInner::Kv(e) => e.error_code(), ErrorInner::Txn(e) => e.error_code(), + ErrorInner::Engine(e) => e.error_code(), ErrorInner::Closed => error_code::storage::CLOSED, ErrorInner::Other(_) => error_code::storage::UNKNOWN, ErrorInner::Io(_) => error_code::storage::IO, @@ -181,12 +185,12 @@ pub fn get_tag_from_header(header: &errorpb::Error) -> &'static str { pub fn extract_region_error(res: &Result) -> Option { match *res { // TODO: use `Error::cause` instead. - Err(Error(box ErrorInner::Engine(EngineError(box EngineErrorInner::Request(ref e))))) - | Err(Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Engine(EngineError( - box EngineErrorInner::Request(ref e), + Err(Error(box ErrorInner::Kv(KvError(box KvErrorInner::Request(ref e))))) + | Err(Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Engine(KvError( + box KvErrorInner::Request(ref e), )))))) | Err(Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError( - box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(ref e))), + box MvccErrorInner::Kv(KvError(box KvErrorInner::Request(ref e))), )))))) => Some(e.to_owned()), Err(Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::MaxTimestampNotSynced { .. @@ -242,10 +246,10 @@ pub fn extract_key_error(err: &Error) -> kvrpcpb::KeyError { Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError( box MvccErrorInner::KeyIsLocked(info), ))))) - | Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Engine(EngineError( - box EngineErrorInner::KeyIsLocked(info), + | Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Engine(KvError( + box KvErrorInner::KeyIsLocked(info), ))))) - | Error(box ErrorInner::Engine(EngineError(box EngineErrorInner::KeyIsLocked(info)))) => { + | Error(box ErrorInner::Kv(KvError(box KvErrorInner::KeyIsLocked(info)))) => { key_error.set_locked(info.clone()); } // failed in prewrite or pessimistic lock diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 073bf42bbdb..d1c59b65024 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -78,13 +78,19 @@ use crate::storage::{ types::StorageCallbackType, }; use concurrency_manager::ConcurrencyManager; -use engine_traits::{CfName, CF_DEFAULT, DATA_CFS}; +use engine_traits::{ + CfName, Iterable, KvEngine, Peekable, SyncMutable, CF_DEFAULT, DATA_CFS, DATA_KEY_PREFIX_LEN, +}; use futures::prelude::*; +use kvproto::kvrpcpb::ApiVersion; use kvproto::kvrpcpb::{ ChecksumAlgorithm, CommandPri, Context, GetRequest, IsolationLevel, KeyRange, LockInfo, RawGetRequest, }; use kvproto::pdpb::QueryKind; +use kvproto::raft_serverpb::DataEncode; +use protobuf::well_known_types::Int32Value; +use protobuf::ProtobufEnum; use raftstore::store::util::build_key_range; use raftstore::store::{ReadStats, WriteStats}; use rand::prelude::*; @@ -208,6 +214,12 @@ impl Storage { flow_controller: Arc, reporter: R, ) -> Result { + let config_data_encode = match config.api_version { + ApiVersion::V1 => DataEncode::V1, + ApiVersion::V2 => DataEncode::V2, + }; + Self::ensure_to_use_data_encode(&engine, config_data_encode)?; + let sched = TxnScheduler::new( engine.clone(), lock_mgr, @@ -316,6 +328,60 @@ impl Storage { true } + /// Change the data encode flag in kvdb if the config file has a different value. + /// During the switch bewteen V1 and V2, only TiDB data are allowed to exist, otherwise return error. + fn ensure_to_use_data_encode(engine: &E, config_data_encode: DataEncode) -> Result<()> { + let kv = engine.kv_engine(); + let kv_data_encode = kv + .get_msg::(keys::DATA_ENCODE_KEY)? + .unwrap_or_default() + .value; + let kv_data_encode = DataEncode::from_i32(kv_data_encode).expect("unknown data encode"); + if kv_data_encode != config_data_encode { + // Check if there are only TiDB data in the engine + let snapshot = kv.snapshot(); + for cf in DATA_CFS { + for (start, end) in keys::DATA_TIDB_RANGES_COMPLEMENT { + let mut unexpected_data_key = None; + snapshot.scan_cf( + cf, + &keys::data_key(start), + &keys::data_key(end), + false, + |key, _| { + unexpected_data_key = Some(key[DATA_KEY_PREFIX_LEN..].to_vec()); + Ok(false) + }, + )?; + if let Some(unexpected_data_key) = unexpected_data_key { + error!( + "unable to switch data encode (triggered by switching storage.api_version)"; + "current" => ?kv_data_encode, + "target" => ?config_data_encode, + "found data key that is not written by TiDB" => log_wrappers::hex_encode_upper(&unexpected_data_key), + ); + return Err(box_err!( + "unable to switch data encode (triggered by switching storage.api_version) from {:?} to {:?} \ + because found data key that is not written by TiDB: {:?}", + kv_data_encode, + config_data_encode, + log_wrappers::hex_encode_upper(&unexpected_data_key) + )); + } + } + } + // Check completed. Switch the encode flag. + kv.put_msg( + keys::DATA_ENCODE_KEY, + &Int32Value { + value: config_data_encode.value(), + ..Default::default() + }, + )?; + } + Ok(()) + } + /// Get value of the given key from a snapshot. /// /// Only writes that are committed before `start_ts` are visible. @@ -2020,6 +2086,11 @@ impl TestStorageBuilder { self } + pub fn set_api_version(mut self, api_version: ApiVersion) -> Self { + self.config.api_version = api_version; + self + } + /// Build a `Storage`. pub fn build(self) -> Result> { let read_pool = build_read_pool_for_test( @@ -2251,7 +2322,7 @@ mod tests { use crate::storage::txn::tests::must_rollback; use crate::storage::{ config::BlockCacheConfig, - kv::{Error as EngineError, ErrorInner as EngineErrorInner}, + kv::{Error as KvError, ErrorInner as EngineErrorInner}, lock_manager::{Lock, WaitTimeout}, mvcc::{Error as MvccError, ErrorInner as MvccErrorInner}, raw::ttl::current_ts, @@ -2262,7 +2333,7 @@ mod tests { use engine_traits::{ALL_CFS, CF_LOCK, CF_RAFT, CF_WRITE}; use errors::extract_key_error; use futures::executor::block_on; - use kvproto::kvrpcpb::{CommandPri, Op}; + use kvproto::kvrpcpb::{ApiVersion, CommandPri, Op}; use std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -2393,9 +2464,7 @@ mod tests { ), expect_fail_callback(tx, 0, |e| match e { Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(mvcc::Error( - box mvcc::ErrorInner::Engine(EngineError(box EngineErrorInner::Request( - .., - ))), + box mvcc::ErrorInner::Kv(KvError(box EngineErrorInner::Request(..))), ))))) => {} e => panic!("unexpected error chain: {:?}", e), }), @@ -2405,7 +2474,7 @@ mod tests { expect_error( |e| match e { Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(mvcc::Error( - box mvcc::ErrorInner::Engine(EngineError(box EngineErrorInner::Request(..))), + box mvcc::ErrorInner::Kv(KvError(box EngineErrorInner::Request(..))), ))))) => (), e => panic!("unexpected error chain: {:?}", e), }, @@ -2414,7 +2483,7 @@ mod tests { expect_error( |e| match e { Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(mvcc::Error( - box mvcc::ErrorInner::Engine(EngineError(box EngineErrorInner::Request(..))), + box mvcc::ErrorInner::Kv(KvError(box EngineErrorInner::Request(..))), ))))) => (), e => panic!("unexpected error chain: {:?}", e), }, @@ -2432,7 +2501,7 @@ mod tests { expect_error( |e| match e { Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(mvcc::Error( - box mvcc::ErrorInner::Engine(EngineError(box EngineErrorInner::Request(..))), + box mvcc::ErrorInner::Kv(KvError(box EngineErrorInner::Request(..))), ))))) => (), e => panic!("unexpected error chain: {:?}", e), }, @@ -2455,9 +2524,7 @@ mod tests { expect_error( |e| match e { Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(mvcc::Error( - box mvcc::ErrorInner::Engine(EngineError(box EngineErrorInner::Request( - .., - ))), + box mvcc::ErrorInner::Kv(KvError(box EngineErrorInner::Request(..))), ))))) => {} e => panic!("unexpected error chain: {:?}", e), }, @@ -7001,4 +7068,92 @@ mod tests { .unwrap(); rx.recv().unwrap(); } + + #[test] + fn test_switch_api_version() { + let engine = TestEngineBuilder::new().build().unwrap(); + + // Write TiDB data. + let tidb_key = keys::data_key(b"m_tidb_data"); + engine + .put( + &Context::default(), + Key::from_raw(&tidb_key), + b"val".to_vec(), + ) + .unwrap(); + + // Default API Version is V1, should be ablle to swith to V2. + let storage = TestStorageBuilder::<_, DummyLockManager>::from_engine_and_lock_mgr( + engine.clone(), + DummyLockManager {}, + ) + .set_api_version(ApiVersion::V2) + .build() + .unwrap(); + drop(storage); + + // Should be able to switch back to V1. + let storage = TestStorageBuilder::<_, DummyLockManager>::from_engine_and_lock_mgr( + engine.clone(), + DummyLockManager {}, + ) + .set_api_version(ApiVersion::V1) + .build() + .unwrap(); + drop(storage); + + // Write non-TiDB data. + let non_tidb_key = keys::data_key(b"k1"); + engine + .put( + &Context::default(), + Key::from_raw(&non_tidb_key), + b"val".to_vec(), + ) + .unwrap(); + + // Should not able to switch from V1 to V2 now. + assert!( + TestStorageBuilder::<_, DummyLockManager>::from_engine_and_lock_mgr( + engine, + DummyLockManager {}, + ) + .set_api_version(ApiVersion::V2) + .build() + .is_err() + ); + + // Prepare a new storage and switch it to V2. + let engine = TestEngineBuilder::new().build().unwrap(); + let storage = TestStorageBuilder::<_, DummyLockManager>::from_engine_and_lock_mgr( + engine.clone(), + DummyLockManager {}, + ) + .set_api_version(ApiVersion::V2) + .build() + .unwrap(); + drop(storage); + + // Write non-TiDB data. + let non_tidb_key = keys::data_key(b"k1"); + engine + .put( + &Context::default(), + Key::from_raw(&non_tidb_key), + b"val".to_vec(), + ) + .unwrap(); + + // Should not able to switch from V2 to V1 now. + assert!( + TestStorageBuilder::<_, DummyLockManager>::from_engine_and_lock_mgr( + engine, + DummyLockManager {}, + ) + .set_api_version(ApiVersion::V1) + .build() + .is_err() + ); + } } diff --git a/src/storage/mvcc/mod.rs b/src/storage/mvcc/mod.rs index d104a043b55..713d9a1d792 100644 --- a/src/storage/mvcc/mod.rs +++ b/src/storage/mvcc/mod.rs @@ -28,7 +28,7 @@ use tikv_util::{panic_when_unexpected_key_or_data, set_panic_mark}; #[derive(Debug, Error)] pub enum ErrorInner { #[error("{0}")] - Engine(#[from] crate::storage::kv::Error), + Kv(#[from] crate::storage::kv::Error), #[error("{0}")] Io(#[from] io::Error), @@ -152,7 +152,7 @@ pub enum ErrorInner { impl ErrorInner { pub fn maybe_clone(&self) -> Option { match self { - ErrorInner::Engine(e) => e.maybe_clone().map(ErrorInner::Engine), + ErrorInner::Kv(e) => e.maybe_clone().map(ErrorInner::Kv), ErrorInner::Codec(e) => e.maybe_clone().map(ErrorInner::Codec), ErrorInner::KeyIsLocked(info) => Some(ErrorInner::KeyIsLocked(info.clone())), ErrorInner::BadFormat(e) => e.maybe_clone().map(ErrorInner::BadFormat), @@ -313,7 +313,7 @@ pub type Result = std::result::Result; impl ErrorCodeExt for Error { fn error_code(&self) -> ErrorCode { match self.0.as_ref() { - ErrorInner::Engine(e) => e.error_code(), + ErrorInner::Kv(e) => e.error_code(), ErrorInner::Io(_) => error_code::storage::IO, ErrorInner::Codec(e) => e.error_code(), ErrorInner::KeyIsLocked(_) => error_code::storage::KEY_IS_LOCKED, diff --git a/tests/failpoints/cases/test_storage.rs b/tests/failpoints/cases/test_storage.rs index 0d4cc5a990f..cf1bcb52e2f 100644 --- a/tests/failpoints/cases/test_storage.rs +++ b/tests/failpoints/cases/test_storage.rs @@ -81,7 +81,7 @@ fn test_scheduler_leader_change_twice() { Err(Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Engine(KvError( box KvErrorInner::Request(ref e), )))))) - | Err(Error(box ErrorInner::Engine(KvError(box KvErrorInner::Request(ref e))))) => { + | Err(Error(box ErrorInner::Kv(KvError(box KvErrorInner::Request(ref e))))) => { assert!(e.has_stale_command(), "{:?}", e); } res => { diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 8483e33c6a9..f8bdc9a82b2 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -5,6 +5,7 @@ use std::io::Read; use std::iter::FromIterator; use std::path::PathBuf; +use kvproto::kvrpcpb::ApiVersion; use slog::Level; use batch_system::Config as BatchSystemConfig; @@ -617,6 +618,7 @@ fn test_serde_custom_tikv_config() { enable_async_apply_prewrite: true, enable_ttl: true, ttl_check_poll_interval: ReadableDuration::hours(0), + api_version: ApiVersion::V1, flow_control: FlowControlConfig { enable: false, l0_files_threshold: 10,