diff --git a/src/storage/txn/actions/prewrite.rs b/src/storage/txn/actions/prewrite.rs index 6100f0c4a0f..2ef02d9b92e 100644 --- a/src/storage/txn/actions/prewrite.rs +++ b/src/storage/txn/actions/prewrite.rs @@ -28,15 +28,22 @@ pub fn prewrite( ) -> Result<(TimeStamp, OldValue)> { let mut mutation = PrewriteMutation::from_mutation(mutation, secondary_keys, txn_props)?; - let fail_point = if txn_props.is_pessimistic() { - "pessimistic_prewrite" - } else { - "prewrite" - }; - fail_point!(fail_point, |err| Err( - crate::storage::mvcc::txn::make_txn_error(err, &mutation.key, mutation.txn_props.start_ts) + #[cfg(feature = "failpoints")] + { + let fail_point = if txn_props.is_pessimistic() { + "pessimistic_prewrite" + } else { + "prewrite" + }; + fail_point!(fail_point, |err| Err( + crate::storage::mvcc::txn::make_txn_error( + err, + &mutation.key, + mutation.txn_props.start_ts + ) .into() - )); + )); + } let lock_status = match txn.reader.load_lock(&mutation.key)? { Some(lock) => mutation.check_lock(lock, is_pessimistic_lock)?, @@ -246,7 +253,12 @@ impl<'a> PrewriteMutation<'a> { // Duplicated command. No need to overwrite the lock and data. MVCC_DUPLICATE_CMD_COUNTER_VEC.prewrite.inc(); - Ok(LockStatus::Locked(lock.min_commit_ts)) + let min_commit_ts = if lock.use_async_commit { + lock.min_commit_ts + } else { + TimeStamp::zero() + }; + Ok(LockStatus::Locked(min_commit_ts)) } fn check_for_newer_version(&self, txn: &mut MvccTxn) -> Result> { diff --git a/src/storage/txn/commands/mod.rs b/src/storage/txn/commands/mod.rs index a9d049f41f4..000d671ea35 100644 --- a/src/storage/txn/commands/mod.rs +++ b/src/storage/txn/commands/mod.rs @@ -665,7 +665,9 @@ pub mod test_util { _ => unreachable!(), }; let ctx = Context::default(); - engine.write(&ctx, ret.to_be_write).unwrap(); + if !ret.to_be_write.modifies.is_empty() { + engine.write(&ctx, ret.to_be_write).unwrap(); + } Ok(res) } diff --git a/src/storage/txn/commands/prewrite.rs b/src/storage/txn/commands/prewrite.rs index 2d1e76bb0d2..387301fc36e 100644 --- a/src/storage/txn/commands/prewrite.rs +++ b/src/storage/txn/commands/prewrite.rs @@ -441,18 +441,19 @@ impl Prewriter { secondaries = &self.secondary_keys; } + let need_min_commit_ts = secondaries.is_some() || self.try_one_pc; let prewrite_result = prewrite(txn, &props, m, secondaries, is_pessimistic_lock); match prewrite_result { - Ok((ts, old_value)) => { - if (secondaries.is_some() || self.try_one_pc) && final_min_commit_ts < ts { + Ok((ts, old_value)) if !(need_min_commit_ts && ts.is_zero()) => { + if need_min_commit_ts && final_min_commit_ts < ts { final_min_commit_ts = ts; } - let key = key.append_ts(txn.start_ts); if old_value.specified() { + let key = key.append_ts(txn.start_ts); self.old_values.insert(key, (old_value, mutation_type)); } } - Err(MvccError(box MvccErrorInner::CommitTsTooLarge { .. })) => { + Err(MvccError(box MvccErrorInner::CommitTsTooLarge { .. })) | Ok((_, _)) => { // fallback to not using async commit or 1pc props.commit_kind = CommitKind::TwoPc; async_commit_pk = None; @@ -925,19 +926,22 @@ mod tests { let mutations = vec![Mutation::Put((Key::from_raw(key), value.to_vec()))]; let mut statistics = Statistics::default(); - let res = prewrite_with_cm( - &engine, - cm.clone(), - &mut statistics, - mutations, - key.to_vec(), - 20, - Some(30), - ) - .unwrap(); - assert!(res.min_commit_ts.is_zero()); - assert!(res.one_pc_commit_ts.is_zero()); - must_locked(&engine, key, 20); + // Test the idempotency of prewrite when falling back to 2PC. + for _ in 0..2 { + let res = prewrite_with_cm( + &engine, + cm.clone(), + &mut statistics, + mutations.clone(), + key.to_vec(), + 20, + Some(30), + ) + .unwrap(); + assert!(res.min_commit_ts.is_zero()); + assert!(res.one_pc_commit_ts.is_zero()); + must_locked(&engine, key, 20); + } must_rollback(&engine, key, 20); let mutations = vec![ @@ -1074,25 +1078,28 @@ mod tests { ]; let mut statistics = Statistics::default(); // calculated_ts > max_commit_ts - let cmd = super::Prewrite::new( - mutations, - k1.to_vec(), - 20.into(), - 0, - false, - 2, - TimeStamp::default(), - 40.into(), - Some(vec![k2.to_vec()]), - false, - Context::default(), - ); - - let res = prewrite_command(&engine, cm, &mut statistics, cmd).unwrap(); - assert!(res.min_commit_ts.is_zero()); - assert!(res.one_pc_commit_ts.is_zero()); - assert!(!must_locked(&engine, k1, 20).use_async_commit); - assert!(!must_locked(&engine, k2, 20).use_async_commit); + // Test the idempotency of prewrite when falling back to 2PC. + for _ in 0..2 { + let cmd = super::Prewrite::new( + mutations.clone(), + k1.to_vec(), + 20.into(), + 0, + false, + 2, + 21.into(), + 40.into(), + Some(vec![k2.to_vec()]), + false, + Context::default(), + ); + + let res = prewrite_command(&engine, cm.clone(), &mut statistics, cmd).unwrap(); + assert!(res.min_commit_ts.is_zero()); + assert!(res.one_pc_commit_ts.is_zero()); + assert!(!must_locked(&engine, k1, 20).use_async_commit); + assert!(!must_locked(&engine, k2, 20).use_async_commit); + } } #[test] diff --git a/tests/failpoints/cases/test_transaction.rs b/tests/failpoints/cases/test_transaction.rs index f2154596a88..743954f2318 100644 --- a/tests/failpoints/cases/test_transaction.rs +++ b/tests/failpoints/cases/test_transaction.rs @@ -312,3 +312,94 @@ fn test_max_commit_ts_error() { assert!(l1.use_async_commit); assert!(!l2.use_async_commit); } + +#[test] +fn test_exceed_max_commit_ts_in_the_middle_of_prewrite() { + let engine = TestEngineBuilder::new().build().unwrap(); + let storage = TestStorageBuilder::<_, DummyLockManager>::from_engine_and_lock_mgr( + engine, + DummyLockManager {}, + ) + .build() + .unwrap(); + let cm = storage.get_concurrency_manager(); + + let (prewrite_tx, prewrite_rx) = channel(); + // Pause between getting max ts and store the lock in memory + fail::cfg("before-set-lock-in-memory", "pause").unwrap(); + + cm.update_max_ts(40.into()); + let mutations = vec![ + Mutation::Put((Key::from_raw(b"k1"), b"v".to_vec())), + Mutation::Put((Key::from_raw(b"k2"), b"v".to_vec())), + ]; + storage + .sched_txn_command( + commands::Prewrite::new( + mutations.clone(), + b"k1".to_vec(), + 10.into(), + 20000, + false, + 2, + 11.into(), + 50.into(), + Some(vec![]), + false, + Context::default(), + ), + Box::new(move |res| { + prewrite_tx.send(res).unwrap(); + }), + ) + .unwrap(); + // sleep a while so the first key gets max ts. + thread::sleep(Duration::from_millis(200)); + + cm.update_max_ts(51.into()); + fail::remove("before-set-lock-in-memory"); + let res = prewrite_rx.recv().unwrap().unwrap(); + assert!(res.min_commit_ts.is_zero()); + assert!(res.one_pc_commit_ts.is_zero()); + + let locks = block_on(storage.scan_lock( + Context::default(), + 20.into(), + Some(Key::from_raw(b"k1")), + None, + 2, + )) + .unwrap(); + assert_eq!(locks.len(), 2); + assert_eq!(locks[0].get_key(), b"k1"); + assert!(locks[0].get_use_async_commit()); + assert_eq!(locks[0].get_min_commit_ts(), 41); + assert_eq!(locks[1].get_key(), b"k2"); + assert!(!locks[1].get_use_async_commit()); + + // Send a duplicated request to test the idempotency of prewrite when falling back to 2PC. + let (prewrite_tx, prewrite_rx) = channel(); + storage + .sched_txn_command( + commands::Prewrite::new( + mutations, + b"k1".to_vec(), + 10.into(), + 20000, + false, + 2, + 11.into(), + 50.into(), + Some(vec![]), + false, + Context::default(), + ), + Box::new(move |res| { + prewrite_tx.send(res).unwrap(); + }), + ) + .unwrap(); + let res = prewrite_rx.recv().unwrap().unwrap(); + assert!(res.min_commit_ts.is_zero()); + assert!(res.one_pc_commit_ts.is_zero()); +} diff --git a/tests/integrations/server/kv_service.rs b/tests/integrations/server/kv_service.rs index ac06eef8acc..7a7dec73eb2 100644 --- a/tests/integrations/server/kv_service.rs +++ b/tests/integrations/server/kv_service.rs @@ -1275,7 +1275,7 @@ fn test_prewrite_check_max_commit_ts() { assert_eq!(resp.get_min_commit_ts(), 101); let mut req = PrewriteRequest::default(); - req.set_context(ctx); + req.set_context(ctx.clone()); req.set_primary_lock(b"k2".to_vec()); let mut mutation = Mutation::default(); mutation.set_op(Op::Put); @@ -1283,11 +1283,39 @@ fn test_prewrite_check_max_commit_ts() { mutation.set_value(b"v2".to_vec()); req.mut_mutations().push(mutation); req.set_start_version(20); + req.set_min_commit_ts(21); req.set_max_commit_ts(50); req.set_lock_ttl(20000); req.set_use_async_commit(true); - let resp = client.kv_prewrite(&req).unwrap(); - assert_eq!(resp.get_min_commit_ts(), 0); + // Test the idempotency of prewrite when falling back to 2PC. + for _ in 0..2 { + let resp = client.kv_prewrite(&req).unwrap(); + assert_eq!(resp.get_min_commit_ts(), 0); + assert_eq!(resp.get_one_pc_commit_ts(), 0); + } + + // 1PC + let mut req = PrewriteRequest::default(); + req.set_context(ctx); + req.set_primary_lock(b"k3".to_vec()); + let mut mutation = Mutation::default(); + mutation.set_op(Op::Put); + mutation.set_key(b"k3".to_vec()); + mutation.set_value(b"v3".to_vec()); + req.mut_mutations().push(mutation); + req.set_start_version(20); + req.set_min_commit_ts(21); + req.set_max_commit_ts(50); + req.set_lock_ttl(20000); + req.set_use_async_commit(true); + req.set_try_one_pc(true); + // Test the idempotency of prewrite when falling back to 2PC. + for _ in 0..2 { + let resp = client.kv_prewrite(&req).unwrap(); + assert_eq!(resp.get_min_commit_ts(), 0); + assert_eq!(resp.get_one_pc_commit_ts(), 0); + } + // There shouldn't be locks remaining in the lock table. assert!(cm.read_range_check(None, None, |_, _| Err(())).is_ok()); }