Skip to content

Commit

Permalink
txn: make prewrite idempotent when falling back to 2PC (tikv#9884)
Browse files Browse the repository at this point in the history
* txn: make prewrite idempotent when falling back to 2PC

Signed-off-by: youjiali1995 <[email protected]>

* tests for 1pc

Signed-off-by: youjiali1995 <[email protected]>

* test exceeding the max ts during prewrite

Signed-off-by: youjiali1995 <[email protected]>

Co-authored-by: zhangjinpeng1987 <[email protected]>
  • Loading branch information
youjiali1995 and zhangjinpeng87 authored Mar 25, 2021
1 parent 5535ef0 commit d9462e3
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 49 deletions.
30 changes: 21 additions & 9 deletions src/storage/txn/actions/prewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,22 @@ pub fn prewrite<S: Snapshot>(
) -> Result<(TimeStamp, OldValue)> {
let mut mutation = PrewriteMutation::from_mutation(mutation, secondary_keys, txn_props)?;

let fail_point = if txn_props.is_pessimistic() {
"pessimistic_prewrite"
} else {
"prewrite"
};
fail_point!(fail_point, |err| Err(
crate::storage::mvcc::txn::make_txn_error(err, &mutation.key, mutation.txn_props.start_ts)
#[cfg(feature = "failpoints")]
{
let fail_point = if txn_props.is_pessimistic() {
"pessimistic_prewrite"
} else {
"prewrite"
};
fail_point!(fail_point, |err| Err(
crate::storage::mvcc::txn::make_txn_error(
err,
&mutation.key,
mutation.txn_props.start_ts
)
.into()
));
));
}

let lock_status = match txn.reader.load_lock(&mutation.key)? {
Some(lock) => mutation.check_lock(lock, is_pessimistic_lock)?,
Expand Down Expand Up @@ -246,7 +253,12 @@ impl<'a> PrewriteMutation<'a> {

// Duplicated command. No need to overwrite the lock and data.
MVCC_DUPLICATE_CMD_COUNTER_VEC.prewrite.inc();
Ok(LockStatus::Locked(lock.min_commit_ts))
let min_commit_ts = if lock.use_async_commit {
lock.min_commit_ts
} else {
TimeStamp::zero()
};
Ok(LockStatus::Locked(min_commit_ts))
}

fn check_for_newer_version<S: Snapshot>(&self, txn: &mut MvccTxn<S>) -> Result<Option<Write>> {
Expand Down
4 changes: 3 additions & 1 deletion src/storage/txn/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,9 @@ pub mod test_util {
_ => unreachable!(),
};
let ctx = Context::default();
engine.write(&ctx, ret.to_be_write).unwrap();
if !ret.to_be_write.modifies.is_empty() {
engine.write(&ctx, ret.to_be_write).unwrap();
}
Ok(res)
}

Expand Down
79 changes: 43 additions & 36 deletions src/storage/txn/commands/prewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,18 +441,19 @@ impl<K: PrewriteKind> Prewriter<K> {
secondaries = &self.secondary_keys;
}

let need_min_commit_ts = secondaries.is_some() || self.try_one_pc;
let prewrite_result = prewrite(txn, &props, m, secondaries, is_pessimistic_lock);
match prewrite_result {
Ok((ts, old_value)) => {
if (secondaries.is_some() || self.try_one_pc) && final_min_commit_ts < ts {
Ok((ts, old_value)) if !(need_min_commit_ts && ts.is_zero()) => {
if need_min_commit_ts && final_min_commit_ts < ts {
final_min_commit_ts = ts;
}
let key = key.append_ts(txn.start_ts);
if old_value.specified() {
let key = key.append_ts(txn.start_ts);
self.old_values.insert(key, (old_value, mutation_type));
}
}
Err(MvccError(box MvccErrorInner::CommitTsTooLarge { .. })) => {
Err(MvccError(box MvccErrorInner::CommitTsTooLarge { .. })) | Ok((_, _)) => {
// fallback to not using async commit or 1pc
props.commit_kind = CommitKind::TwoPc;
async_commit_pk = None;
Expand Down Expand Up @@ -925,19 +926,22 @@ mod tests {
let mutations = vec![Mutation::Put((Key::from_raw(key), value.to_vec()))];

let mut statistics = Statistics::default();
let res = prewrite_with_cm(
&engine,
cm.clone(),
&mut statistics,
mutations,
key.to_vec(),
20,
Some(30),
)
.unwrap();
assert!(res.min_commit_ts.is_zero());
assert!(res.one_pc_commit_ts.is_zero());
must_locked(&engine, key, 20);
// Test the idempotency of prewrite when falling back to 2PC.
for _ in 0..2 {
let res = prewrite_with_cm(
&engine,
cm.clone(),
&mut statistics,
mutations.clone(),
key.to_vec(),
20,
Some(30),
)
.unwrap();
assert!(res.min_commit_ts.is_zero());
assert!(res.one_pc_commit_ts.is_zero());
must_locked(&engine, key, 20);
}

must_rollback(&engine, key, 20);
let mutations = vec![
Expand Down Expand Up @@ -1074,25 +1078,28 @@ mod tests {
];
let mut statistics = Statistics::default();
// calculated_ts > max_commit_ts
let cmd = super::Prewrite::new(
mutations,
k1.to_vec(),
20.into(),
0,
false,
2,
TimeStamp::default(),
40.into(),
Some(vec![k2.to_vec()]),
false,
Context::default(),
);

let res = prewrite_command(&engine, cm, &mut statistics, cmd).unwrap();
assert!(res.min_commit_ts.is_zero());
assert!(res.one_pc_commit_ts.is_zero());
assert!(!must_locked(&engine, k1, 20).use_async_commit);
assert!(!must_locked(&engine, k2, 20).use_async_commit);
// Test the idempotency of prewrite when falling back to 2PC.
for _ in 0..2 {
let cmd = super::Prewrite::new(
mutations.clone(),
k1.to_vec(),
20.into(),
0,
false,
2,
21.into(),
40.into(),
Some(vec![k2.to_vec()]),
false,
Context::default(),
);

let res = prewrite_command(&engine, cm.clone(), &mut statistics, cmd).unwrap();
assert!(res.min_commit_ts.is_zero());
assert!(res.one_pc_commit_ts.is_zero());
assert!(!must_locked(&engine, k1, 20).use_async_commit);
assert!(!must_locked(&engine, k2, 20).use_async_commit);
}
}

#[test]
Expand Down
91 changes: 91 additions & 0 deletions tests/failpoints/cases/test_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,94 @@ fn test_max_commit_ts_error() {
assert!(l1.use_async_commit);
assert!(!l2.use_async_commit);
}

#[test]
fn test_exceed_max_commit_ts_in_the_middle_of_prewrite() {
let engine = TestEngineBuilder::new().build().unwrap();
let storage = TestStorageBuilder::<_, DummyLockManager>::from_engine_and_lock_mgr(
engine,
DummyLockManager {},
)
.build()
.unwrap();
let cm = storage.get_concurrency_manager();

let (prewrite_tx, prewrite_rx) = channel();
// Pause between getting max ts and store the lock in memory
fail::cfg("before-set-lock-in-memory", "pause").unwrap();

cm.update_max_ts(40.into());
let mutations = vec![
Mutation::Put((Key::from_raw(b"k1"), b"v".to_vec())),
Mutation::Put((Key::from_raw(b"k2"), b"v".to_vec())),
];
storage
.sched_txn_command(
commands::Prewrite::new(
mutations.clone(),
b"k1".to_vec(),
10.into(),
20000,
false,
2,
11.into(),
50.into(),
Some(vec![]),
false,
Context::default(),
),
Box::new(move |res| {
prewrite_tx.send(res).unwrap();
}),
)
.unwrap();
// sleep a while so the first key gets max ts.
thread::sleep(Duration::from_millis(200));

cm.update_max_ts(51.into());
fail::remove("before-set-lock-in-memory");
let res = prewrite_rx.recv().unwrap().unwrap();
assert!(res.min_commit_ts.is_zero());
assert!(res.one_pc_commit_ts.is_zero());

let locks = block_on(storage.scan_lock(
Context::default(),
20.into(),
Some(Key::from_raw(b"k1")),
None,
2,
))
.unwrap();
assert_eq!(locks.len(), 2);
assert_eq!(locks[0].get_key(), b"k1");
assert!(locks[0].get_use_async_commit());
assert_eq!(locks[0].get_min_commit_ts(), 41);
assert_eq!(locks[1].get_key(), b"k2");
assert!(!locks[1].get_use_async_commit());

// Send a duplicated request to test the idempotency of prewrite when falling back to 2PC.
let (prewrite_tx, prewrite_rx) = channel();
storage
.sched_txn_command(
commands::Prewrite::new(
mutations,
b"k1".to_vec(),
10.into(),
20000,
false,
2,
11.into(),
50.into(),
Some(vec![]),
false,
Context::default(),
),
Box::new(move |res| {
prewrite_tx.send(res).unwrap();
}),
)
.unwrap();
let res = prewrite_rx.recv().unwrap().unwrap();
assert!(res.min_commit_ts.is_zero());
assert!(res.one_pc_commit_ts.is_zero());
}
34 changes: 31 additions & 3 deletions tests/integrations/server/kv_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,19 +1275,47 @@ fn test_prewrite_check_max_commit_ts() {
assert_eq!(resp.get_min_commit_ts(), 101);

let mut req = PrewriteRequest::default();
req.set_context(ctx);
req.set_context(ctx.clone());
req.set_primary_lock(b"k2".to_vec());
let mut mutation = Mutation::default();
mutation.set_op(Op::Put);
mutation.set_key(b"k2".to_vec());
mutation.set_value(b"v2".to_vec());
req.mut_mutations().push(mutation);
req.set_start_version(20);
req.set_min_commit_ts(21);
req.set_max_commit_ts(50);
req.set_lock_ttl(20000);
req.set_use_async_commit(true);
let resp = client.kv_prewrite(&req).unwrap();
assert_eq!(resp.get_min_commit_ts(), 0);
// Test the idempotency of prewrite when falling back to 2PC.
for _ in 0..2 {
let resp = client.kv_prewrite(&req).unwrap();
assert_eq!(resp.get_min_commit_ts(), 0);
assert_eq!(resp.get_one_pc_commit_ts(), 0);
}

// 1PC
let mut req = PrewriteRequest::default();
req.set_context(ctx);
req.set_primary_lock(b"k3".to_vec());
let mut mutation = Mutation::default();
mutation.set_op(Op::Put);
mutation.set_key(b"k3".to_vec());
mutation.set_value(b"v3".to_vec());
req.mut_mutations().push(mutation);
req.set_start_version(20);
req.set_min_commit_ts(21);
req.set_max_commit_ts(50);
req.set_lock_ttl(20000);
req.set_use_async_commit(true);
req.set_try_one_pc(true);
// Test the idempotency of prewrite when falling back to 2PC.
for _ in 0..2 {
let resp = client.kv_prewrite(&req).unwrap();
assert_eq!(resp.get_min_commit_ts(), 0);
assert_eq!(resp.get_one_pc_commit_ts(), 0);
}

// There shouldn't be locks remaining in the lock table.
assert!(cm.read_range_check(None, None, |_, _| Err(())).is_ok());
}
Expand Down

0 comments on commit d9462e3

Please sign in to comment.