Skip to content

Commit

Permalink
storage: Add config storage.api_version and add DataEncode to kvdb (t…
Browse files Browse the repository at this point in the history
…ikv#11021)

* storage: Add config storage.api_version and add DataEncode to kvdb

Signed-off-by: Andy Lok <[email protected]>

* Fix TiDB data range

Signed-off-by: Andy Lok <[email protected]>

* Add test

Signed-off-by: Andy Lok <[email protected]>

* fix text

Signed-off-by: Andy Lok <[email protected]>

* fix ci

Signed-off-by: Andy Lok <[email protected]>

* edit kvproto

Signed-off-by: Andy Lok <[email protected]>

* fix proto

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* address comment

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* address comment

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
andylokandy and ti-chi-bot authored Oct 11, 2021
1 parent 3adc033 commit 6d0edd7
Show file tree
Hide file tree
Showing 15 changed files with 273 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions components/backup/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use kvproto::brpb::Error as ErrorPb;
use kvproto::errorpb::{Error as RegionError, ServerIsBusy};
use kvproto::kvrpcpb::KeyError;
use thiserror::Error;
use tikv::storage::kv::{Error as EngineError, ErrorInner as EngineErrorInner};
use tikv::storage::kv::{Error as KvError, ErrorInner as EngineErrorInner};
use tikv::storage::mvcc::{Error as MvccError, ErrorInner as MvccErrorInner};
use tikv::storage::txn::{Error as TxnError, ErrorInner as TxnErrorInner};

Expand All @@ -26,13 +26,13 @@ impl From<Error> for ErrorPb {
err.mut_cluster_id_error().set_current(current);
err.mut_cluster_id_error().set_request(request);
}
Error::Engine(EngineError(box EngineErrorInner::Request(e)))
| Error::Txn(TxnError(box TxnErrorInner::Engine(EngineError(
Error::Kv(KvError(box EngineErrorInner::Request(e)))
| Error::Txn(TxnError(box TxnErrorInner::Engine(KvError(
box EngineErrorInner::Request(e),
))))
| Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(e))),
)))) => {
| Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(box MvccErrorInner::Kv(
KvError(box EngineErrorInner::Request(e)),
))))) => {
if e.has_not_leader() {
BACKUP_RANGE_ERROR_VEC
.with_label_values(&["not_leader"])
Expand Down Expand Up @@ -75,7 +75,7 @@ impl From<Error> for ErrorPb {
e.set_locked(info);
err.set_kv_error(e);
}
timeout @ Error::Engine(EngineError(box EngineErrorInner::Timeout(_))) => {
timeout @ Error::Kv(KvError(box EngineErrorInner::Timeout(_))) => {
BACKUP_RANGE_ERROR_VEC.with_label_values(&["timeout"]).inc();
let mut busy = ServerIsBusy::default();
let reason = format!("{}", timeout);
Expand Down Expand Up @@ -104,7 +104,7 @@ pub enum Error {
#[error("IO error {0}")]
Io(#[from] IoError),
#[error("Engine error {0}")]
Engine(#[from] EngineError),
Kv(#[from] KvError),
#[error("Engine error {0}")]
EngineTrait(#[from] EngineTraitError),
#[error("Transaction error {0}")]
Expand Down
20 changes: 10 additions & 10 deletions components/cdc/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{error, result};
use engine_traits::Error as EngineTraitsError;
use kvproto::errorpb;
use thiserror::Error;
use tikv::storage::kv::{Error as EngineError, ErrorInner as EngineErrorInner};
use tikv::storage::kv::{Error as KvError, ErrorInner as EngineErrorInner};
use tikv::storage::mvcc::{Error as MvccError, ErrorInner as MvccErrorInner};
use tikv::storage::txn::{Error as TxnError, ErrorInner as TxnErrorInner};
use txn_types::Error as TxnTypesError;
Expand All @@ -23,7 +23,7 @@ pub enum Error {
#[error("IO error {0}")]
Io(#[from] IoError),
#[error("Engine error {0}")]
Engine(#[from] EngineError),
Kv(#[from] KvError),
#[error("Transaction error {0}")]
Txn(#[from] TxnError),
#[error("Mvcc error {0}")]
Expand Down Expand Up @@ -63,26 +63,26 @@ impl Error {
pub fn has_region_error(&self) -> bool {
matches!(
self,
Error::Engine(EngineError(box EngineErrorInner::Request(_)))
| Error::Txn(TxnError(box TxnErrorInner::Engine(EngineError(
Error::Kv(KvError(box EngineErrorInner::Request(_)))
| Error::Txn(TxnError(box TxnErrorInner::Engine(KvError(
box EngineErrorInner::Request(_),
))))
| Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(_))),
box MvccErrorInner::Kv(KvError(box EngineErrorInner::Request(_))),
))))
| Error::Request(_)
)
}

pub fn extract_region_error(self) -> errorpb::Error {
match self {
Error::Engine(EngineError(box EngineErrorInner::Request(e)))
| Error::Txn(TxnError(box TxnErrorInner::Engine(EngineError(
Error::Kv(KvError(box EngineErrorInner::Request(e)))
| Error::Txn(TxnError(box TxnErrorInner::Engine(KvError(
box EngineErrorInner::Request(e),
))))
| Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(e))),
))))
| Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(box MvccErrorInner::Kv(
KvError(box EngineErrorInner::Request(e)),
)))))
| Error::Request(box e) => e,
// TODO: it should be None, add more cdc errors.
other => {
Expand Down
1 change: 1 addition & 0 deletions components/external_storage/export/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ fn create_backend_inner(backend: &Backend) -> io::Result<Box<dyn ExternalStorage
return Err(bad_backend(Backend::CloudDynamic(dyn_backend.clone())));
}
},
Backend::Hdfs(..) => unimplemented!(),
#[cfg(not(any(feature = "cloud-gcp", feature = "cloud-aws")))]
_ => return Err(bad_backend(backend.clone())),
};
Expand Down
4 changes: 4 additions & 0 deletions components/keys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ pub const DATA_PREFIX: u8 = b'z';
pub const DATA_PREFIX_KEY: &[u8] = &[DATA_PREFIX];
pub const DATA_MIN_KEY: &[u8] = &[DATA_PREFIX];
pub const DATA_MAX_KEY: &[u8] = &[DATA_PREFIX + 1];
pub const DATA_TIDB_RANGES: &[(&[u8], &[u8])] = &[(&[b'm'], &[b'm' + 1]), (&[b't'], &[b't' + 1])];
pub const DATA_TIDB_RANGES_COMPLEMENT: &[(&[u8], &[u8])] =
&[(&[], &[b'm']), (&[b'm' + 1], &[b't']), (&[b't' + 1], &[])];

// Following keys are all local keys, so the first byte must be 0x01.
pub const STORE_IDENT_KEY: &[u8] = &[LOCAL_PREFIX, 0x01];
pub const PREPARE_BOOTSTRAP_KEY: &[u8] = &[LOCAL_PREFIX, 0x02];
pub const DATA_ENCODE_KEY: &[u8] = &[LOCAL_PREFIX, 0x04];
// We save two types region data in DB, for raft and other meta data.
// When the store starts, we should iterate all region meta data to
// construct peer, no need to travel large raft data, so we separate them
Expand Down
14 changes: 7 additions & 7 deletions components/resolved_ts/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use engine_traits::Error as EngineTraitsError;
use kvproto::errorpb::Error as ErrorHeader;
use raftstore::Error as RaftstoreError;
use thiserror::Error;
use tikv::storage::kv::{Error as EngineError, ErrorInner as EngineErrorInner};
use tikv::storage::kv::{Error as KvError, ErrorInner as EngineErrorInner};
use tikv::storage::mvcc::{Error as MvccError, ErrorInner as MvccErrorInner};
use tikv::storage::txn::{Error as TxnError, ErrorInner as TxnErrorInner};
use txn_types::Error as TxnTypesError;
Expand All @@ -16,7 +16,7 @@ pub enum Error {
#[error("IO error {0}")]
Io(#[from] IoError),
#[error("Engine error {0}")]
Engine(#[from] EngineError),
Kv(#[from] KvError),
#[error("Transaction error {0}")]
Txn(#[from] TxnError),
#[error("Mvcc error {0}")]
Expand All @@ -40,13 +40,13 @@ impl Error {

pub fn extract_error_header(self) -> ErrorHeader {
match self {
Error::Engine(EngineError(box EngineErrorInner::Request(e)))
| Error::Txn(TxnError(box TxnErrorInner::Engine(EngineError(
Error::Kv(KvError(box EngineErrorInner::Request(e)))
| Error::Txn(TxnError(box TxnErrorInner::Engine(KvError(
box EngineErrorInner::Request(e),
))))
| Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(e))),
))))
| Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(box MvccErrorInner::Kv(
KvError(box EngineErrorInner::Request(e)),
)))))
| Error::Request(box e) => e,
other => {
let mut e = ErrorHeader::default();
Expand Down
6 changes: 2 additions & 4 deletions components/test_storage/src/assert_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,12 @@ impl<E: Engine> AssertionStorage<E> {
fn expect_not_leader_or_stale_command(&self, err: storage::Error) {
match err {
StorageError(box StorageErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(
MvccError(box MvccErrorInner::Engine(KvError(box KvErrorInner::Request(ref e)))),
MvccError(box MvccErrorInner::Kv(KvError(box KvErrorInner::Request(ref e)))),
))))
| StorageError(box StorageErrorInner::Txn(TxnError(box TxnErrorInner::Engine(
KvError(box KvErrorInner::Request(ref e)),
))))
| StorageError(box StorageErrorInner::Engine(KvError(box KvErrorInner::Request(
ref e,
)))) => {
| StorageError(box StorageErrorInner::Kv(KvError(box KvErrorInner::Request(ref e)))) => {
assert!(
e.has_not_leader() | e.has_stale_command(),
"invalid error {:?}",
Expand Down
4 changes: 2 additions & 2 deletions src/coprocessor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl From<MvccError> for Error {
fn from(err: MvccError) -> Self {
match err {
MvccError(box MvccErrorInner::KeyIsLocked(info)) => Error::Locked(info),
MvccError(box MvccErrorInner::Engine(engine_error)) => Error::from(engine_error),
MvccError(box MvccErrorInner::Kv(kv_error)) => Error::from(kv_error),
e => Error::Other(e.to_string()),
}
}
Expand All @@ -88,7 +88,7 @@ impl From<TxnError> for Error {
fn from(err: storage::txn::Error) -> Self {
match err {
TxnError(box TxnErrorInner::Mvcc(mvcc_error)) => Error::from(mvcc_error),
TxnError(box TxnErrorInner::Engine(engine_error)) => Error::from(engine_error),
TxnError(box TxnErrorInner::Engine(kv_error)) => Error::from(kv_error),
e => Error::Other(e.to_string()),
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/coprocessor_v2/raw_storage_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::ops::Range;
use tikv_util::future::paired_future_callback;

use crate::storage::errors::extract_kv_pairs;
use crate::storage::kv::{Error as EngineError, ErrorInner as EngineErrorInner};
use crate::storage::kv::{Error as KvError, ErrorInner as KvErrorInner};
use crate::storage::{self, lock_manager::LockManager, Engine, Storage};

/// Implementation of the [`RawStorage`] trait.
Expand Down Expand Up @@ -173,9 +173,9 @@ impl From<storage::errors::Error> for PluginErrorShim {
fn from(error: storage::errors::Error) -> Self {
let inner = match *error.0 {
// Key not in region
storage::errors::ErrorInner::Engine(EngineError(box EngineErrorInner::Request(
ref req_err,
))) if req_err.has_key_not_in_region() => {
storage::errors::ErrorInner::Kv(KvError(box KvErrorInner::Request(ref req_err)))
if req_err.has_key_not_in_region() =>
{
let key_err = req_err.get_key_not_in_region();
PluginError::KeyNotInRegion {
key: key_err.get_key().to_owned(),
Expand All @@ -185,9 +185,9 @@ impl From<storage::errors::Error> for PluginErrorShim {
}
}
// Timeout
storage::errors::ErrorInner::Engine(EngineError(box EngineErrorInner::Timeout(
duration,
))) => PluginError::Timeout(duration),
storage::errors::ErrorInner::Kv(KvError(box KvErrorInner::Timeout(duration))) => {
PluginError::Timeout(duration)
}
// Other errors are passed as-is inside their `Result` so we get a `&Result` when using `Any::downcast_ref`.
_ => PluginError::Other(Box::new(storage::Result::<()>::Err(error))),
};
Expand Down
45 changes: 44 additions & 1 deletion src/storage/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::storage::txn::flow_controller::FlowController;
use engine_rocks::raw::{Cache, LRUCacheOptions, MemoryAllocator};
use engine_traits::{ColumnFamilyOptions, KvEngine, CF_DEFAULT};
use file_system::{get_io_rate_limiter, IOPriority, IORateLimitMode, IORateLimiter, IOType};
use kvproto::kvrpcpb::ApiVersion;
use libc::c_int;
use online_config::{ConfigChange, ConfigManager, ConfigValue, OnlineConfig, Result as CfgResult};
use std::error::Error;
Expand Down Expand Up @@ -58,6 +59,9 @@ pub struct Config {
pub enable_ttl: bool,
/// Interval to check TTL for all SSTs,
pub ttl_check_poll_interval: ReadableDuration,
#[online_config(skip)]
#[serde(with = "api_version_serde")]
pub api_version: ApiVersion,
#[online_config(submodule)]
pub flow_control: FlowControlConfig,
#[online_config(submodule)]
Expand All @@ -83,6 +87,7 @@ impl Default for Config {
flow_control: FlowControlConfig::default(),
block_cache: BlockCacheConfig::default(),
io_rate_limit: IORateLimitConfig::default(),
api_version: ApiVersion::V1,
}
}
}
Expand All @@ -100,7 +105,45 @@ impl Config {
);
self.scheduler_concurrency = MAX_SCHED_CONCURRENCY;
}
self.io_rate_limit.validate()
self.io_rate_limit.validate()?;
if self.api_version == ApiVersion::V2 && !self.enable_ttl {
warn!("storage.enable_ttl is deprecated in API V2 since API V2 forces to enable TTL.");
self.enable_ttl = true;
};

Ok(())
}
}

mod api_version_serde {
use kvproto::kvrpcpb::ApiVersion;
use serde::{Deserialize, Deserializer, Serializer};

pub fn serialize<S>(value: &ApiVersion, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
Ok(match value {
ApiVersion::V1 => serializer.serialize_u32(1)?,
ApiVersion::V2 => serializer.serialize_u32(2)?,
})
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<ApiVersion, D::Error>
where
D: Deserializer<'de>,
{
let value = u32::deserialize(deserializer)?;
Ok(match value {
1 => ApiVersion::V1,
2 => ApiVersion::V2,
_ => {
return Err(serde::de::Error::custom(format!(
"unknown storage.api_version: {}",
value
)));
}
})
}
}

Expand Down
24 changes: 14 additions & 10 deletions src/storage/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tikv_util::deadline::DeadlineError;
use txn_types::{KvPair, TimeStamp};

use crate::storage::{
kv::{self, Error as EngineError, ErrorInner as EngineErrorInner},
kv::{self, Error as KvError, ErrorInner as KvErrorInner},
mvcc::{Error as MvccError, ErrorInner as MvccErrorInner},
txn::{self, Error as TxnError, ErrorInner as TxnErrorInner},
Result,
Expand All @@ -24,11 +24,14 @@ use crate::storage::{
/// handling functionality in a single place instead of being spread out.
pub enum ErrorInner {
#[error("{0}")]
Engine(#[from] kv::Error),
Kv(#[from] kv::Error),

#[error("{0}")]
Txn(#[from] txn::Error),

#[error("{0}")]
Engine(#[from] engine_traits::Error),

#[error("storage is closed.")]
Closed,

Expand Down Expand Up @@ -89,8 +92,9 @@ impl<T: Into<ErrorInner>> From<T> for Error {
impl ErrorCodeExt for Error {
fn error_code(&self) -> ErrorCode {
match self.0.as_ref() {
ErrorInner::Engine(e) => e.error_code(),
ErrorInner::Kv(e) => e.error_code(),
ErrorInner::Txn(e) => e.error_code(),
ErrorInner::Engine(e) => e.error_code(),
ErrorInner::Closed => error_code::storage::CLOSED,
ErrorInner::Other(_) => error_code::storage::UNKNOWN,
ErrorInner::Io(_) => error_code::storage::IO,
Expand Down Expand Up @@ -181,12 +185,12 @@ pub fn get_tag_from_header(header: &errorpb::Error) -> &'static str {
pub fn extract_region_error<T>(res: &Result<T>) -> Option<errorpb::Error> {
match *res {
// TODO: use `Error::cause` instead.
Err(Error(box ErrorInner::Engine(EngineError(box EngineErrorInner::Request(ref e)))))
| Err(Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Engine(EngineError(
box EngineErrorInner::Request(ref e),
Err(Error(box ErrorInner::Kv(KvError(box KvErrorInner::Request(ref e)))))
| Err(Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Engine(KvError(
box KvErrorInner::Request(ref e),
))))))
| Err(Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(ref e))),
box MvccErrorInner::Kv(KvError(box KvErrorInner::Request(ref e))),
)))))) => Some(e.to_owned()),
Err(Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::MaxTimestampNotSynced {
..
Expand Down Expand Up @@ -242,10 +246,10 @@ pub fn extract_key_error(err: &Error) -> kvrpcpb::KeyError {
Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::KeyIsLocked(info),
)))))
| Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Engine(EngineError(
box EngineErrorInner::KeyIsLocked(info),
| Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Engine(KvError(
box KvErrorInner::KeyIsLocked(info),
)))))
| Error(box ErrorInner::Engine(EngineError(box EngineErrorInner::KeyIsLocked(info)))) => {
| Error(box ErrorInner::Kv(KvError(box KvErrorInner::KeyIsLocked(info)))) => {
key_error.set_locked(info.clone());
}
// failed in prewrite or pessimistic lock
Expand Down
Loading

0 comments on commit 6d0edd7

Please sign in to comment.