From 3a1002b5ee9c943bad0ed70ad9620b3d354bd742 Mon Sep 17 00:00:00 2001 From: Lei Zhao Date: Wed, 13 May 2020 17:12:25 +0800 Subject: [PATCH] storage: add tests for pipelined pessimistic lock (#7706) Signed-off-by: youjiali1995 --- src/storage/kv/rocksdb_engine.rs | 4 + src/storage/mod.rs | 178 +++++++++++++------------ src/storage/mvcc/mod.rs | 58 +++++++- src/storage/mvcc/txn.rs | 63 +++++++++ src/storage/txn/process.rs | 11 ++ src/storage/txn/scheduler.rs | 2 +- tests/failpoints/cases/test_storage.rs | 130 +++++++++++++++++- 7 files changed, 356 insertions(+), 90 deletions(-) diff --git a/src/storage/kv/rocksdb_engine.rs b/src/storage/kv/rocksdb_engine.rs index fc875b770ef..532541aaa6d 100644 --- a/src/storage/kv/rocksdb_engine.rs +++ b/src/storage/kv/rocksdb_engine.rs @@ -211,6 +211,8 @@ impl TestEngineBuilder { } fn write_modifies(engine: &Engines, modifies: Vec) -> Result<()> { + fail_point!("rockskv_write_modifies", |_| Err(box_err!("write failed"))); + let mut wb = engine.kv.c().write_batch(); for rev in modifies { let res = match rev { @@ -260,6 +262,8 @@ impl Engine for RocksEngine { type Snap = RocksSnapshot; fn async_write(&self, _: &Context, modifies: Vec, cb: Callback<()>) -> Result<()> { + fail_point!("rockskv_async_write", |_| Err(box_err!("write failed"))); + if modifies.is_empty() { return Err(Error::from(ErrorInner::EmptyRequest)); } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index f1b1c228470..f6ef6815202 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1362,48 +1362,28 @@ impl TestStorageBuilder { } } -#[cfg(test)] -mod tests { +pub mod test_util { use super::*; - - use crate::config::TitanDBConfig; - use crate::storage::{ - config::BlockCacheConfig, - lock_manager::{Lock, WaitTimeout}, - mvcc::{Error as MvccError, ErrorInner as MvccErrorInner}, - txn::{commands, Error as TxnError, ErrorInner as TxnErrorInner}, - }; - use engine::rocks::util::CFOptions; - use engine_traits::{CF_LOCK, CF_RAFT, CF_WRITE}; - use futures03::executor::block_on; - use kvproto::kvrpcpb::{CommandPri, LockInfo}; + use crate::storage::txn::commands; use std::{ fmt::Debug, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{channel, Sender}, - Arc, - }, - time::Duration, + sync::mpsc::{channel, Sender}, }; - use tikv_util::collections::HashMap; - use tikv_util::config::ReadableSize; - use txn_types::Mutation; - fn expect_none(x: Result>) { + pub fn expect_none(x: Result>) { assert_eq!(x.unwrap(), None); } - fn expect_value(v: Vec, x: Result>) { + pub fn expect_value(v: Vec, x: Result>) { assert_eq!(x.unwrap().unwrap(), v); } - fn expect_multi_values(v: Vec>, x: Result>>) { + pub fn expect_multi_values(v: Vec>, x: Result>>) { let x: Vec> = x.unwrap().into_iter().map(Result::ok).collect(); assert_eq!(x, v); } - fn expect_error(err_matcher: F, x: Result) + pub fn expect_error(err_matcher: F, x: Result) where F: FnOnce(Error) + Send + 'static, { @@ -1413,14 +1393,14 @@ mod tests { } } - fn expect_ok_callback(done: Sender, id: i32) -> Callback { + pub fn expect_ok_callback(done: Sender, id: i32) -> Callback { Box::new(move |x: Result| { x.unwrap(); done.send(id).unwrap(); }) } - fn expect_fail_callback(done: Sender, id: i32, err_matcher: F) -> Callback + pub fn expect_fail_callback(done: Sender, id: i32, err_matcher: F) -> Callback where F: FnOnce(Error) + Send + 'static, { @@ -1430,7 +1410,7 @@ mod tests { }) } - fn expect_too_busy_callback(done: Sender, id: i32) -> Callback { + pub fn expect_too_busy_callback(done: Sender, id: i32) -> Callback { Box::new(move |x: Result| { expect_error( |err| match err { @@ -1443,7 +1423,7 @@ mod tests { }) } - fn expect_value_callback( + pub fn expect_value_callback( done: Sender, id: i32, value: T, @@ -1454,6 +1434,88 @@ mod tests { }) } + pub fn expect_pessimistic_lock_res_callback( + done: Sender, + pessimistic_lock_res: PessimisticLockRes, + ) -> Callback> { + Box::new(move |res: Result>| { + assert_eq!(res.unwrap().unwrap(), pessimistic_lock_res); + done.send(0).unwrap(); + }) + } + + type PessimisticLockCommand = TypedCommand>; + pub fn new_acquire_pessimistic_lock_command( + keys: Vec<(Key, bool)>, + start_ts: impl Into, + for_update_ts: impl Into, + return_values: bool, + ) -> PessimisticLockCommand { + let primary = keys[0].0.clone().to_raw().unwrap(); + let for_update_ts: TimeStamp = for_update_ts.into(); + commands::AcquirePessimisticLock::new( + keys, + primary, + start_ts.into(), + 3000, + false, + for_update_ts, + None, + return_values, + for_update_ts.next(), + Context::default(), + ) + } + + pub fn delete_pessimistic_lock( + storage: &Storage, + key: Key, + start_ts: u64, + for_update_ts: u64, + ) { + let (tx, rx) = channel(); + storage + .sched_txn_command( + commands::PessimisticRollback::new( + vec![key], + start_ts.into(), + for_update_ts.into(), + Context::default(), + ), + expect_ok_callback(tx, 0), + ) + .unwrap(); + rx.recv().unwrap(); + } +} + +#[cfg(test)] +mod tests { + use super::{test_util::*, *}; + + use crate::config::TitanDBConfig; + use crate::storage::{ + config::BlockCacheConfig, + lock_manager::{Lock, WaitTimeout}, + mvcc::{Error as MvccError, ErrorInner as MvccErrorInner}, + txn::{commands, Error as TxnError, ErrorInner as TxnErrorInner}, + }; + use engine::rocks::util::CFOptions; + use engine_traits::{CF_LOCK, CF_RAFT, CF_WRITE}; + use futures03::executor::block_on; + use kvproto::kvrpcpb::{CommandPri, LockInfo}; + use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{channel, Sender}, + Arc, + }, + time::Duration, + }; + use tikv_util::collections::HashMap; + use tikv_util::config::ReadableSize; + use txn_types::Mutation; + #[test] fn test_get_put() { let storage = TestStorageBuilder::new().build().unwrap(); @@ -4300,61 +4362,7 @@ mod tests { assert_eq!(cmd.ts, None); } - type PessimisticLockCommand = TypedCommand>; - fn new_acquire_pessimistic_lock_command( - keys: Vec<(Key, bool)>, - start_ts: impl Into, - for_update_ts: impl Into, - return_values: bool, - ) -> PessimisticLockCommand { - let primary = keys[0].0.clone().to_raw().unwrap(); - let for_update_ts: TimeStamp = for_update_ts.into(); - commands::AcquirePessimisticLock::new( - keys, - primary, - start_ts.into(), - 3000, - false, - for_update_ts, - None, - return_values, - for_update_ts.next(), - Context::default(), - ) - } - fn test_pessimistic_lock_impl(pipelined_pessimistic_lock: bool) { - fn delete_pessimistic_lock( - storage: &Storage, - key: Key, - start_ts: u64, - for_update_ts: u64, - ) { - let (tx, rx) = channel(); - storage - .sched_txn_command( - commands::PessimisticRollback::new( - vec![key], - start_ts.into(), - for_update_ts.into(), - Context::default(), - ), - expect_ok_callback(tx, 0), - ) - .unwrap(); - rx.recv().unwrap(); - } - - fn expect_pessimistic_lock_res_callback( - done: Sender, - pessimistic_lock_res: PessimisticLockRes, - ) -> Callback> { - Box::new(move |res: Result>| { - assert_eq!(res.unwrap().unwrap(), pessimistic_lock_res); - done.send(0).unwrap(); - }) - } - let storage = TestStorageBuilder::new() .set_lock_mgr(DummyLockManager {}) .set_pipelined_pessimistic_lock(pipelined_pessimistic_lock) diff --git a/src/storage/mvcc/mod.rs b/src/storage/mvcc/mod.rs index 2e043f5f04f..9686e061521 100644 --- a/src/storage/mvcc/mod.rs +++ b/src/storage/mvcc/mod.rs @@ -399,6 +399,7 @@ pub mod tests { for_update_ts: TimeStamp, txn_size: u64, min_commit_ts: TimeStamp, + pipelined_pessimistic_lock: bool, ) { let ctx = Context::default(); let snapshot = engine.snapshot(&ctx).unwrap(); @@ -416,7 +417,7 @@ pub mod tests { for_update_ts, txn_size, min_commit_ts, - false, + pipelined_pessimistic_lock, ) .unwrap(); } @@ -441,6 +442,7 @@ pub mod tests { TimeStamp::default(), 0, TimeStamp::default(), + false, ); } @@ -464,6 +466,31 @@ pub mod tests { for_update_ts.into(), 0, TimeStamp::default(), + false, + ); + } + + pub fn must_pipelined_pessimistic_prewrite_put( + engine: &E, + key: &[u8], + value: &[u8], + pk: &[u8], + ts: impl Into, + for_update_ts: impl Into, + is_pessimistic_lock: bool, + ) { + must_prewrite_put_impl( + engine, + key, + value, + pk, + ts, + is_pessimistic_lock, + 0, + for_update_ts.into(), + 0, + TimeStamp::default(), + true, ); } @@ -488,6 +515,7 @@ pub mod tests { for_update_ts.into(), 0, TimeStamp::default(), + false, ); } @@ -515,6 +543,7 @@ pub mod tests { for_update_ts, 0, min_commit_ts, + false, ); } @@ -526,6 +555,7 @@ pub mod tests { ts: impl Into, for_update_ts: impl Into, is_pessimistic_lock: bool, + pipelined_pessimistic_lock: bool, ) -> Error { let ctx = Context::default(); let snapshot = engine.snapshot(&ctx).unwrap(); @@ -544,7 +574,7 @@ pub mod tests { for_update_ts, 0, TimeStamp::default(), - false, + pipelined_pessimistic_lock, ) .unwrap_err() } @@ -557,7 +587,7 @@ pub mod tests { pk: &[u8], ts: impl Into, ) -> Error { - must_prewrite_put_err_impl(engine, key, value, pk, ts, TimeStamp::zero(), false) + must_prewrite_put_err_impl(engine, key, value, pk, ts, TimeStamp::zero(), false, false) } pub fn must_pessimistic_prewrite_put_err( @@ -577,6 +607,28 @@ pub mod tests { ts, for_update_ts, is_pessimistic_lock, + false, + ) + } + + pub fn must_pipelined_pessimistic_prewrite_put_err( + engine: &E, + key: &[u8], + value: &[u8], + pk: &[u8], + ts: impl Into, + for_update_ts: impl Into, + is_pessimistic_lock: bool, + ) -> Error { + must_prewrite_put_err_impl( + engine, + key, + value, + pk, + ts, + for_update_ts, + is_pessimistic_lock, + true, ) } diff --git a/src/storage/mvcc/txn.rs b/src/storage/mvcc/txn.rs index f51bd2926be..c9a6d26c706 100644 --- a/src/storage/mvcc/txn.rs +++ b/src/storage/mvcc/txn.rs @@ -463,6 +463,8 @@ impl MvccTxn { Ok(()) } + // TiKV may fails to write pessimistic locks due to pipelined process. + // If the data is not changed after acquiring the lock, we can still prewrite the key. fn amend_pessimistic_lock( &mut self, pipelined_pessimistic_lock: bool, @@ -482,6 +484,15 @@ impl MvccTxn { .into()); } if let Some((commit_ts, _)) = self.reader.seek_write(key, TimeStamp::max())? { + // The invariants of pessimistic locks are: + // 1. lock's for_update_ts >= key's latest commit_ts + // 2. lock's for_update_ts >= txn's start_ts + // 3. If the data is changed after acquiring the pessimistic lock, key's new commit_ts > lock's for_update_ts + // + // So, if the key's latest commit_ts is still less than or equal to lock's for_update_ts, the data is not changed. + // However, we can't get lock's for_update_ts in current implementation (txn's for_update_ts is updated for each DML), + // we can only use txn's start_ts to check -- If the key's commit_ts is less than txn's start_ts, it's less than + // lock's for_update_ts too. if commit_ts >= self.start_ts { warn!( "prewrite failed (pessimistic lock not found)"; @@ -1493,6 +1504,7 @@ mod tests { ts(60, 0), 1, ts(60, 1), + false, ); // The min_commit_ts is ts(70, 0) other than ts(60, 1) in prewrite request. must_large_txn_locked(&engine, k, ts(60, 0), 100, ts(70, 1), false); @@ -2587,6 +2599,7 @@ mod tests { TimeStamp::zero(), 1, /* min_commit_ts */ TimeStamp::zero(), + false, ); must_check_txn_status( &engine, @@ -2685,6 +2698,7 @@ mod tests { TimeStamp::zero(), expected_lock_info.get_txn_size(), TimeStamp::zero(), + false, ); } else { expected_lock_info.set_lock_type(Op::PessimisticLock); @@ -2865,4 +2879,53 @@ mod tests { must_pessimistic_locked(&engine, k, 75, 75); must_pessimistic_rollback(&engine, k, 75, 75); } + + #[test] + fn test_amend_pessimistic_lock() { + fn fail_to_write_pessimistic_lock( + engine: &E, + key: &[u8], + start_ts: impl Into, + for_update_ts: impl Into, + ) { + let start_ts = start_ts.into(); + let for_update_ts = for_update_ts.into(); + must_acquire_pessimistic_lock(engine, key, key, start_ts, for_update_ts); + // Delete the pessimistic lock to pretend write failure. + must_pessimistic_rollback(engine, key, start_ts, for_update_ts); + } + + let engine = TestEngineBuilder::new().build().unwrap(); + let (k, mut v) = (b"k", b"v".to_vec()); + + // Key not exist; should succeed. + fail_to_write_pessimistic_lock(&engine, k, 10, 10); + must_pipelined_pessimistic_prewrite_put(&engine, k, &v, k, 10, 10, true); + must_commit(&engine, k, 10, 20); + must_get(&engine, k, 20, &v); + + // for_update_ts(30) >= start_ts(30) > commit_ts(20); should succeed. + v.push(0); + fail_to_write_pessimistic_lock(&engine, k, 30, 30); + must_pipelined_pessimistic_prewrite_put(&engine, k, &v, k, 30, 30, true); + must_commit(&engine, k, 30, 40); + must_get(&engine, k, 40, &v); + + // for_update_ts(40) >= commit_ts(40) > start_ts(35); should fail. + fail_to_write_pessimistic_lock(&engine, k, 35, 40); + must_pipelined_pessimistic_prewrite_put_err(&engine, k, &v, k, 35, 40, true); + + // KeyIsLocked; should fail. + must_acquire_pessimistic_lock(&engine, k, k, 50, 50); + must_pipelined_pessimistic_prewrite_put_err(&engine, k, &v, k, 60, 60, true); + must_pessimistic_rollback(&engine, k, 50, 50); + + // Pessimistic lock not exist and not pipelined; should fail. + must_pessimistic_prewrite_put_err(&engine, k, &v, k, 70, 70, true); + + // The txn has been rolled back; should fail. + must_acquire_pessimistic_lock(&engine, k, k, 80, 80); + must_cleanup(&engine, k, 80, TimeStamp::max()); + must_pipelined_pessimistic_prewrite_put_err(&engine, k, &v, k, 80, 80, true); + } } diff --git a/src/storage/txn/process.rs b/src/storage/txn/process.rs index a6d269e80d9..d6a478894c3 100644 --- a/src/storage/txn/process.rs +++ b/src/storage/txn/process.rs @@ -270,6 +270,13 @@ impl Executor { } else { let sched = scheduler.clone(); let sched_pool = self.take_pool(); + // The normal write process is respond to clients and release latches + // after async write finished. If pipelined pessimistic lock is enabled, + // the process becomes parallel and there are two msgs for one command: + // 1. Msg::PipelinedWrite: respond to clients + // 2. Msg::WriteFinished: deque context and release latches + // The order between these two msgs is uncertain due to thread scheduling + // so we clone the result for each msg. let (write_finished_pr, pipelined_write_pr) = if pipelined { (pr.maybe_clone().unwrap(), pr) } else { @@ -280,6 +287,8 @@ impl Executor { sched_pool .pool .spawn(async move { + fail_point!("scheduler_async_write_finish"); + notify_scheduler( sched, Msg::WriteFinished { @@ -305,6 +314,8 @@ impl Executor { let err = e.into(); Msg::FinishedWithErr { cid, err, tag } } else if pipelined { + fail_point!("scheduler_pipelined_write_finish"); + // The write task is scheduled to engine successfully. // Respond to client early. Msg::PipelinedWrite { diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index ccc40e8582d..a2f97e50195 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -254,7 +254,7 @@ impl SchedulerInner { self.task_contexts[id_index(cid)] .lock() .get_mut(&cid) - .map(|tctx| tctx.cb.take().unwrap()) + .and_then(|tctx| tctx.cb.take()) } fn too_busy(&self) -> bool { diff --git a/tests/failpoints/cases/test_storage.rs b/tests/failpoints/cases/test_storage.rs index 7b532923347..71dba891f76 100644 --- a/tests/failpoints/cases/test_storage.rs +++ b/tests/failpoints/cases/test_storage.rs @@ -11,7 +11,7 @@ use kvproto::tikvpb::TikvClient; use test_raftstore::{must_get_equal, must_get_none, new_server_cluster}; use tikv::storage::kv::{Error as KvError, ErrorInner as KvErrorInner}; use tikv::storage::txn::{commands, Error as TxnError, ErrorInner as TxnErrorInner}; -use tikv::storage::{self, lock_manager::DummyLockManager, *}; +use tikv::storage::{self, lock_manager::DummyLockManager, test_util::*, *}; use tikv_util::{collections::HashMap, HandyRwLock}; use txn_types::Key; use txn_types::{Mutation, TimeStamp}; @@ -194,3 +194,131 @@ fn test_raftkv_early_error_report() { } fail::remove(raftkv_fp); } + +#[test] +fn test_pipelined_pessimistic_lock() { + let rockskv_async_write_fp = "rockskv_async_write"; + let rockskv_write_modifies_fp = "rockskv_write_modifies"; + let scheduler_async_write_finish_fp = "scheduler_async_write_finish"; + let scheduler_pipelined_write_finish_fp = "scheduler_pipelined_write_finish"; + + let storage = TestStorageBuilder::new() + .set_lock_mgr(DummyLockManager {}) + .set_pipelined_pessimistic_lock(true) + .build() + .unwrap(); + + let (tx, rx) = channel(); + let (key, val) = (Key::from_raw(b"key"), b"val".to_vec()); + + // Even if storage fails to write the lock to engine, client should + // receive the successful response. + fail::cfg(rockskv_write_modifies_fp, "return()").unwrap(); + fail::cfg(scheduler_async_write_finish_fp, "pause").unwrap(); + storage + .sched_txn_command( + new_acquire_pessimistic_lock_command(vec![(key.clone(), false)], 10, 10, true), + expect_pessimistic_lock_res_callback( + tx.clone(), + PessimisticLockRes::Values(vec![None]), + ), + ) + .unwrap(); + rx.recv().unwrap(); + fail::remove(rockskv_write_modifies_fp); + fail::remove(scheduler_async_write_finish_fp); + storage + .sched_txn_command( + commands::PrewritePessimistic::new( + vec![(Mutation::Put((key.clone(), val.clone())), true)], + key.to_raw().unwrap(), + 10.into(), + 3000, + 10.into(), + 1, + 11.into(), + Context::default(), + ), + expect_ok_callback(tx.clone(), 0), + ) + .unwrap(); + rx.recv().unwrap(); + storage + .sched_txn_command( + commands::Commit::new(vec![key.clone()], 10.into(), 20.into(), Context::default()), + expect_ok_callback(tx.clone(), 0), + ) + .unwrap(); + rx.recv().unwrap(); + + // Should report failure if storage fails to schedule write request to engine. + fail::cfg(rockskv_async_write_fp, "return()").unwrap(); + storage + .sched_txn_command( + new_acquire_pessimistic_lock_command(vec![(key.clone(), false)], 30, 30, true), + expect_fail_callback(tx.clone(), 0, |_| ()), + ) + .unwrap(); + rx.recv().unwrap(); + fail::remove(rockskv_async_write_fp); + + // Shouldn't release latches until async write finished. + fail::cfg(scheduler_async_write_finish_fp, "pause").unwrap(); + for blocked in &[false, true] { + storage + .sched_txn_command( + new_acquire_pessimistic_lock_command(vec![(key.clone(), false)], 40, 40, true), + expect_pessimistic_lock_res_callback( + tx.clone(), + PessimisticLockRes::Values(vec![Some(val.clone())]), + ), + ) + .unwrap(); + + if !*blocked { + rx.recv().unwrap(); + } else { + // Blocked by latches. + rx.recv_timeout(Duration::from_millis(500)).unwrap_err(); + } + } + fail::remove(scheduler_async_write_finish_fp); + rx.recv().unwrap(); + delete_pessimistic_lock(&storage, key.clone(), 40, 40); + + // Pipelined write is finished before async write. + fail::cfg(scheduler_async_write_finish_fp, "pause").unwrap(); + storage + .sched_txn_command( + new_acquire_pessimistic_lock_command(vec![(key.clone(), false)], 50, 50, true), + expect_pessimistic_lock_res_callback( + tx.clone(), + PessimisticLockRes::Values(vec![Some(val.clone())]), + ), + ) + .unwrap(); + rx.recv().unwrap(); + fail::remove(scheduler_async_write_finish_fp); + delete_pessimistic_lock(&storage, key.clone(), 50, 50); + + // Async write is finished before pipelined write due to thread scheduling. + // Storage should handle it properly. + fail::cfg(scheduler_pipelined_write_finish_fp, "pause").unwrap(); + storage + .sched_txn_command( + new_acquire_pessimistic_lock_command( + vec![(key.clone(), false), (Key::from_raw(b"nonexist"), false)], + 60, + 60, + true, + ), + expect_pessimistic_lock_res_callback( + tx, + PessimisticLockRes::Values(vec![Some(val), None]), + ), + ) + .unwrap(); + rx.recv().unwrap(); + fail::remove(scheduler_pipelined_write_finish_fp); + delete_pessimistic_lock(&storage, key, 60, 60); +}