Skip to content

Commit

Permalink
storage: add tests for pipelined pessimistic lock (tikv#7706)
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <[email protected]>
  • Loading branch information
youjiali1995 authored May 13, 2020
1 parent 18675bf commit 3a1002b
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 90 deletions.
4 changes: 4 additions & 0 deletions src/storage/kv/rocksdb_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ impl TestEngineBuilder {
}

fn write_modifies(engine: &Engines, modifies: Vec<Modify>) -> Result<()> {
fail_point!("rockskv_write_modifies", |_| Err(box_err!("write failed")));

let mut wb = engine.kv.c().write_batch();
for rev in modifies {
let res = match rev {
Expand Down Expand Up @@ -260,6 +262,8 @@ impl Engine for RocksEngine {
type Snap = RocksSnapshot;

fn async_write(&self, _: &Context, modifies: Vec<Modify>, cb: Callback<()>) -> Result<()> {
fail_point!("rockskv_async_write", |_| Err(box_err!("write failed")));

if modifies.is_empty() {
return Err(Error::from(ErrorInner::EmptyRequest));
}
Expand Down
178 changes: 93 additions & 85 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,48 +1362,28 @@ impl<E: Engine, L: LockManager> TestStorageBuilder<E, L> {
}
}

#[cfg(test)]
mod tests {
pub mod test_util {
use super::*;

use crate::config::TitanDBConfig;
use crate::storage::{
config::BlockCacheConfig,
lock_manager::{Lock, WaitTimeout},
mvcc::{Error as MvccError, ErrorInner as MvccErrorInner},
txn::{commands, Error as TxnError, ErrorInner as TxnErrorInner},
};
use engine::rocks::util::CFOptions;
use engine_traits::{CF_LOCK, CF_RAFT, CF_WRITE};
use futures03::executor::block_on;
use kvproto::kvrpcpb::{CommandPri, LockInfo};
use crate::storage::txn::commands;
use std::{
fmt::Debug,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Sender},
Arc,
},
time::Duration,
sync::mpsc::{channel, Sender},
};
use tikv_util::collections::HashMap;
use tikv_util::config::ReadableSize;
use txn_types::Mutation;

fn expect_none(x: Result<Option<Value>>) {
pub fn expect_none(x: Result<Option<Value>>) {
assert_eq!(x.unwrap(), None);
}

fn expect_value(v: Vec<u8>, x: Result<Option<Value>>) {
pub fn expect_value(v: Vec<u8>, x: Result<Option<Value>>) {
assert_eq!(x.unwrap().unwrap(), v);
}

fn expect_multi_values(v: Vec<Option<KvPair>>, x: Result<Vec<Result<KvPair>>>) {
pub fn expect_multi_values(v: Vec<Option<KvPair>>, x: Result<Vec<Result<KvPair>>>) {
let x: Vec<Option<KvPair>> = x.unwrap().into_iter().map(Result::ok).collect();
assert_eq!(x, v);
}

fn expect_error<T, F>(err_matcher: F, x: Result<T>)
pub fn expect_error<T, F>(err_matcher: F, x: Result<T>)
where
F: FnOnce(Error) + Send + 'static,
{
Expand All @@ -1413,14 +1393,14 @@ mod tests {
}
}

fn expect_ok_callback<T: Debug>(done: Sender<i32>, id: i32) -> Callback<T> {
pub fn expect_ok_callback<T: Debug>(done: Sender<i32>, id: i32) -> Callback<T> {
Box::new(move |x: Result<T>| {
x.unwrap();
done.send(id).unwrap();
})
}

fn expect_fail_callback<T, F>(done: Sender<i32>, id: i32, err_matcher: F) -> Callback<T>
pub fn expect_fail_callback<T, F>(done: Sender<i32>, id: i32, err_matcher: F) -> Callback<T>
where
F: FnOnce(Error) + Send + 'static,
{
Expand All @@ -1430,7 +1410,7 @@ mod tests {
})
}

fn expect_too_busy_callback<T>(done: Sender<i32>, id: i32) -> Callback<T> {
pub fn expect_too_busy_callback<T>(done: Sender<i32>, id: i32) -> Callback<T> {
Box::new(move |x: Result<T>| {
expect_error(
|err| match err {
Expand All @@ -1443,7 +1423,7 @@ mod tests {
})
}

fn expect_value_callback<T: PartialEq + Debug + Send + 'static>(
pub fn expect_value_callback<T: PartialEq + Debug + Send + 'static>(
done: Sender<i32>,
id: i32,
value: T,
Expand All @@ -1454,6 +1434,88 @@ mod tests {
})
}

pub fn expect_pessimistic_lock_res_callback(
done: Sender<i32>,
pessimistic_lock_res: PessimisticLockRes,
) -> Callback<Result<PessimisticLockRes>> {
Box::new(move |res: Result<Result<PessimisticLockRes>>| {
assert_eq!(res.unwrap().unwrap(), pessimistic_lock_res);
done.send(0).unwrap();
})
}

type PessimisticLockCommand = TypedCommand<Result<PessimisticLockRes>>;
pub fn new_acquire_pessimistic_lock_command(
keys: Vec<(Key, bool)>,
start_ts: impl Into<TimeStamp>,
for_update_ts: impl Into<TimeStamp>,
return_values: bool,
) -> PessimisticLockCommand {
let primary = keys[0].0.clone().to_raw().unwrap();
let for_update_ts: TimeStamp = for_update_ts.into();
commands::AcquirePessimisticLock::new(
keys,
primary,
start_ts.into(),
3000,
false,
for_update_ts,
None,
return_values,
for_update_ts.next(),
Context::default(),
)
}

pub fn delete_pessimistic_lock<E: Engine, L: LockManager>(
storage: &Storage<E, L>,
key: Key,
start_ts: u64,
for_update_ts: u64,
) {
let (tx, rx) = channel();
storage
.sched_txn_command(
commands::PessimisticRollback::new(
vec![key],
start_ts.into(),
for_update_ts.into(),
Context::default(),
),
expect_ok_callback(tx, 0),
)
.unwrap();
rx.recv().unwrap();
}
}

#[cfg(test)]
mod tests {
use super::{test_util::*, *};

use crate::config::TitanDBConfig;
use crate::storage::{
config::BlockCacheConfig,
lock_manager::{Lock, WaitTimeout},
mvcc::{Error as MvccError, ErrorInner as MvccErrorInner},
txn::{commands, Error as TxnError, ErrorInner as TxnErrorInner},
};
use engine::rocks::util::CFOptions;
use engine_traits::{CF_LOCK, CF_RAFT, CF_WRITE};
use futures03::executor::block_on;
use kvproto::kvrpcpb::{CommandPri, LockInfo};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Sender},
Arc,
},
time::Duration,
};
use tikv_util::collections::HashMap;
use tikv_util::config::ReadableSize;
use txn_types::Mutation;

#[test]
fn test_get_put() {
let storage = TestStorageBuilder::new().build().unwrap();
Expand Down Expand Up @@ -4300,61 +4362,7 @@ mod tests {
assert_eq!(cmd.ts, None);
}

type PessimisticLockCommand = TypedCommand<Result<PessimisticLockRes>>;
fn new_acquire_pessimistic_lock_command(
keys: Vec<(Key, bool)>,
start_ts: impl Into<TimeStamp>,
for_update_ts: impl Into<TimeStamp>,
return_values: bool,
) -> PessimisticLockCommand {
let primary = keys[0].0.clone().to_raw().unwrap();
let for_update_ts: TimeStamp = for_update_ts.into();
commands::AcquirePessimisticLock::new(
keys,
primary,
start_ts.into(),
3000,
false,
for_update_ts,
None,
return_values,
for_update_ts.next(),
Context::default(),
)
}

fn test_pessimistic_lock_impl(pipelined_pessimistic_lock: bool) {
fn delete_pessimistic_lock<E: Engine, L: LockManager>(
storage: &Storage<E, L>,
key: Key,
start_ts: u64,
for_update_ts: u64,
) {
let (tx, rx) = channel();
storage
.sched_txn_command(
commands::PessimisticRollback::new(
vec![key],
start_ts.into(),
for_update_ts.into(),
Context::default(),
),
expect_ok_callback(tx, 0),
)
.unwrap();
rx.recv().unwrap();
}

fn expect_pessimistic_lock_res_callback(
done: Sender<i32>,
pessimistic_lock_res: PessimisticLockRes,
) -> Callback<Result<PessimisticLockRes>> {
Box::new(move |res: Result<Result<PessimisticLockRes>>| {
assert_eq!(res.unwrap().unwrap(), pessimistic_lock_res);
done.send(0).unwrap();
})
}

let storage = TestStorageBuilder::new()
.set_lock_mgr(DummyLockManager {})
.set_pipelined_pessimistic_lock(pipelined_pessimistic_lock)
Expand Down
58 changes: 55 additions & 3 deletions src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ pub mod tests {
for_update_ts: TimeStamp,
txn_size: u64,
min_commit_ts: TimeStamp,
pipelined_pessimistic_lock: bool,
) {
let ctx = Context::default();
let snapshot = engine.snapshot(&ctx).unwrap();
Expand All @@ -416,7 +417,7 @@ pub mod tests {
for_update_ts,
txn_size,
min_commit_ts,
false,
pipelined_pessimistic_lock,
)
.unwrap();
}
Expand All @@ -441,6 +442,7 @@ pub mod tests {
TimeStamp::default(),
0,
TimeStamp::default(),
false,
);
}

Expand All @@ -464,6 +466,31 @@ pub mod tests {
for_update_ts.into(),
0,
TimeStamp::default(),
false,
);
}

pub fn must_pipelined_pessimistic_prewrite_put<E: Engine>(
engine: &E,
key: &[u8],
value: &[u8],
pk: &[u8],
ts: impl Into<TimeStamp>,
for_update_ts: impl Into<TimeStamp>,
is_pessimistic_lock: bool,
) {
must_prewrite_put_impl(
engine,
key,
value,
pk,
ts,
is_pessimistic_lock,
0,
for_update_ts.into(),
0,
TimeStamp::default(),
true,
);
}

Expand All @@ -488,6 +515,7 @@ pub mod tests {
for_update_ts.into(),
0,
TimeStamp::default(),
false,
);
}

Expand Down Expand Up @@ -515,6 +543,7 @@ pub mod tests {
for_update_ts,
0,
min_commit_ts,
false,
);
}

Expand All @@ -526,6 +555,7 @@ pub mod tests {
ts: impl Into<TimeStamp>,
for_update_ts: impl Into<TimeStamp>,
is_pessimistic_lock: bool,
pipelined_pessimistic_lock: bool,
) -> Error {
let ctx = Context::default();
let snapshot = engine.snapshot(&ctx).unwrap();
Expand All @@ -544,7 +574,7 @@ pub mod tests {
for_update_ts,
0,
TimeStamp::default(),
false,
pipelined_pessimistic_lock,
)
.unwrap_err()
}
Expand All @@ -557,7 +587,7 @@ pub mod tests {
pk: &[u8],
ts: impl Into<TimeStamp>,
) -> Error {
must_prewrite_put_err_impl(engine, key, value, pk, ts, TimeStamp::zero(), false)
must_prewrite_put_err_impl(engine, key, value, pk, ts, TimeStamp::zero(), false, false)
}

pub fn must_pessimistic_prewrite_put_err<E: Engine>(
Expand All @@ -577,6 +607,28 @@ pub mod tests {
ts,
for_update_ts,
is_pessimistic_lock,
false,
)
}

pub fn must_pipelined_pessimistic_prewrite_put_err<E: Engine>(
engine: &E,
key: &[u8],
value: &[u8],
pk: &[u8],
ts: impl Into<TimeStamp>,
for_update_ts: impl Into<TimeStamp>,
is_pessimistic_lock: bool,
) -> Error {
must_prewrite_put_err_impl(
engine,
key,
value,
pk,
ts,
for_update_ts,
is_pessimistic_lock,
true,
)
}

Expand Down
Loading

0 comments on commit 3a1002b

Please sign in to comment.