Skip to content

Commit

Permalink
txn: skip resolve commit fail for left pessimistic locks (tikv#10652)
Browse files Browse the repository at this point in the history
* skip resolve commit fail for left pessimistic locks

Signed-off-by: cfzjywxk <[email protected]>

* remove log and change fn name

Signed-off-by: cfzjywxk <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
cfzjywxk and ti-chi-bot authored Aug 2, 2021
1 parent 479ce68 commit 9eb7fc1
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 2 deletions.
4 changes: 4 additions & 0 deletions components/txn_types/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ impl Lock {
pub fn is_pessimistic_txn(&self) -> bool {
!self.for_update_ts.is_zero()
}

pub fn is_pessimistic_lock(&self) -> bool {
self.lock_type == LockType::Pessimistic
}
}

#[cfg(test)]
Expand Down
199 changes: 199 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2123,6 +2123,7 @@ mod tests {
use crate::storage::lock_manager::DiagnosticContext;
use crate::storage::mvcc::LockType;
use crate::storage::txn::commands::{AcquirePessimisticLock, Prewrite};
use crate::storage::txn::tests::must_rollback;
use crate::storage::{
config::BlockCacheConfig,
kv::{Error as EngineError, ErrorInner as EngineErrorInner},
Expand Down Expand Up @@ -6644,4 +6645,202 @@ mod tests {
on_proposed_case.run();
on_proposed_fallback_case.run();
}

#[test]
fn test_resolve_commit_pessimistic_locks() {
let storage = TestStorageBuilder::new(DummyLockManager {}, false)
.build()
.unwrap();
let (tx, rx) = channel();

// Pessimistically lock k1, k2, k3, k4, after the pessimistic retry k2 is no longer needed
// and the pessimistic lock on k2 is left.
storage
.sched_txn_command(
new_acquire_pessimistic_lock_command(
vec![
(Key::from_raw(b"k1"), false),
(Key::from_raw(b"k2"), false),
(Key::from_raw(b"k3"), false),
(Key::from_raw(b"k4"), false),
(Key::from_raw(b"k5"), false),
(Key::from_raw(b"k6"), false),
],
10,
10,
false,
),
expect_ok_callback(tx.clone(), 0),
)
.unwrap();
rx.recv().unwrap();

// Prewrite keys except the k2.
storage
.sched_txn_command(
commands::PrewritePessimistic::with_defaults(
vec![
(Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())), true),
(Mutation::Put((Key::from_raw(b"k3"), b"v2".to_vec())), true),
(Mutation::Put((Key::from_raw(b"k4"), b"v4".to_vec())), true),
(Mutation::Put((Key::from_raw(b"k5"), b"v5".to_vec())), true),
(Mutation::Put((Key::from_raw(b"k6"), b"v6".to_vec())), true),
],
b"k1".to_vec(),
10.into(),
10.into(),
),
expect_ok_callback(tx.clone(), 0),
)
.unwrap();
rx.recv().unwrap();

// Commit the primary key.
storage
.sched_txn_command(
commands::Commit::new(
vec![Key::from_raw(b"k1")],
10.into(),
20.into(),
Context::default(),
),
expect_ok_callback(tx.clone(), 0),
)
.unwrap();
rx.recv().unwrap();

// Pessimistically rollback the k2 lock.
// Non lite lock resolve on k1 and k2, there should no errors as lock on k2 is pessimistic type.
must_rollback(&storage.engine, b"k2", 10, false);
let mut temp_map = HashMap::default();
temp_map.insert(10.into(), 20.into());
storage
.sched_txn_command(
commands::ResolveLock::new(
temp_map.clone(),
None,
vec![
(
Key::from_raw(b"k1"),
mvcc::Lock::new(
mvcc::LockType::Put,
b"k1".to_vec(),
10.into(),
20,
Some(b"v1".to_vec()),
10.into(),
0,
11.into(),
),
),
(
Key::from_raw(b"k2"),
mvcc::Lock::new(
mvcc::LockType::Pessimistic,
b"k1".to_vec(),
10.into(),
20,
None,
10.into(),
0,
11.into(),
),
),
],
Context::default(),
),
expect_ok_callback(tx.clone(), 0),
)
.unwrap();
rx.recv().unwrap();

// Non lite lock resolve on k3 and k4, there should be no errors.
storage
.sched_txn_command(
commands::ResolveLock::new(
temp_map.clone(),
None,
vec![
(
Key::from_raw(b"k3"),
mvcc::Lock::new(
mvcc::LockType::Put,
b"k1".to_vec(),
10.into(),
20,
Some(b"v3".to_vec()),
10.into(),
0,
11.into(),
),
),
(
Key::from_raw(b"k4"),
mvcc::Lock::new(
mvcc::LockType::Put,
b"k1".to_vec(),
10.into(),
20,
Some(b"v4".to_vec()),
10.into(),
0,
11.into(),
),
),
],
Context::default(),
),
expect_ok_callback(tx.clone(), 0),
)
.unwrap();
rx.recv().unwrap();

// Unlock the k6 first.
// Non lite lock resolve on k5 and k6, error should be reported.
must_rollback(&storage.engine, b"k6", 10, true);
storage
.sched_txn_command(
commands::ResolveLock::new(
temp_map,
None,
vec![
(
Key::from_raw(b"k5"),
mvcc::Lock::new(
mvcc::LockType::Put,
b"k1".to_vec(),
10.into(),
20,
Some(b"v5".to_vec()),
10.into(),
0,
11.into(),
),
),
(
Key::from_raw(b"k6"),
mvcc::Lock::new(
mvcc::LockType::Put,
b"k1".to_vec(),
10.into(),
20,
Some(b"v6".to_vec()),
10.into(),
0,
11.into(),
),
),
],
Context::default(),
),
expect_fail_callback(tx, 6, |e| match e {
Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(mvcc::Error(
box mvcc::ErrorInner::TxnLockNotFound { .. },
))))) => (),
e => panic!("unexpected error chain: {:?}", e),
}),
)
.unwrap();
rx.recv().unwrap();
}
}
17 changes: 15 additions & 2 deletions src/storage/txn/commands/resolve_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

use crate::storage::kv::WriteData;
use crate::storage::lock_manager::LockManager;
use crate::storage::mvcc::{MvccTxn, SnapshotReader, MAX_TXN_WRITE_SIZE};
use crate::storage::mvcc::{
Error as MvccError, ErrorInner as MvccErrorInner, MvccTxn, SnapshotReader, MAX_TXN_WRITE_SIZE,
};
use crate::storage::txn::commands::{
Command, CommandExt, ReaderWithStats, ReleasedLocks, ResolveLockReadPhase, ResponsePolicy,
TypedCommand, WriteCommand, WriteContext, WriteResult,
Expand Down Expand Up @@ -93,7 +95,18 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for ResolveLock {
false,
)?
} else if commit_ts > current_lock.ts {
commit(&mut txn, &mut reader, current_key.clone(), commit_ts)?
// Continue to resolve locks if the not found committed locks are pessimistic type.
// They could be left if the transaction is finally committed and pessimistic conflict
// retry happens during execution.
match commit(&mut txn, &mut reader, current_key.clone(), commit_ts) {
Ok(res) => res,
Err(MvccError(box MvccErrorInner::TxnLockNotFound { .. }))
if current_lock.is_pessimistic_lock() =>
{
None
}
Err(err) => return Err(err.into()),
}
} else {
return Err(Error::from(ErrorInner::InvalidTxnTso {
start_ts: current_lock.ts,
Expand Down

0 comments on commit 9eb7fc1

Please sign in to comment.