From 9eb7fc1129d94e75f334c1f4a7467829774a6962 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 2 Aug 2021 15:11:07 +0800 Subject: [PATCH] txn: skip resolve commit fail for left pessimistic locks (#10652) * skip resolve commit fail for left pessimistic locks Signed-off-by: cfzjywxk * remove log and change fn name Signed-off-by: cfzjywxk Co-authored-by: Ti Chi Robot --- components/txn_types/src/lock.rs | 4 + src/storage/mod.rs | 199 +++++++++++++++++++++++ src/storage/txn/commands/resolve_lock.rs | 17 +- 3 files changed, 218 insertions(+), 2 deletions(-) diff --git a/components/txn_types/src/lock.rs b/components/txn_types/src/lock.rs index 41e92dffb61..5520f096000 100644 --- a/components/txn_types/src/lock.rs +++ b/components/txn_types/src/lock.rs @@ -364,6 +364,10 @@ impl Lock { pub fn is_pessimistic_txn(&self) -> bool { !self.for_update_ts.is_zero() } + + pub fn is_pessimistic_lock(&self) -> bool { + self.lock_type == LockType::Pessimistic + } } #[cfg(test)] diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 8a69adbb245..57824a8cd6e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -2123,6 +2123,7 @@ mod tests { use crate::storage::lock_manager::DiagnosticContext; use crate::storage::mvcc::LockType; use crate::storage::txn::commands::{AcquirePessimisticLock, Prewrite}; + use crate::storage::txn::tests::must_rollback; use crate::storage::{ config::BlockCacheConfig, kv::{Error as EngineError, ErrorInner as EngineErrorInner}, @@ -6644,4 +6645,202 @@ mod tests { on_proposed_case.run(); on_proposed_fallback_case.run(); } + + #[test] + fn test_resolve_commit_pessimistic_locks() { + let storage = TestStorageBuilder::new(DummyLockManager {}, false) + .build() + .unwrap(); + let (tx, rx) = channel(); + + // Pessimistically lock k1, k2, k3, k4, after the pessimistic retry k2 is no longer needed + // and the pessimistic lock on k2 is left. + storage + .sched_txn_command( + new_acquire_pessimistic_lock_command( + vec![ + (Key::from_raw(b"k1"), false), + (Key::from_raw(b"k2"), false), + (Key::from_raw(b"k3"), false), + (Key::from_raw(b"k4"), false), + (Key::from_raw(b"k5"), false), + (Key::from_raw(b"k6"), false), + ], + 10, + 10, + false, + ), + expect_ok_callback(tx.clone(), 0), + ) + .unwrap(); + rx.recv().unwrap(); + + // Prewrite keys except the k2. + storage + .sched_txn_command( + commands::PrewritePessimistic::with_defaults( + vec![ + (Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())), true), + (Mutation::Put((Key::from_raw(b"k3"), b"v2".to_vec())), true), + (Mutation::Put((Key::from_raw(b"k4"), b"v4".to_vec())), true), + (Mutation::Put((Key::from_raw(b"k5"), b"v5".to_vec())), true), + (Mutation::Put((Key::from_raw(b"k6"), b"v6".to_vec())), true), + ], + b"k1".to_vec(), + 10.into(), + 10.into(), + ), + expect_ok_callback(tx.clone(), 0), + ) + .unwrap(); + rx.recv().unwrap(); + + // Commit the primary key. + storage + .sched_txn_command( + commands::Commit::new( + vec![Key::from_raw(b"k1")], + 10.into(), + 20.into(), + Context::default(), + ), + expect_ok_callback(tx.clone(), 0), + ) + .unwrap(); + rx.recv().unwrap(); + + // Pessimistically rollback the k2 lock. + // Non lite lock resolve on k1 and k2, there should no errors as lock on k2 is pessimistic type. + must_rollback(&storage.engine, b"k2", 10, false); + let mut temp_map = HashMap::default(); + temp_map.insert(10.into(), 20.into()); + storage + .sched_txn_command( + commands::ResolveLock::new( + temp_map.clone(), + None, + vec![ + ( + Key::from_raw(b"k1"), + mvcc::Lock::new( + mvcc::LockType::Put, + b"k1".to_vec(), + 10.into(), + 20, + Some(b"v1".to_vec()), + 10.into(), + 0, + 11.into(), + ), + ), + ( + Key::from_raw(b"k2"), + mvcc::Lock::new( + mvcc::LockType::Pessimistic, + b"k1".to_vec(), + 10.into(), + 20, + None, + 10.into(), + 0, + 11.into(), + ), + ), + ], + Context::default(), + ), + expect_ok_callback(tx.clone(), 0), + ) + .unwrap(); + rx.recv().unwrap(); + + // Non lite lock resolve on k3 and k4, there should be no errors. + storage + .sched_txn_command( + commands::ResolveLock::new( + temp_map.clone(), + None, + vec![ + ( + Key::from_raw(b"k3"), + mvcc::Lock::new( + mvcc::LockType::Put, + b"k1".to_vec(), + 10.into(), + 20, + Some(b"v3".to_vec()), + 10.into(), + 0, + 11.into(), + ), + ), + ( + Key::from_raw(b"k4"), + mvcc::Lock::new( + mvcc::LockType::Put, + b"k1".to_vec(), + 10.into(), + 20, + Some(b"v4".to_vec()), + 10.into(), + 0, + 11.into(), + ), + ), + ], + Context::default(), + ), + expect_ok_callback(tx.clone(), 0), + ) + .unwrap(); + rx.recv().unwrap(); + + // Unlock the k6 first. + // Non lite lock resolve on k5 and k6, error should be reported. + must_rollback(&storage.engine, b"k6", 10, true); + storage + .sched_txn_command( + commands::ResolveLock::new( + temp_map, + None, + vec![ + ( + Key::from_raw(b"k5"), + mvcc::Lock::new( + mvcc::LockType::Put, + b"k1".to_vec(), + 10.into(), + 20, + Some(b"v5".to_vec()), + 10.into(), + 0, + 11.into(), + ), + ), + ( + Key::from_raw(b"k6"), + mvcc::Lock::new( + mvcc::LockType::Put, + b"k1".to_vec(), + 10.into(), + 20, + Some(b"v6".to_vec()), + 10.into(), + 0, + 11.into(), + ), + ), + ], + Context::default(), + ), + expect_fail_callback(tx, 6, |e| match e { + Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(mvcc::Error( + box mvcc::ErrorInner::TxnLockNotFound { .. }, + ))))) => (), + e => panic!("unexpected error chain: {:?}", e), + }), + ) + .unwrap(); + rx.recv().unwrap(); + } } diff --git a/src/storage/txn/commands/resolve_lock.rs b/src/storage/txn/commands/resolve_lock.rs index ea531dc3a59..8bdd18454ce 100644 --- a/src/storage/txn/commands/resolve_lock.rs +++ b/src/storage/txn/commands/resolve_lock.rs @@ -2,7 +2,9 @@ use crate::storage::kv::WriteData; use crate::storage::lock_manager::LockManager; -use crate::storage::mvcc::{MvccTxn, SnapshotReader, MAX_TXN_WRITE_SIZE}; +use crate::storage::mvcc::{ + Error as MvccError, ErrorInner as MvccErrorInner, MvccTxn, SnapshotReader, MAX_TXN_WRITE_SIZE, +}; use crate::storage::txn::commands::{ Command, CommandExt, ReaderWithStats, ReleasedLocks, ResolveLockReadPhase, ResponsePolicy, TypedCommand, WriteCommand, WriteContext, WriteResult, @@ -93,7 +95,18 @@ impl WriteCommand for ResolveLock { false, )? } else if commit_ts > current_lock.ts { - commit(&mut txn, &mut reader, current_key.clone(), commit_ts)? + // Continue to resolve locks if the not found committed locks are pessimistic type. + // They could be left if the transaction is finally committed and pessimistic conflict + // retry happens during execution. + match commit(&mut txn, &mut reader, current_key.clone(), commit_ts) { + Ok(res) => res, + Err(MvccError(box MvccErrorInner::TxnLockNotFound { .. })) + if current_lock.is_pessimistic_lock() => + { + None + } + Err(err) => return Err(err.into()), + } } else { return Err(Error::from(ErrorInner::InvalidTxnTso { start_ts: current_lock.ts,