Skip to content

Commit

Permalink
txn: return min_commit_ts for check_not_exists (tikv#9888)
Browse files Browse the repository at this point in the history
* txn: return min_commit_ts for check_not_exists

Signed-off-by: youjiali1995 <[email protected]>

* check_not_exsits udpates max_ts

Signed-off-by: youjiali1995 <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
youjiali1995 and ti-chi-bot authored Mar 25, 2021
1 parent d9462e3 commit d68b4ea
Showing 1 changed file with 98 additions and 42 deletions.
140 changes: 98 additions & 42 deletions src/storage/txn/actions/prewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@ pub fn prewrite<S: Snapshot>(
};

if mutation.should_not_write {
return Ok((TimeStamp::zero(), OldValue::Unspecified));
// `checkNotExists` is equivalent to a get operation, so it should update the max_ts.
txn.concurrency_manager.update_max_ts(txn_props.start_ts);
let min_commit_ts = if mutation.need_min_commit_ts() {
// Don't calculate the min_commit_ts according to the concurrency manager's max_ts
// for a should_not_write mutation because it's not persisted and doesn't change data.
cmp::max(txn_props.min_commit_ts, txn_props.start_ts.next())
} else {
TimeStamp::zero()
};
return Ok((min_commit_ts, OldValue::Unspecified));
}

let old_value = if txn_props.need_old_value && mutation.mutation_type.may_have_old_value() {
Expand Down Expand Up @@ -370,12 +379,15 @@ impl<'a> PrewriteMutation<'a> {
}
}

fn need_min_commit_ts(&self) -> bool {
matches!(
&self.txn_props.commit_kind,
CommitKind::Async(_) | CommitKind::OnePc(_)
)
}

fn try_one_pc(&self) -> bool {
match &self.txn_props.commit_kind {
CommitKind::TwoPc => false,
CommitKind::OnePc(_) => true,
CommitKind::Async(_) => false,
}
matches!(&self.txn_props.commit_kind, CommitKind::OnePc(_))
}
}

Expand Down Expand Up @@ -605,62 +617,106 @@ pub mod tests {
#[test]
fn test_async_commit_prewrite_min_commit_ts() {
let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
let cm = ConcurrencyManager::new(42.into());
let cm = ConcurrencyManager::new(41.into());
let snapshot = engine.snapshot(Default::default()).unwrap();

// min_commit_ts must be > max_ts
// should_not_write mutations don't write locks or change data so that they needn't ask
// the concurrency manager for max_ts. Its min_commit_ts may be less than or equal to max_ts.
let mut props = optimistic_async_props(b"k0", 10.into(), 50.into(), 2, false);
props.min_commit_ts = 11.into();
let mut txn = MvccTxn::new(snapshot.clone(), 10.into(), false, cm.clone());
let (min_ts, _) = prewrite(
&mut txn,
&optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false),
Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())),
&Some(vec![b"k2".to_vec()]),
false,
)
.unwrap();
assert!(min_ts > 42.into());
assert!(min_ts < 50.into());

// min_commit_ts must be > start_ts
let mut txn = MvccTxn::new(snapshot, 44.into(), false, cm);
let (min_ts, _) = prewrite(
&mut txn,
&optimistic_async_props(b"k3", 44.into(), 50.into(), 2, false),
Mutation::Put((Key::from_raw(b"k3"), b"v1".to_vec())),
&Some(vec![b"k4".to_vec()]),
&props,
Mutation::CheckNotExists(Key::from_raw(b"k0")),
&Some(vec![]),
false,
)
.unwrap();
assert!(min_ts > 44.into());
assert!(min_ts < 50.into());

// min_commit_ts must be > for_update_ts
let mut props = optimistic_async_props(b"k5", 44.into(), 50.into(), 2, false);
props.kind = TransactionKind::Pessimistic(45.into());
let (min_ts, _) = prewrite(
assert!(min_ts > props.start_ts);
assert!(min_ts >= props.min_commit_ts);
assert!(min_ts < 41.into());

// `checkNotExists` is equivalent to a get operation, so it should update the max_ts.
let mut props = optimistic_txn_props(b"k0", 42.into());
props.min_commit_ts = 43.into();
let mut txn = MvccTxn::new(snapshot.clone(), 42.into(), false, cm.clone());
prewrite(
&mut txn,
&props,
Mutation::Put((Key::from_raw(b"k5"), b"v1".to_vec())),
&Some(vec![b"k6".to_vec()]),
Mutation::CheckNotExists(Key::from_raw(b"k0")),
&Some(vec![]),
false,
)
.unwrap();
assert!(min_ts > 45.into());
assert!(min_ts < 50.into());
assert_eq!(cm.max_ts(), props.start_ts);

// min_commit_ts must be >= txn min_commit_ts
let mut props = optimistic_async_props(b"k7", 44.into(), 50.into(), 2, false);
props.min_commit_ts = 46.into();
// should_write mutations' min_commit_ts must be > max_ts
let mut txn = MvccTxn::new(snapshot.clone(), 10.into(), false, cm.clone());
let (min_ts, _) = prewrite(
&mut txn,
&props,
Mutation::Put((Key::from_raw(b"k7"), b"v1".to_vec())),
&Some(vec![b"k8".to_vec()]),
&optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false),
Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())),
&Some(vec![b"k2".to_vec()]),
false,
)
.unwrap();
assert!(min_ts >= 46.into());
assert!(min_ts > 42.into());
assert!(min_ts < 50.into());

for &should_not_write in &[false, true] {
let mutation = if should_not_write {
Mutation::CheckNotExists(Key::from_raw(b"k3"))
} else {
Mutation::Put((Key::from_raw(b"k3"), b"v1".to_vec()))
};

// min_commit_ts must be > start_ts
let mut txn = MvccTxn::new(snapshot.clone(), 44.into(), false, cm.clone());
let (min_ts, _) = prewrite(
&mut txn,
&optimistic_async_props(b"k3", 44.into(), 50.into(), 2, false),
mutation.clone(),
&Some(vec![b"k4".to_vec()]),
false,
)
.unwrap();
assert!(min_ts > 44.into());
assert!(min_ts < 50.into());
txn.take_guards();

// min_commit_ts must be > for_update_ts
if !should_not_write {
let mut props = optimistic_async_props(b"k5", 44.into(), 50.into(), 2, false);
props.kind = TransactionKind::Pessimistic(45.into());
let (min_ts, _) = prewrite(
&mut txn,
&props,
mutation.clone(),
&Some(vec![b"k6".to_vec()]),
false,
)
.unwrap();
assert!(min_ts > 45.into());
assert!(min_ts < 50.into());
txn.take_guards();
}

// min_commit_ts must be >= txn min_commit_ts
let mut props = optimistic_async_props(b"k7", 44.into(), 50.into(), 2, false);
props.min_commit_ts = 46.into();
let (min_ts, _) = prewrite(
&mut txn,
&props,
mutation.clone(),
&Some(vec![b"k8".to_vec()]),
false,
)
.unwrap();
assert!(min_ts >= 46.into());
assert!(min_ts < 50.into());
txn.take_guards();
}
}

#[test]
Expand Down

0 comments on commit d68b4ea

Please sign in to comment.