diff --git a/components/cdc/src/delegate.rs b/components/cdc/src/delegate.rs index f47b7d29abc..f67e18968d0 100644 --- a/components/cdc/src/delegate.rs +++ b/components/cdc/src/delegate.rs @@ -1,8 +1,6 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. -use std::cell::RefCell; use std::mem; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; @@ -32,7 +30,6 @@ use raftstore::store::fsm::ObserveID; use raftstore::store::util::compare_region_epoch; use raftstore::Error as RaftStoreError; use resolved_ts::Resolver; -use tikv::storage::Statistics; use tikv::{server::raftkv::WriteBatchFlags, storage::txn::TxnEntry}; use tikv_util::time::Instant; use tikv_util::{debug, info, warn}; @@ -464,7 +461,7 @@ impl Delegate { pub fn on_batch( &mut self, batch: CmdBatch, - old_value_cb: Rc>, + old_value_cb: &OldValueCallback, old_value_cache: &mut OldValueCache, ) -> Result<()> { // Stale CmdBatch, drop it sliently. @@ -485,7 +482,7 @@ impl Delegate { self.sink_data( index, request.requests.into(), - old_value_cb.clone(), + old_value_cb, old_value_cache, is_one_pc, )?; @@ -613,7 +610,7 @@ impl Delegate { &mut self, index: u64, requests: Vec, - old_value_cb: Rc>, + old_value_cb: &OldValueCallback, old_value_cache: &mut OldValueCache, is_one_pc: bool, ) -> Result<()> { @@ -622,19 +619,18 @@ impl Delegate { if txn_extra_op == TxnExtraOp::ReadOldValue { let key = Key::from_raw(&row.key).append_ts(row.start_ts.into()); let start = Instant::now(); - - let mut statistics = Statistics::default(); - row.old_value = - old_value_cb.borrow_mut()(key, read_old_ts, old_value_cache, &mut statistics) - .unwrap_or_default(); + let (old_value, statistics) = old_value_cb(key, read_old_ts, old_value_cache); + row.old_value = old_value.unwrap_or_default(); CDC_OLD_VALUE_DURATION_HISTOGRAM .with_label_values(&["all"]) .observe(start.elapsed().as_secs_f64()); - for (cf, cf_details) in statistics.details().iter() { - for (tag, 
count) in cf_details.iter() { - CDC_OLD_VALUE_SCAN_DETAILS - .with_label_values(&[*cf, *tag]) - .inc_by(*count as i64); + if let Some(statistics) = statistics { + for (cf, cf_details) in statistics.details().iter() { + for (tag, count) in cf_details.iter() { + CDC_OLD_VALUE_SCAN_DETAILS + .with_label_values(&[*cf, *tag]) + .inc_by(*count as i64); + } } } } diff --git a/components/cdc/src/endpoint.rs b/components/cdc/src/endpoint.rs index 5422491fe98..d3d39919b2d 100644 --- a/components/cdc/src/endpoint.rs +++ b/components/cdc/src/endpoint.rs @@ -2,7 +2,6 @@ use std::cell::RefCell; use std::fmt; -use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -104,7 +103,7 @@ impl fmt::Debug for Deregister { type InitCallback = Box; pub(crate) type OldValueCallback = - Box Option> + Send>; + Box (Option>, Option) + Send>; pub struct OldValueCache { pub cache: LruCache, @@ -122,6 +121,11 @@ impl OldValueCache { } } +pub enum Validate { + Region(u64, Box) + Send>), + OldValueCache(Box), +} + pub enum Task { Register { request: ChangeDataRequest, @@ -160,7 +164,7 @@ pub enum Task { cb: InitCallback, }, TxnExtra(TxnExtra), - Validate(u64, Box) + Send>), + Validate(Validate), } impl_display_as_debug!(Task); @@ -225,7 +229,10 @@ impl fmt::Debug for Task { .field("downstream", &downstream_id) .finish(), Task::TxnExtra(_) => de.field("type", &"txn_extra").finish(), - Task::Validate(region_id, _) => de.field("region_id", ®ion_id).finish(), + Task::Validate(validate) => match validate { + Validate::Region(region_id, _) => de.field("region_id", ®ion_id).finish(), + Validate::OldValueCache(_) => de.finish(), + }, } } } @@ -288,6 +295,8 @@ impl> Endpoint { .core_threads(1) .build() .unwrap(); + CDC_OLD_VALUE_CACHE_CAP.set(cfg.old_value_cache_size as i64); + let old_value_cache = OldValueCache::new(cfg.old_value_cache_size); let ep = Endpoint { env, security_mgr, @@ -307,7 +316,7 @@ impl> Endpoint { min_ts_interval: cfg.min_ts_interval.0, min_resolved_ts: 
TimeStamp::max(), min_ts_region_id: 0, - old_value_cache: OldValueCache::new(cfg.old_value_cache_size), + old_value_cache, hibernate_regions_compatible: cfg.hibernate_regions_compatible, tikv_clients: Arc::new(Mutex::new(HashMap::default())), }; @@ -632,7 +641,6 @@ impl> Endpoint { pub fn on_multi_batch(&mut self, multi: Vec, old_value_cb: OldValueCallback) { fail_point!("cdc_before_handle_multi_batch", |_| {}); - let old_value_cb = Rc::new(RefCell::new(old_value_cb)); for batch in multi { let region_id = batch.region_id; let mut deregister = None; @@ -641,9 +649,7 @@ impl> Endpoint { // Skip the batch if the delegate has failed. continue; } - if let Err(e) = - delegate.on_batch(batch, old_value_cb.clone(), &mut self.old_value_cache) - { + if let Err(e) = delegate.on_batch(batch, &old_value_cb, &mut self.old_value_cache) { assert!(delegate.has_failed()); // Delegate has error, deregister the corresponding region. deregister = Some(Deregister::Region { @@ -1510,9 +1516,14 @@ impl> Runnable for Endpoint { self.old_value_cache.cache.insert(k, v); } } - Task::Validate(region_id, validate) => { - validate(self.capture_regions.get(®ion_id)); - } + Task::Validate(validate) => match validate { + Validate::Region(region_id, validate) => { + validate(self.capture_regions.get(®ion_id)); + } + Validate::OldValueCache(validate) => { + validate(&self.old_value_cache); + } + }, } self.flush_all(); } @@ -1537,6 +1548,7 @@ impl> RunnableWithTimer for Endpoint CmdObserver for CdcObserver { // Create a snapshot here for preventing the old value was GC-ed. 
let snapshot = RegionSnapshot::from_snapshot(Arc::new(engine.snapshot()), Arc::new(region)); - let mut reader = OldValueReader::new(snapshot); - let get_old_value = move |key, - query_ts, - old_value_cache: &mut OldValueCache, - statistics: &mut Statistics| { + let reader = OldValueReader::new(snapshot); + let get_old_value = move |key, query_ts, old_value_cache: &mut OldValueCache| { old_value_cache.access_count += 1; if let Some((old_value, mutation_type)) = old_value_cache.cache.remove(&key) { - match mutation_type { - MutationType::Insert => { - return None; - } - MutationType::Put | MutationType::Delete => { - if let OldValue::Value { + return match mutation_type { + MutationType::Insert => (None, None), + MutationType::Put | MutationType::Delete => match old_value { + OldValue::None => (None, None), + OldValue::Value { start_ts, short_value, - } = old_value - { - return short_value.or_else(|| { + } => { + let mut statistics = None; + let value = short_value.or_else(|| { + statistics = Some(Statistics::default()); let prev_key = key.truncate_ts().unwrap().append_ts(start_ts); let start = Instant::now(); let mut opts = ReadOptions::new(); opts.set_fill_cache(false); - let value = reader.get_value_default(&prev_key, statistics); + let value = reader + .get_value_default(&prev_key, statistics.as_mut().unwrap()); CDC_OLD_VALUE_DURATION_HISTOGRAM .with_label_values(&["get"]) .observe(start.elapsed().as_secs_f64()); value }); + (value, statistics) } - } + // Unspecified should not be added into cache. + OldValue::Unspecified => unreachable!(), + }, _ => unreachable!(), - } + }; } // Cannot get old value from cache, seek for it in engine. 
old_value_cache.miss_count += 1; + let mut statistics = Statistics::default(); let start = Instant::now(); let key = key.truncate_ts().unwrap().append_ts(query_ts); let value = reader - .near_seek_old_value(&key, statistics) + .near_seek_old_value(&key, &mut statistics) .unwrap_or_default(); CDC_OLD_VALUE_DURATION_HISTOGRAM .with_label_values(&["seek"]) .observe(start.elapsed().as_secs_f64()); - value + (value, Some(statistics)) }; if let Err(e) = self.sched.schedule(Task::MultiBatch { multi: batches, @@ -247,7 +249,7 @@ impl OldValueReader { .unwrap() } - fn get_value_default(&mut self, key: &Key, statistics: &mut Statistics) -> Option { + fn get_value_default(&self, key: &Key, statistics: &mut Statistics) -> Option { statistics.data.get += 1; let mut opts = ReadOptions::new(); opts.set_fill_cache(false); @@ -262,11 +264,7 @@ impl OldValueReader { /// The key passed in should be a key with a timestamp. This function will returns /// the latest value of the entry if the user key is the same to the given key and /// the timestamp is older than or equal to the timestamp in the given key. - fn near_seek_old_value( - &mut self, - key: &Key, - statistics: &mut Statistics, - ) -> Result> { + fn near_seek_old_value(&self, key: &Key, statistics: &mut Statistics) -> Result> { let (user_key, seek_ts) = Key::split_on_ts_for(key.as_encoded()).unwrap(); let mut write_cursor = self.new_write_cursor(key); if write_cursor.near_seek(key, &mut statistics.write)? 
@@ -389,7 +387,7 @@ mod tests { let key = Key::from_raw(k); let must_get_eq = |ts: u64, value| { - let mut old_value_reader = OldValueReader::new(Arc::new(kv_engine.snapshot())); + let old_value_reader = OldValueReader::new(Arc::new(kv_engine.snapshot())); let mut statistics = Statistics::default(); assert_eq!( old_value_reader @@ -440,7 +438,7 @@ mod tests { let kv_engine = engine.get_rocksdb(); let must_get_eq = |key: &[u8], ts: u64, value| { - let mut old_value_reader = OldValueReader::new(Arc::new(kv_engine.snapshot())); + let old_value_reader = OldValueReader::new(Arc::new(kv_engine.snapshot())); let mut statistics = Statistics::default(); assert_eq!( old_value_reader diff --git a/components/cdc/tests/integrations/test_cdc.rs b/components/cdc/tests/integrations/test_cdc.rs index d32b50e02e0..c740915106a 100644 --- a/components/cdc/tests/integrations/test_cdc.rs +++ b/components/cdc/tests/integrations/test_cdc.rs @@ -23,7 +23,7 @@ use test_raftstore::*; use tikv_util::HandyRwLock; use txn_types::{Key, Lock, LockType}; -use cdc::{metrics::CDC_RESOLVED_TS_ADVANCE_METHOD, Task}; +use cdc::{metrics::CDC_RESOLVED_TS_ADVANCE_METHOD, Task, Validate}; #[test] fn test_cdc_basic() { @@ -52,13 +52,13 @@ fn test_cdc_basic() { // There must be a delegate. let scheduler = suite.endpoints.values().next().unwrap().scheduler(); scheduler - .schedule(Task::Validate( + .schedule(Task::Validate(Validate::Region( 1, Box::new(|delegate| { let d = delegate.unwrap(); assert_eq!(d.downstreams.len(), 1); }), - )) + ))) .unwrap(); let (k, v) = ("key1".to_owned(), "value".to_owned()); @@ -119,12 +119,12 @@ fn test_cdc_basic() { } // The delegate must be removed. scheduler - .schedule(Task::Validate( + .schedule(Task::Validate(Validate::Region( 1, Box::new(|delegate| { assert!(delegate.is_none()); }), - )) + ))) .unwrap(); // request again. @@ -145,13 +145,13 @@ fn test_cdc_basic() { // Sleep a while to make sure the stream is registered. 
sleep_ms(200); scheduler - .schedule(Task::Validate( + .schedule(Task::Validate(Validate::Region( 1, Box::new(|delegate| { let d = delegate.unwrap(); assert_eq!(d.downstreams.len(), 1); }), - )) + ))) .unwrap(); // Drop stream and cancel its server streaming. @@ -159,12 +159,12 @@ fn test_cdc_basic() { // Sleep a while to make sure the stream is deregistered. sleep_ms(200); scheduler - .schedule(Task::Validate( + .schedule(Task::Validate(Validate::Region( 1, Box::new(|delegate| { assert!(delegate.is_none()); }), - )) + ))) .unwrap(); // Stale region epoch. @@ -218,14 +218,14 @@ fn test_cdc_not_leader() { let (tx, rx) = mpsc::channel(); let tx_ = tx.clone(); scheduler - .schedule(Task::Validate( + .schedule(Task::Validate(Validate::Region( 1, Box::new(move |delegate| { let d = delegate.unwrap(); assert_eq!(d.downstreams.len(), 1); tx_.send(()).unwrap(); }), - )) + ))) .unwrap(); rx.recv_timeout(Duration::from_secs(1)).unwrap(); assert!( @@ -266,13 +266,13 @@ fn test_cdc_not_leader() { // Sleep a while to make sure the stream is deregistered. 
sleep_ms(200); scheduler - .schedule(Task::Validate( + .schedule(Task::Validate(Validate::Region( 1, Box::new(move |delegate| { assert!(delegate.is_none()); tx.send(()).unwrap(); }), - )) + ))) .unwrap(); rx.recv_timeout(Duration::from_millis(200)).unwrap(); @@ -864,13 +864,18 @@ fn test_old_value_basic() { if row.get_start_ts() == ts3.into_inner() || row.get_start_ts() == ts4.into_inner() { - assert_eq!(row.get_old_value(), b"v1"); + assert_eq!(row.get_old_value(), b"v1", "{:?}", row); event_count += 1; } else if row.get_start_ts() == ts8.into_inner() { - assert_eq!(row.get_old_value(), vec![b'3'; 5120].as_slice()); + assert_eq!( + row.get_old_value(), + vec![b'3'; 5120].as_slice(), + "{:?}", + row + ); event_count += 1; } else if row.get_start_ts() == ts9.into_inner() { - assert_eq!(row.get_old_value(), b"v6"); + assert_eq!(row.get_old_value(), b"v6", "{:?}", row); event_count += 1; } } @@ -1249,6 +1254,140 @@ fn test_old_value_1pc() { suite.stop(); } +#[test] +fn test_old_value_cache() { + let mut suite = TestSuite::new(1); + let scheduler = suite.endpoints.values().next().unwrap().scheduler(); + let mut req = suite.new_changedata_request(1); + req.set_extra_op(ExtraOp::ReadOldValue); + let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); + let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + let mut events = receive_event(false).events.to_vec(); + match events.remove(0).event.unwrap() { + Event_oneof_event::Entries(mut es) => { + let row = &es.take_entries().to_vec()[0]; + assert_eq!(row.get_type(), EventLogType::Initialized); + } + other => panic!("unknown event {:?}", other), + } + + // Insert value, simulate INSERT INTO. 
+ let mut m1 = Mutation::default(); + let k1 = b"k1".to_vec(); + m1.set_op(Op::Insert); + m1.key = k1.clone(); + m1.value = b"v1".to_vec(); + suite.must_kv_prewrite(1, vec![m1], k1.clone(), 10.into()); + let mut events = receive_event(false).events.to_vec(); + match events.remove(0).event.unwrap() { + Event_oneof_event::Entries(mut es) => { + let row = &es.take_entries().to_vec()[0]; + assert_eq!(row.get_value(), b"v1"); + assert_eq!(row.get_old_value(), b""); + assert_eq!(row.get_type(), EventLogType::Prewrite); + assert_eq!(row.get_start_ts(), 10); + } + other => panic!("unknown event {:?}", other), + } + // k1 old value must be cached. + scheduler + .schedule(Task::Validate(Validate::OldValueCache(Box::new( + move |old_value_cache| { + assert_eq!(old_value_cache.access_count, 1); + assert_eq!(old_value_cache.miss_count, 0); + }, + )))) + .unwrap(); + suite.must_kv_commit(1, vec![k1], 10.into(), 15.into()); + let mut events = receive_event(false).events.to_vec(); + match events.remove(0).event.unwrap() { + Event_oneof_event::Entries(mut es) => { + let row = &es.take_entries().to_vec()[0]; + assert_eq!(row.get_type(), EventLogType::Commit); + assert_eq!(row.get_commit_ts(), 15); + } + other => panic!("unknown event {:?}", other), + } + + // Update a noexist value, simulate INSERT IGNORE INTO. + let mut m2 = Mutation::default(); + let k2 = b"k2".to_vec(); + m2.set_op(Op::Put); + m2.key = k2.clone(); + m2.value = b"v2".to_vec(); + suite.must_kv_prewrite(1, vec![m2], k2.clone(), 10.into()); + let mut events = receive_event(false).events.to_vec(); + match events.remove(0).event.unwrap() { + Event_oneof_event::Entries(mut es) => { + let row = &es.take_entries().to_vec()[0]; + assert_eq!(row.get_value(), b"v2"); + assert_eq!(row.get_old_value(), b""); + assert_eq!(row.get_type(), EventLogType::Prewrite); + assert_eq!(row.get_start_ts(), 10); + } + other => panic!("unknown event {:?}", other), + } + // k2 old value must be cached. 
+ scheduler + .schedule(Task::Validate(Validate::OldValueCache(Box::new( + move |old_value_cache| { + assert_eq!(old_value_cache.access_count, 2); + assert_eq!(old_value_cache.miss_count, 0); + }, + )))) + .unwrap(); + suite.must_kv_commit(1, vec![k2], 10.into(), 15.into()); + let mut events = receive_event(false).events.to_vec(); + match events.remove(0).event.unwrap() { + Event_oneof_event::Entries(mut es) => { + let row = &es.take_entries().to_vec()[0]; + assert_eq!(row.get_type(), EventLogType::Commit); + assert_eq!(row.get_commit_ts(), 15); + } + other => panic!("unknown event {:?}", other), + } + + // Update an exist value, simulate UPDATE. + let mut m2 = Mutation::default(); + let k2 = b"k2".to_vec(); + m2.set_op(Op::Put); + m2.key = k2.clone(); + m2.value = b"v3".to_vec(); + suite.must_kv_prewrite(1, vec![m2], k2.clone(), 20.into()); + let mut events = receive_event(false).events.to_vec(); + match events.remove(0).event.unwrap() { + Event_oneof_event::Entries(mut es) => { + let row = &es.take_entries().to_vec()[0]; + assert_eq!(row.get_value(), b"v3"); + assert_eq!(row.get_old_value(), b"v2"); + assert_eq!(row.get_type(), EventLogType::Prewrite); + assert_eq!(row.get_start_ts(), 20); + } + other => panic!("unknown event {:?}", other), + } + // k2 old value must be cached. 
+ scheduler + .schedule(Task::Validate(Validate::OldValueCache(Box::new( + move |old_value_cache| { + assert_eq!(old_value_cache.access_count, 3); + assert_eq!(old_value_cache.miss_count, 0); + }, + )))) + .unwrap(); + suite.must_kv_commit(1, vec![k2], 20.into(), 25.into()); + let mut events = receive_event(false).events.to_vec(); + match events.remove(0).event.unwrap() { + Event_oneof_event::Entries(mut es) => { + let row = &es.take_entries().to_vec()[0]; + assert_eq!(row.get_type(), EventLogType::Commit); + assert_eq!(row.get_commit_ts(), 25); + } + other => panic!("unknown event {:?}", other), + } + + suite.stop(); +} + #[test] fn test_region_created_replicate() { let cluster = new_server_cluster(0, 2); diff --git a/components/tikv_util/src/lru.rs b/components/tikv_util/src/lru.rs index 3ef91dc4f49..a22e48af612 100644 --- a/components/tikv_util/src/lru.rs +++ b/components/tikv_util/src/lru.rs @@ -248,6 +248,14 @@ where base: self.map.iter(), } } + + pub fn len(&self) -> usize { + self.map.len() + } + + pub fn is_empty(&self) -> bool { + self.map.is_empty() + } } unsafe impl Send for LruCache {} diff --git a/components/txn_types/src/types.rs b/components/txn_types/src/types.rs index ac774025620..869e305d736 100644 --- a/components/txn_types/src/types.rs +++ b/components/txn_types/src/types.rs @@ -1,7 +1,6 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. use super::timestamp::TimeStamp; -use crate::Write; use byteorder::{ByteOrder, NativeEndian}; use collections::HashMap; use kvproto::kvrpcpb; @@ -247,17 +246,6 @@ pub enum MutationType { Other, } -impl MutationType { - pub fn may_have_old_value(&self) -> bool { - matches!( - self, - // Insert operations don't have old value but need to update a flag - // for indicating that not seeking for old value for it. - MutationType::Put | MutationType::Delete | MutationType::Insert - ) - } -} - /// A row mutation. 
#[derive(Debug, Clone)] pub enum Mutation { @@ -340,7 +328,9 @@ pub enum OldValue { }, /// `None` means we don't found a previous value None, - /// `Unspecified` means the user doesn't care about the previous value + /// `Unspecified` means one of the following: + /// - The user doesn't care about the previous value + /// - We don't sure if there is a previous value Unspecified, } @@ -350,27 +340,11 @@ impl Default for OldValue { } } -impl From> for OldValue { - fn from(write: Option) -> Self { - match write { - Some(w) => OldValue::Value { - short_value: w.short_value, - start_ts: w.start_ts, - }, - None => OldValue::None, - } - } -} - impl OldValue { - pub fn specified(&self) -> bool { + pub fn valid(&self) -> bool { !matches!(self, OldValue::Unspecified) } - pub fn exists(&self) -> bool { - matches!(self, OldValue::Value { .. }) - } - pub fn size(&self) -> usize { let value_size = match self { OldValue::Value { @@ -379,7 +353,7 @@ impl OldValue { } => v.len(), _ => 0, }; - value_size + std::mem::size_of::() + value_size + std::mem::size_of::() } } @@ -513,4 +487,22 @@ mod tests { assert!(!encoded.is_encoded_from(&longer_raw)); } } + + #[test] + fn test_old_value_valid() { + let cases = vec![ + (OldValue::Unspecified, false), + (OldValue::None, true), + ( + OldValue::Value { + short_value: None, + start_ts: 0.into(), + }, + true, + ), + ]; + for (old_value, v) in cases { + assert_eq!(old_value.valid(), v); + } + } } diff --git a/components/txn_types/src/write.rs b/components/txn_types/src/write.rs index eb992d3f1ff..7e59157f4ee 100644 --- a/components/txn_types/src/write.rs +++ b/components/txn_types/src/write.rs @@ -218,10 +218,6 @@ impl Write { gc_fence: self.gc_fence, } } - - pub fn may_have_old_value(&self) -> bool { - matches!(self.write_type, WriteType::Put | WriteType::Delete) - } } #[derive(PartialEq, Clone)] diff --git a/src/storage/mvcc/reader/reader.rs b/src/storage/mvcc/reader/reader.rs index 0ccbe0ffc4e..86f3ebd050e 100644 --- 
a/src/storage/mvcc/reader/reader.rs +++ b/src/storage/mvcc/reader/reader.rs @@ -65,8 +65,8 @@ impl SnapshotReader { } #[inline(always)] - pub fn get_old_value(&mut self, prev_write: Write) -> Result { - self.reader.get_old_value(self.start_ts, prev_write) + pub fn get_old_value(&mut self, key: &Key, prev_write: Write) -> Result { + self.reader.get_old_value(key, self.start_ts, prev_write) } #[inline(always)] @@ -448,18 +448,43 @@ impl MvccReader { /// Read the old value for key for CDC. /// `prev_write` stands for the previous write record of the key /// it must be read in the caller and be passed in for optimization - fn get_old_value(&mut self, start_ts: TimeStamp, prev_write: Write) -> Result { - if prev_write.may_have_old_value() - && prev_write - .as_ref() - .check_gc_fence_as_latest_version(start_ts) + fn get_old_value( + &mut self, + key: &Key, + start_ts: TimeStamp, + prev_write: Write, + ) -> Result { + if !prev_write + .as_ref() + .check_gc_fence_as_latest_version(start_ts) { - Ok(OldValue::Value { - short_value: prev_write.short_value, - start_ts: prev_write.start_ts, - }) - } else { - Ok(OldValue::None) + return Ok(OldValue::None); + } + match prev_write.write_type { + WriteType::Put => { + // For Put, there must be an old value either in its + // short value or in the default CF. + Ok(OldValue::Value { + short_value: prev_write.short_value, + start_ts: prev_write.start_ts, + }) + } + WriteType::Delete => { + // For Delete, no old value. + Ok(OldValue::None) + } + WriteType::Rollback | WriteType::Lock => { + // For Rollback and Lock, it's unknown whether there is a more + // previous valid write. Call `get_write` to get a valid + // previous write. + Ok(match self.get_write(key, start_ts, Some(start_ts))? 
{ + Some(write) => OldValue::Value { + short_value: write.short_value, + start_ts: write.start_ts, + }, + None => OldValue::None, + }) + } } } } @@ -1636,7 +1661,10 @@ pub mod tests { }, // prev_write is Rollback, and there exists a more previous valid write Case { - expected: OldValue::None, + expected: OldValue::Value { + short_value: None, + start_ts: TimeStamp::new(4), + }, written: vec![ ( @@ -1649,10 +1677,26 @@ pub mod tests { ), ], }, + Case { + expected: OldValue::Value { + short_value: Some(b"v".to_vec()), + start_ts: TimeStamp::new(4), + }, + + written: vec![ + ( + Write::new(WriteType::Put, TimeStamp::new(4), Some(b"v".to_vec())), + TimeStamp::new(6), + ), + ( + Write::new(WriteType::Rollback, TimeStamp::new(5), None), + TimeStamp::new(7), + ), + ], + }, // prev_write is Rollback, and there isn't a more previous valid write Case { expected: OldValue::None, - written: vec![( Write::new(WriteType::Rollback, TimeStamp::new(5), None), TimeStamp::new(6), @@ -1660,7 +1704,10 @@ pub mod tests { }, // prev_write is Lock, and there exists a more previous valid write Case { - expected: OldValue::None, + expected: OldValue::Value { + short_value: None, + start_ts: TimeStamp::new(3), + }, written: vec![ ( @@ -1676,7 +1723,6 @@ pub mod tests { // prev_write is Lock, and there isn't a more previous valid write Case { expected: OldValue::None, - written: vec![( Write::new(WriteType::Lock, TimeStamp::new(5), None), TimeStamp::new(6), @@ -1703,8 +1749,37 @@ pub mod tests { TimeStamp::new(5), )], }, + // prev_write is Delete, check_gc_fence_as_latest_version is true + Case { + expected: OldValue::None, + written: vec![ + ( + Write::new(WriteType::Put, TimeStamp::new(3), None), + TimeStamp::new(6), + ), + ( + Write::new(WriteType::Delete, TimeStamp::new(7), None), + TimeStamp::new(8), + ), + ], + }, + // prev_write is Delete, check_gc_fence_as_latest_version is false + Case { + expected: OldValue::None, + written: vec![ + ( + Write::new(WriteType::Put, TimeStamp::new(3), 
None), + TimeStamp::new(6), + ), + ( + Write::new(WriteType::Delete, TimeStamp::new(7), None) + .set_overlapped_rollback(true, Some(6.into())), + TimeStamp::new(8), + ), + ], + }, ]; - for case in cases { + for (i, case) in cases.into_iter().enumerate() { let engine = TestEngineBuilder::new().build().unwrap(); let cm = ConcurrencyManager::new(42.into()); let mut txn = MvccTxn::new(TimeStamp::new(10), cm.clone()); @@ -1725,9 +1800,9 @@ pub mod tests { .unwrap() .1; let result = reader - .get_old_value(TimeStamp::new(25), prev_write) + .get_old_value(&Key::from_raw(b"a"), TimeStamp::new(25), prev_write) .unwrap(); - assert_eq!(result, case.expected); + assert_eq!(result, case.expected, "case #{}", i); } } } diff --git a/src/storage/txn/actions/prewrite.rs b/src/storage/txn/actions/prewrite.rs index 3cdfba99404..353992a949e 100644 --- a/src/storage/txn/actions/prewrite.rs +++ b/src/storage/txn/actions/prewrite.rs @@ -75,10 +75,27 @@ pub fn prewrite( return Ok((min_commit_ts, OldValue::Unspecified)); } - let old_value = if txn_props.need_old_value && mutation.mutation_type.may_have_old_value() { - if let Some(w) = prev_write { - reader.get_old_value(w)? + let old_value = if txn_props.need_old_value + && matches!( + mutation.mutation_type, + // Only Put, Delete and Insert may have old value. + MutationType::Put | MutationType::Delete | MutationType::Insert + ) { + if mutation.mutation_type == MutationType::Insert { + // The previous write of an Insert is guaranteed to be None. + OldValue::None + } else if mutation.skip_constraint_check() { + // The mutation does not read previous write if it skips constraint + // check. + // Pessimistic transaction always skip constraint check in + // "prewrite" stage, as it checks constraint in + // "acquire pessimistic lock" stage. + OldValue::Unspecified + } else if let Some(w) = prev_write { + // The mutation reads and get a previous write. + reader.get_old_value(&mutation.key, w)? } else { + // There is no previous write. 
OldValue::None } } else { @@ -495,13 +512,15 @@ pub mod tests { commands::prewrite::fallback_1pc_locks, tests::{ must_acquire_pessimistic_lock, must_cleanup_with_gc_fence, must_commit, - must_prewrite_lock, must_prewrite_put, + must_prewrite_delete, must_prewrite_lock, must_prewrite_put, must_rollback, }, }; use crate::storage::{mvcc::tests::*, Engine}; use concurrency_manager::ConcurrencyManager; use kvproto::kvrpcpb::Context; #[cfg(test)] + use rand::{Rng, SeedableRng}; + #[cfg(test)] use txn_types::OldValue; fn optimistic_txn_props(primary: &[u8], start_ts: TimeStamp) -> TransactionProperties<'_> { @@ -537,7 +556,7 @@ pub mod tests { txn_size, lock_ttl: 2000, min_commit_ts: 10.into(), - need_old_value: false, + need_old_value: true, } } @@ -556,14 +575,23 @@ pub mod tests { let mut txn = MvccTxn::new(ts, cm); let mut reader = SnapshotReader::new(ts, snapshot, true); - prewrite( + let mut props = optimistic_txn_props(pk, ts); + props.need_old_value = true; + let (_, old_value) = prewrite( &mut txn, &mut reader, - &optimistic_txn_props(pk, ts), + &props, Mutation::Insert((Key::from_raw(key), value.to_vec())), &None, false, )?; + // Insert must be None if the key is not lock, or be Unspecified if the + // key is already locked. 
+ assert!( + matches!(old_value, OldValue::None | OldValue::Unspecified), + "{:?}", + old_value + ); write(engine, &ctx, txn.into_modifies()); Ok(()) } @@ -580,14 +608,15 @@ pub mod tests { let mut txn = MvccTxn::new(ts, cm); let mut reader = SnapshotReader::new(ts, snapshot, true); - prewrite( + let (_, old_value) = prewrite( &mut txn, &mut reader, &optimistic_txn_props(pk, ts), Mutation::CheckNotExists(Key::from_raw(key)), &None, - false, + true, )?; + assert_eq!(old_value, OldValue::Unspecified); Ok(()) } @@ -601,7 +630,7 @@ pub mod tests { let mut reader = SnapshotReader::new(10.into(), snapshot, true); // calculated commit_ts = 43 ≤ 50, ok - prewrite( + let (_, old_value) = prewrite( &mut txn, &mut reader, &optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false), @@ -610,6 +639,7 @@ pub mod tests { false, ) .unwrap(); + assert_eq!(old_value, OldValue::None); cm.update_max_ts(60.into()); // calculated commit_ts = 61 > 50, err @@ -647,7 +677,7 @@ pub mod tests { props.min_commit_ts = 11.into(); let mut txn = MvccTxn::new(10.into(), cm.clone()); let mut reader = SnapshotReader::new(10.into(), snapshot.clone(), false); - let (min_ts, _) = prewrite( + let (min_ts, old_value) = prewrite( &mut txn, &mut reader, &props, @@ -659,13 +689,14 @@ pub mod tests { assert!(min_ts > props.start_ts); assert!(min_ts >= props.min_commit_ts); assert!(min_ts < 41.into()); + assert_eq!(old_value, OldValue::Unspecified); // `checkNotExists` is equivalent to a get operation, so it should update the max_ts. 
let mut props = optimistic_txn_props(b"k0", 42.into()); props.min_commit_ts = 43.into(); let mut txn = MvccTxn::new(42.into(), cm.clone()); let mut reader = SnapshotReader::new(42.into(), snapshot.clone(), false); - prewrite( + let (_, old_value) = prewrite( &mut txn, &mut reader, &props, @@ -675,11 +706,12 @@ pub mod tests { ) .unwrap(); assert_eq!(cm.max_ts(), props.start_ts); + assert_eq!(old_value, OldValue::Unspecified); // should_write mutations' min_commit_ts must be > max_ts let mut txn = MvccTxn::new(10.into(), cm.clone()); let mut reader = SnapshotReader::new(10.into(), snapshot.clone(), false); - let (min_ts, _) = prewrite( + let (min_ts, old_value) = prewrite( &mut txn, &mut reader, &optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false), @@ -690,6 +722,7 @@ pub mod tests { .unwrap(); assert!(min_ts > 42.into()); assert!(min_ts < 50.into()); + assert_eq!(old_value, OldValue::None); for &should_not_write in &[false, true] { let mutation = if should_not_write { @@ -701,7 +734,7 @@ pub mod tests { // min_commit_ts must be > start_ts let mut txn = MvccTxn::new(44.into(), cm.clone()); let mut reader = SnapshotReader::new(44.into(), snapshot.clone(), false); - let (min_ts, _) = prewrite( + let (min_ts, old_value) = prewrite( &mut txn, &mut reader, &optimistic_async_props(b"k3", 44.into(), 50.into(), 2, false), @@ -713,12 +746,17 @@ pub mod tests { assert!(min_ts > 44.into()); assert!(min_ts < 50.into()); txn.take_guards(); + if should_not_write { + assert_eq!(old_value, OldValue::Unspecified); + } else { + assert_eq!(old_value, OldValue::None); + } // min_commit_ts must be > for_update_ts if !should_not_write { let mut props = optimistic_async_props(b"k5", 44.into(), 50.into(), 2, false); props.kind = TransactionKind::Pessimistic(45.into()); - let (min_ts, _) = prewrite( + let (min_ts, old_value) = prewrite( &mut txn, &mut reader, &props, @@ -730,12 +768,14 @@ pub mod tests { assert!(min_ts > 45.into()); assert!(min_ts < 50.into()); txn.take_guards(); 
+ // Pessimistic txn skips constraint check, does not read previous write. + assert_eq!(old_value, OldValue::Unspecified); } // min_commit_ts must be >= txn min_commit_ts let mut props = optimistic_async_props(b"k7", 44.into(), 50.into(), 2, false); props.min_commit_ts = 46.into(); - let (min_ts, _) = prewrite( + let (min_ts, old_value) = prewrite( &mut txn, &mut reader, &props, @@ -747,6 +787,11 @@ pub mod tests { assert!(min_ts >= 46.into()); assert!(min_ts < 50.into()); txn.take_guards(); + if should_not_write { + assert_eq!(old_value, OldValue::Unspecified); + } else { + assert_eq!(old_value, OldValue::None); + } } } @@ -760,7 +805,7 @@ pub mod tests { let mut txn = MvccTxn::new(10.into(), cm.clone()); let mut reader = SnapshotReader::new(10.into(), snapshot, false); // calculated commit_ts = 43 ≤ 50, ok - prewrite( + let (_, old_value) = prewrite( &mut txn, &mut reader, &optimistic_async_props(b"k1", 10.into(), 50.into(), 2, true), @@ -769,6 +814,7 @@ pub mod tests { false, ) .unwrap(); + assert_eq!(old_value, OldValue::None); cm.update_max_ts(60.into()); // calculated commit_ts = 61 > 50, err @@ -807,7 +853,7 @@ pub mod tests { let mut txn = MvccTxn::new(ts, cm); let mut reader = SnapshotReader::new(ts, snapshot, false); - prewrite( + let (_, old_value) = prewrite( &mut txn, &mut reader, &TransactionProperties { @@ -818,12 +864,13 @@ pub mod tests { txn_size: 0, lock_ttl: 0, min_commit_ts: TimeStamp::default(), - need_old_value: false, + need_old_value: true, }, Mutation::CheckNotExists(Key::from_raw(key)), &None, false, )?; + assert_eq!(old_value, OldValue::Unspecified); Ok(()) } @@ -847,10 +894,10 @@ pub mod tests { txn_size: 2, lock_ttl: 2000, min_commit_ts: 10.into(), - need_old_value: false, + need_old_value: true, }; // calculated commit_ts = 43 ≤ 50, ok - prewrite( + let (_, old_value) = prewrite( &mut txn, &mut reader, &txn_props, @@ -859,6 +906,8 @@ pub mod tests { true, ) .unwrap(); + // Pessimistic txn skips constraint check, does not read previous 
write. + assert_eq!(old_value, OldValue::Unspecified); cm.update_max_ts(60.into()); // calculated commit_ts = 61 > 50, ok @@ -893,10 +942,10 @@ pub mod tests { txn_size: 2, lock_ttl: 2000, min_commit_ts: 10.into(), - need_old_value: false, + need_old_value: true, }; // calculated commit_ts = 43 ≤ 50, ok - prewrite( + let (_, old_value) = prewrite( &mut txn, &mut reader, &txn_props, @@ -905,6 +954,8 @@ pub mod tests { true, ) .unwrap(); + // Pessimistic txn skips constraint check, does not read previous write. + assert_eq!(old_value, OldValue::Unspecified); cm.update_max_ts(60.into()); // calculated commit_ts = 61 > 50, ok @@ -998,7 +1049,7 @@ pub mod tests { txn_size: 6, lock_ttl: 2000, min_commit_ts: 51.into(), - need_old_value: false, + need_old_value: true, }; let cases = vec![ @@ -1021,7 +1072,8 @@ pub mod tests { false, ); if success { - res.unwrap(); + let res = res.unwrap(); + assert_eq!(res.1, OldValue::Unspecified); } else { res.unwrap_err(); } @@ -1035,7 +1087,8 @@ pub mod tests { false, ); if success { - res.unwrap(); + let res = res.unwrap(); + assert_eq!(res.1, OldValue::None); } else { res.unwrap_err(); } @@ -1061,8 +1114,7 @@ pub mod tests { (b"k1" as &[u8], None), (b"k2", Some((b"v2" as &[u8], 11))), (b"k3", None), - // `get_old_value` won't seek before Lock record - (b"k4", None), + (b"k4", Some((b"v4", 13))), (b"k5", None), (b"k6", Some((b"v6x", 22))), (b"k7", None), @@ -1089,7 +1141,220 @@ pub mod tests { false, ) .unwrap(); - assert_eq!(&old_value, expected_value); + assert_eq!(&old_value, expected_value, "key: {}", key); + } + } + + #[test] + fn test_prewrite_old_value_rollback_and_lock() { + let engine_rollback = crate::storage::TestEngineBuilder::new().build().unwrap(); + + must_prewrite_put(&engine_rollback, b"k1", b"v1", b"k1", 10); + must_commit(&engine_rollback, b"k1", 10, 30); + + must_prewrite_put(&engine_rollback, b"k1", b"v2", b"k1", 40); + must_rollback(&engine_rollback, b"k1", 40, false); + + let engine_lock = 
crate::storage::TestEngineBuilder::new().build().unwrap(); + + must_prewrite_put(&engine_lock, b"k1", b"v1", b"k1", 10); + must_commit(&engine_lock, b"k1", 10, 30); + + must_prewrite_lock(&engine_lock, b"k1", b"k1", 40); + must_commit(&engine_lock, b"k1", 40, 45); + + for engine in &[engine_rollback, engine_lock] { + let start_ts = TimeStamp::from(50); + let txn_props = TransactionProperties { + start_ts, + kind: TransactionKind::Optimistic(false), + commit_kind: CommitKind::TwoPc, + primary: b"k1", + txn_size: 0, + lock_ttl: 0, + min_commit_ts: TimeStamp::default(), + need_old_value: true, + }; + let snapshot = engine.snapshot(Default::default()).unwrap(); + let cm = ConcurrencyManager::new(start_ts); + let mut txn = MvccTxn::new(start_ts, cm); + let mut reader = SnapshotReader::new(start_ts, snapshot, true); + let (_, old_value) = prewrite( + &mut txn, + &mut reader, + &txn_props, + Mutation::Put((Key::from_raw(b"k1"), b"value".to_vec())), + &None, + false, + ) + .unwrap(); + assert_eq!( + old_value, + OldValue::Value { + short_value: Some(b"v1".to_vec()), + start_ts: 10.into(), + } + ); + } + } + + #[test] + fn test_prewrite_old_value_put_delete_lock_insert() { + let engine = crate::storage::TestEngineBuilder::new().build().unwrap(); + + must_prewrite_put(&engine, b"k1", b"v1", b"k1", 10); + must_commit(&engine, b"k1", 10, 20); + + must_prewrite_delete(&engine, b"k1", b"k1", 30); + must_commit(&engine, b"k1", 30, 40); + + must_prewrite_lock(&engine, b"k1", b"k1", 50); + must_commit(&engine, b"k1", 50, 60); + + let start_ts = TimeStamp::from(70); + let txn_props = TransactionProperties { + start_ts, + kind: TransactionKind::Optimistic(false), + commit_kind: CommitKind::TwoPc, + primary: b"k1", + txn_size: 0, + lock_ttl: 0, + min_commit_ts: TimeStamp::default(), + need_old_value: true, + }; + let snapshot = engine.snapshot(Default::default()).unwrap(); + let cm = ConcurrencyManager::new(start_ts); + let mut txn = MvccTxn::new(start_ts, cm); + let mut reader = 
SnapshotReader::new(start_ts, snapshot, true); + let (_, old_value) = prewrite( + &mut txn, + &mut reader, + &txn_props, + Mutation::Insert((Key::from_raw(b"k1"), b"v2".to_vec())), + &None, + false, + ) + .unwrap(); + assert_eq!(old_value, OldValue::None); + } + + #[test] + fn test_prewrite_old_value_random() { + let mut ts = 1u64; + let mut tso = || { + ts += 1; + ts + }; + + use std::time::SystemTime; + // A simple valid operation sequence: p[pdlr]* + // p: put, d: delete, l: lock, r: rollback (of a prewritten put) + let seed = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let mut rg = rand::rngs::StdRng::seed_from_u64(seed); + + // Generate 1000 random cases. + let engine = crate::storage::TestEngineBuilder::new().build().unwrap(); + let cases = 1000; + for _ in 0..cases { + // At most 11 ops per case (`% 12` yields 0..=11). + let ops_count = rg.gen::() % 12; + let ops = (0..ops_count) + .into_iter() + .enumerate() + .map(|(i, _)| { + if i == 0 { + // The first op must be put.
+ 0 + } else { + rg.gen::() % 4 + } + }) + .collect::>(); + + for (i, op) in ops.iter().enumerate() { + let start_ts = tso(); + let commit_ts = tso(); + + match op { + 0 => { + must_prewrite_put(&engine, b"k1", &[i as u8], b"k1", start_ts); + must_commit(&engine, b"k1", start_ts, commit_ts); + } + 1 => { + must_prewrite_delete(&engine, b"k1", b"k1", start_ts); + must_commit(&engine, b"k1", start_ts, commit_ts); + } + 2 => { + must_prewrite_lock(&engine, b"k1", b"k1", start_ts); + must_commit(&engine, b"k1", start_ts, commit_ts); + } + 3 => { + must_prewrite_put(&engine, b"k1", &[i as u8], b"k1", start_ts); + must_rollback(&engine, b"k1", start_ts, false); + } + _ => unreachable!(), + } + } + let start_ts = TimeStamp::from(tso()); + let snapshot = engine.snapshot(Default::default()).unwrap(); + let cm = ConcurrencyManager::new(start_ts); + let expect = { + let mut reader = SnapshotReader::new(start_ts, snapshot.clone(), true); + if let Some(write) = reader + .reader + .get_write(&Key::from_raw(b"k1"), start_ts, Some(start_ts)) + .unwrap() + { + assert_eq!(write.write_type, WriteType::Put); + OldValue::Value { + short_value: write.short_value, + start_ts: write.start_ts, + } + } else { + OldValue::None + } + }; + + let mut txn = MvccTxn::new(start_ts, cm.clone()); + let mut reader = SnapshotReader::new(start_ts, snapshot.clone(), true); + let txn_props = TransactionProperties { + start_ts, + kind: TransactionKind::Optimistic(false), + commit_kind: CommitKind::TwoPc, + primary: b"k1", + txn_size: 0, + lock_ttl: 0, + min_commit_ts: TimeStamp::default(), + need_old_value: true, + }; + let (_, old_value) = prewrite( + &mut txn, + &mut reader, + &txn_props, + Mutation::Put((Key::from_raw(b"k1"), b"v2".to_vec())), + &None, + false, + ) + .unwrap(); + assert_eq!(old_value, expect, "seed: {} ops: {:?}", seed, ops); + + if expect == OldValue::None { + let mut txn = MvccTxn::new(start_ts, cm); + let mut reader = SnapshotReader::new(start_ts, snapshot, true); + let (_, 
old_value) = prewrite( + &mut txn, + &mut reader, + &txn_props, + Mutation::Insert((Key::from_raw(b"k1"), b"v2".to_vec())), + &None, + false, + ) + .unwrap(); + assert_eq!(old_value, expect, "seed: {} ops: {:?}", seed, ops); + } } } } diff --git a/src/storage/txn/commands/prewrite.rs b/src/storage/txn/commands/prewrite.rs index c66b9fc3894..191cad5172c 100644 --- a/src/storage/txn/commands/prewrite.rs +++ b/src/storage/txn/commands/prewrite.rs @@ -451,7 +451,7 @@ impl Prewriter { if need_min_commit_ts && final_min_commit_ts < ts { final_min_commit_ts = ts; } - if old_value.specified() { + if old_value.valid() { let key = key.append_ts(txn.start_ts); self.old_values.insert(key, (old_value, mutation_type)); }