diff --git a/src/storage/txn/actions/prewrite.rs b/src/storage/txn/actions/prewrite.rs index 2ef02d9b92e..e8a1d762a80 100644 --- a/src/storage/txn/actions/prewrite.rs +++ b/src/storage/txn/actions/prewrite.rs @@ -66,7 +66,16 @@ pub fn prewrite( }; if mutation.should_not_write { - return Ok((TimeStamp::zero(), OldValue::Unspecified)); + // `checkNotExists` is equivalent to a get operation, so it should update the max_ts. + txn.concurrency_manager.update_max_ts(txn_props.start_ts); + let min_commit_ts = if mutation.need_min_commit_ts() { + // Don't calculate the min_commit_ts according to the concurrency manager's max_ts + // for a should_not_write mutation because it's not persisted and doesn't change data. + cmp::max(txn_props.min_commit_ts, txn_props.start_ts.next()) + } else { + TimeStamp::zero() + }; + return Ok((min_commit_ts, OldValue::Unspecified)); } let old_value = if txn_props.need_old_value && mutation.mutation_type.may_have_old_value() { @@ -370,12 +379,15 @@ impl<'a> PrewriteMutation<'a> { } } + fn need_min_commit_ts(&self) -> bool { + matches!( + &self.txn_props.commit_kind, + CommitKind::Async(_) | CommitKind::OnePc(_) + ) + } + fn try_one_pc(&self) -> bool { - match &self.txn_props.commit_kind { - CommitKind::TwoPc => false, - CommitKind::OnePc(_) => true, - CommitKind::Async(_) => false, - } + matches!(&self.txn_props.commit_kind, CommitKind::OnePc(_)) } } @@ -605,62 +617,106 @@ pub mod tests { #[test] fn test_async_commit_prewrite_min_commit_ts() { let engine = crate::storage::TestEngineBuilder::new().build().unwrap(); - let cm = ConcurrencyManager::new(42.into()); + let cm = ConcurrencyManager::new(41.into()); let snapshot = engine.snapshot(Default::default()).unwrap(); - // min_commit_ts must be > max_ts + // should_not_write mutations don't write locks or change data so that they needn't ask + // the concurrency manager for max_ts. Its min_commit_ts may be less than or equal to max_ts. + let mut props = optimistic_async_props(b"k0", 10.into(), 50.into(), 2, false); + props.min_commit_ts = 11.into(); let mut txn = MvccTxn::new(snapshot.clone(), 10.into(), false, cm.clone()); let (min_ts, _) = prewrite( &mut txn, - &optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false), - Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())), - &Some(vec![b"k2".to_vec()]), - false, - ) - .unwrap(); - assert!(min_ts > 42.into()); - assert!(min_ts < 50.into()); - - // min_commit_ts must be > start_ts - let mut txn = MvccTxn::new(snapshot, 44.into(), false, cm); - let (min_ts, _) = prewrite( - &mut txn, - &optimistic_async_props(b"k3", 44.into(), 50.into(), 2, false), - Mutation::Put((Key::from_raw(b"k3"), b"v1".to_vec())), - &Some(vec![b"k4".to_vec()]), + &props, + Mutation::CheckNotExists(Key::from_raw(b"k0")), + &Some(vec![]), false, ) .unwrap(); - assert!(min_ts > 44.into()); - assert!(min_ts < 50.into()); - - // min_commit_ts must be > for_update_ts - let mut props = optimistic_async_props(b"k5", 44.into(), 50.into(), 2, false); - props.kind = TransactionKind::Pessimistic(45.into()); - let (min_ts, _) = prewrite( + assert!(min_ts > props.start_ts); + assert!(min_ts >= props.min_commit_ts); + assert!(min_ts < 41.into()); + + // `checkNotExists` is equivalent to a get operation, so it should update the max_ts. + let mut props = optimistic_txn_props(b"k0", 42.into()); + props.min_commit_ts = 43.into(); + let mut txn = MvccTxn::new(snapshot.clone(), 42.into(), false, cm.clone()); + prewrite( &mut txn, &props, - Mutation::Put((Key::from_raw(b"k5"), b"v1".to_vec())), - &Some(vec![b"k6".to_vec()]), + Mutation::CheckNotExists(Key::from_raw(b"k0")), + &Some(vec![]), false, ) .unwrap(); - assert!(min_ts > 45.into()); - assert!(min_ts < 50.into()); + assert_eq!(cm.max_ts(), props.start_ts); - // min_commit_ts must be >= txn min_commit_ts - let mut props = optimistic_async_props(b"k7", 44.into(), 50.into(), 2, false); - props.min_commit_ts = 46.into(); + // should_write mutations' min_commit_ts must be > max_ts + let mut txn = MvccTxn::new(snapshot.clone(), 10.into(), false, cm.clone()); let (min_ts, _) = prewrite( &mut txn, - &props, - Mutation::Put((Key::from_raw(b"k7"), b"v1".to_vec())), - &Some(vec![b"k8".to_vec()]), + &optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false), + Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())), + &Some(vec![b"k2".to_vec()]), false, ) .unwrap(); - assert!(min_ts >= 46.into()); + assert!(min_ts > 42.into()); assert!(min_ts < 50.into()); + + for &should_not_write in &[false, true] { + let mutation = if should_not_write { + Mutation::CheckNotExists(Key::from_raw(b"k3")) + } else { + Mutation::Put((Key::from_raw(b"k3"), b"v1".to_vec())) + }; + + // min_commit_ts must be > start_ts + let mut txn = MvccTxn::new(snapshot.clone(), 44.into(), false, cm.clone()); + let (min_ts, _) = prewrite( + &mut txn, + &optimistic_async_props(b"k3", 44.into(), 50.into(), 2, false), + mutation.clone(), + &Some(vec![b"k4".to_vec()]), + false, + ) + .unwrap(); + assert!(min_ts > 44.into()); + assert!(min_ts < 50.into()); + txn.take_guards(); + + // min_commit_ts must be > for_update_ts + if !should_not_write { + let mut props = optimistic_async_props(b"k5", 44.into(), 50.into(), 2, false); + props.kind = TransactionKind::Pessimistic(45.into()); + let (min_ts, _) = prewrite( + &mut txn, + &props, + mutation.clone(), + &Some(vec![b"k6".to_vec()]), + false, + ) + .unwrap(); + assert!(min_ts > 45.into()); + assert!(min_ts < 50.into()); + txn.take_guards(); + } + + // min_commit_ts must be >= txn min_commit_ts + let mut props = optimistic_async_props(b"k7", 44.into(), 50.into(), 2, false); + props.min_commit_ts = 46.into(); + let (min_ts, _) = prewrite( + &mut txn, + &props, + mutation.clone(), + &Some(vec![b"k8".to_vec()]), + false, + ) + .unwrap(); + assert!(min_ts >= 46.into()); + assert!(min_ts < 50.into()); + txn.take_guards(); + } } #[test]