Skip to content

Commit

Permalink
cdc: skip seek old value for Put if cache returns None (tikv#10031)
Browse files Browse the repository at this point in the history
* Revert "Fix reuse of invalid write cursor (tikv#9916)"

This reverts commit 60101e9.

Signed-off-by: Neil Shen <[email protected]>

* txn, mvcc: fix reuse of invalid write cursor

Signed-off-by: Neil Shen <[email protected]>

* txn: add more tests

Signed-off-by: Neil Shen <[email protected]>

* cdc: skip seek old value for Put if cache returns None

Signed-off-by: Neil Shen <[email protected]>

* cdc: add old value cache length and capacity metrics

Signed-off-by: Neil Shen <[email protected]>

* add random test

Signed-off-by: Neil Shen <[email protected]>

* address comments

Signed-off-by: Neil Shen <[email protected]>

* fix tests

Signed-off-by: Neil Shen <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
overvenus and ti-chi-bot authored Apr 20, 2021
1 parent c6cbec4 commit 95ebc5c
Show file tree
Hide file tree
Showing 12 changed files with 648 additions and 157 deletions.
28 changes: 12 additions & 16 deletions components/cdc/src/delegate.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::cell::RefCell;
use std::mem;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -32,7 +30,6 @@ use raftstore::store::fsm::ObserveID;
use raftstore::store::util::compare_region_epoch;
use raftstore::Error as RaftStoreError;
use resolved_ts::Resolver;
use tikv::storage::Statistics;
use tikv::{server::raftkv::WriteBatchFlags, storage::txn::TxnEntry};
use tikv_util::time::Instant;
use tikv_util::{debug, info, warn};
Expand Down Expand Up @@ -464,7 +461,7 @@ impl Delegate {
pub fn on_batch(
&mut self,
batch: CmdBatch,
old_value_cb: Rc<RefCell<OldValueCallback>>,
old_value_cb: &OldValueCallback,
old_value_cache: &mut OldValueCache,
) -> Result<()> {
// Stale CmdBatch, drop it silently.
Expand All @@ -485,7 +482,7 @@ impl Delegate {
self.sink_data(
index,
request.requests.into(),
old_value_cb.clone(),
old_value_cb,
old_value_cache,
is_one_pc,
)?;
Expand Down Expand Up @@ -613,7 +610,7 @@ impl Delegate {
&mut self,
index: u64,
requests: Vec<Request>,
old_value_cb: Rc<RefCell<OldValueCallback>>,
old_value_cb: &OldValueCallback,
old_value_cache: &mut OldValueCache,
is_one_pc: bool,
) -> Result<()> {
Expand All @@ -622,19 +619,18 @@ impl Delegate {
if txn_extra_op == TxnExtraOp::ReadOldValue {
let key = Key::from_raw(&row.key).append_ts(row.start_ts.into());
let start = Instant::now();

let mut statistics = Statistics::default();
row.old_value =
old_value_cb.borrow_mut()(key, read_old_ts, old_value_cache, &mut statistics)
.unwrap_or_default();
let (old_value, statistics) = old_value_cb(key, read_old_ts, old_value_cache);
row.old_value = old_value.unwrap_or_default();
CDC_OLD_VALUE_DURATION_HISTOGRAM
.with_label_values(&["all"])
.observe(start.elapsed().as_secs_f64());
for (cf, cf_details) in statistics.details().iter() {
for (tag, count) in cf_details.iter() {
CDC_OLD_VALUE_SCAN_DETAILS
.with_label_values(&[*cf, *tag])
.inc_by(*count as i64);
if let Some(statistics) = statistics {
for (cf, cf_details) in statistics.details().iter() {
for (tag, count) in cf_details.iter() {
CDC_OLD_VALUE_SCAN_DETAILS
.with_label_values(&[*cf, *tag])
.inc_by(*count as i64);
}
}
}
}
Expand Down
36 changes: 24 additions & 12 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

Expand Down Expand Up @@ -104,7 +103,7 @@ impl fmt::Debug for Deregister {

type InitCallback = Box<dyn FnOnce() + Send>;
pub(crate) type OldValueCallback =
Box<dyn FnMut(Key, TimeStamp, &mut OldValueCache, &mut Statistics) -> Option<Vec<u8>> + Send>;
Box<dyn Fn(Key, TimeStamp, &mut OldValueCache) -> (Option<Vec<u8>>, Option<Statistics>) + Send>;

pub struct OldValueCache {
pub cache: LruCache<Key, (OldValue, MutationType)>,
Expand All @@ -122,6 +121,11 @@ impl OldValueCache {
}
}

/// Payload of `Task::Validate`: a one-shot callback that is handed a borrowed
/// view of endpoint state when the task is run.
pub enum Validate {
/// Carries a region id and a callback. When the task is run, the callback
/// receives the result of looking that id up in the endpoint's captured
/// regions: `Some(&Delegate)` if present, `None` otherwise.
Region(u64, Box<dyn FnOnce(Option<&Delegate>) + Send>),
/// Carries a callback that receives a shared reference to the endpoint's
/// old value cache when the task is run.
OldValueCache(Box<dyn FnOnce(&OldValueCache) + Send>),
}

pub enum Task {
Register {
request: ChangeDataRequest,
Expand Down Expand Up @@ -160,7 +164,7 @@ pub enum Task {
cb: InitCallback,
},
TxnExtra(TxnExtra),
Validate(u64, Box<dyn FnOnce(Option<&Delegate>) + Send>),
Validate(Validate),
}

impl_display_as_debug!(Task);
Expand Down Expand Up @@ -225,7 +229,10 @@ impl fmt::Debug for Task {
.field("downstream", &downstream_id)
.finish(),
Task::TxnExtra(_) => de.field("type", &"txn_extra").finish(),
Task::Validate(region_id, _) => de.field("region_id", &region_id).finish(),
Task::Validate(validate) => match validate {
Validate::Region(region_id, _) => de.field("region_id", &region_id).finish(),
Validate::OldValueCache(_) => de.finish(),
},
}
}
}
Expand Down Expand Up @@ -288,6 +295,8 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
.core_threads(1)
.build()
.unwrap();
CDC_OLD_VALUE_CACHE_CAP.set(cfg.old_value_cache_size as i64);
let old_value_cache = OldValueCache::new(cfg.old_value_cache_size);
let ep = Endpoint {
env,
security_mgr,
Expand All @@ -307,7 +316,7 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
min_ts_interval: cfg.min_ts_interval.0,
min_resolved_ts: TimeStamp::max(),
min_ts_region_id: 0,
old_value_cache: OldValueCache::new(cfg.old_value_cache_size),
old_value_cache,
hibernate_regions_compatible: cfg.hibernate_regions_compatible,
tikv_clients: Arc::new(Mutex::new(HashMap::default())),
};
Expand Down Expand Up @@ -632,7 +641,6 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {

pub fn on_multi_batch(&mut self, multi: Vec<CmdBatch>, old_value_cb: OldValueCallback) {
fail_point!("cdc_before_handle_multi_batch", |_| {});
let old_value_cb = Rc::new(RefCell::new(old_value_cb));
for batch in multi {
let region_id = batch.region_id;
let mut deregister = None;
Expand All @@ -641,9 +649,7 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
// Skip the batch if the delegate has failed.
continue;
}
if let Err(e) =
delegate.on_batch(batch, old_value_cb.clone(), &mut self.old_value_cache)
{
if let Err(e) = delegate.on_batch(batch, &old_value_cb, &mut self.old_value_cache) {
assert!(delegate.has_failed());
// Delegate has error, deregister the corresponding region.
deregister = Some(Deregister::Region {
Expand Down Expand Up @@ -1510,9 +1516,14 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Runnable for Endpoint<T> {
self.old_value_cache.cache.insert(k, v);
}
}
Task::Validate(region_id, validate) => {
validate(self.capture_regions.get(&region_id));
}
Task::Validate(validate) => match validate {
Validate::Region(region_id, validate) => {
validate(self.capture_regions.get(&region_id));
}
Validate::OldValueCache(validate) => {
validate(&self.old_value_cache);
}
},
}
self.flush_all();
}
Expand All @@ -1537,6 +1548,7 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> RunnableWithTimer for Endpoint<T
CDC_OLD_VALUE_CACHE_BYTES.set(cache_size as i64);
CDC_OLD_VALUE_CACHE_ACCESS.add(self.old_value_cache.access_count as i64);
CDC_OLD_VALUE_CACHE_MISS.add(self.old_value_cache.miss_count as i64);
CDC_OLD_VALUE_CACHE_LEN.set(self.old_value_cache.cache.len() as i64);
self.old_value_cache.access_count = 0;
self.old_value_cache.miss_count = 0;
}
Expand Down
2 changes: 1 addition & 1 deletion components/cdc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod observer;
mod rate_limiter;
mod service;

pub use endpoint::{CdcTxnExtraScheduler, Endpoint, Task};
pub use endpoint::{CdcTxnExtraScheduler, Endpoint, Task, Validate};
pub use errors::{Error, Result};
pub use observer::CdcObserver;
pub use service::Service;
10 changes: 10 additions & 0 deletions components/cdc/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ lazy_static! {
"Total number of CDC captured regions"
)
.unwrap();
pub static ref CDC_OLD_VALUE_CACHE_LEN: IntGauge = register_int_gauge!(
"tikv_cdc_old_value_cache_length",
"Number of elements in old value cache"
)
.unwrap();
pub static ref CDC_OLD_VALUE_CACHE_CAP: IntGauge = register_int_gauge!(
"tikv_cdc_old_value_cache_capacity",
"Capacity of old value cache"
)
.unwrap();
pub static ref CDC_OLD_VALUE_CACHE_MISS: IntGauge = register_int_gauge!(
"tikv_cdc_old_value_cache_miss",
"Count of old value cache missing"
Expand Down
52 changes: 25 additions & 27 deletions components/cdc/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,50 +125,52 @@ impl<E: KvEngine> CmdObserver<E> for CdcObserver {
// Create a snapshot here to prevent the old value from being GC-ed.
let snapshot =
RegionSnapshot::from_snapshot(Arc::new(engine.snapshot()), Arc::new(region));
let mut reader = OldValueReader::new(snapshot);
let get_old_value = move |key,
query_ts,
old_value_cache: &mut OldValueCache,
statistics: &mut Statistics| {
let reader = OldValueReader::new(snapshot);
let get_old_value = move |key, query_ts, old_value_cache: &mut OldValueCache| {
old_value_cache.access_count += 1;
if let Some((old_value, mutation_type)) = old_value_cache.cache.remove(&key) {
match mutation_type {
MutationType::Insert => {
return None;
}
MutationType::Put | MutationType::Delete => {
if let OldValue::Value {
return match mutation_type {
MutationType::Insert => (None, None),
MutationType::Put | MutationType::Delete => match old_value {
OldValue::None => (None, None),
OldValue::Value {
start_ts,
short_value,
} = old_value
{
return short_value.or_else(|| {
} => {
let mut statistics = None;
let value = short_value.or_else(|| {
statistics = Some(Statistics::default());
let prev_key = key.truncate_ts().unwrap().append_ts(start_ts);
let start = Instant::now();
let mut opts = ReadOptions::new();
opts.set_fill_cache(false);
let value = reader.get_value_default(&prev_key, statistics);
let value = reader
.get_value_default(&prev_key, statistics.as_mut().unwrap());
CDC_OLD_VALUE_DURATION_HISTOGRAM
.with_label_values(&["get"])
.observe(start.elapsed().as_secs_f64());
value
});
(value, statistics)
}
}
// Unspecified should not be added into cache.
OldValue::Unspecified => unreachable!(),
},
_ => unreachable!(),
}
};
}
// Cannot get old value from cache, seek for it in engine.
old_value_cache.miss_count += 1;
let mut statistics = Statistics::default();
let start = Instant::now();
let key = key.truncate_ts().unwrap().append_ts(query_ts);
let value = reader
.near_seek_old_value(&key, statistics)
.near_seek_old_value(&key, &mut statistics)
.unwrap_or_default();
CDC_OLD_VALUE_DURATION_HISTOGRAM
.with_label_values(&["seek"])
.observe(start.elapsed().as_secs_f64());
value
(value, Some(statistics))
};
if let Err(e) = self.sched.schedule(Task::MultiBatch {
multi: batches,
Expand Down Expand Up @@ -247,7 +249,7 @@ impl<S: EngineSnapshot> OldValueReader<S> {
.unwrap()
}

fn get_value_default(&mut self, key: &Key, statistics: &mut Statistics) -> Option<Value> {
fn get_value_default(&self, key: &Key, statistics: &mut Statistics) -> Option<Value> {
statistics.data.get += 1;
let mut opts = ReadOptions::new();
opts.set_fill_cache(false);
Expand All @@ -262,11 +264,7 @@ impl<S: EngineSnapshot> OldValueReader<S> {
/// The key passed in should be a key with a timestamp. This function returns
/// the latest value of the entry if the user key is the same as the given key and
/// the timestamp is older than or equal to the timestamp in the given key.
fn near_seek_old_value(
&mut self,
key: &Key,
statistics: &mut Statistics,
) -> Result<Option<Value>> {
fn near_seek_old_value(&self, key: &Key, statistics: &mut Statistics) -> Result<Option<Value>> {
let (user_key, seek_ts) = Key::split_on_ts_for(key.as_encoded()).unwrap();
let mut write_cursor = self.new_write_cursor(key);
if write_cursor.near_seek(key, &mut statistics.write)?
Expand Down Expand Up @@ -389,7 +387,7 @@ mod tests {
let key = Key::from_raw(k);

let must_get_eq = |ts: u64, value| {
let mut old_value_reader = OldValueReader::new(Arc::new(kv_engine.snapshot()));
let old_value_reader = OldValueReader::new(Arc::new(kv_engine.snapshot()));
let mut statistics = Statistics::default();
assert_eq!(
old_value_reader
Expand Down Expand Up @@ -440,7 +438,7 @@ mod tests {
let kv_engine = engine.get_rocksdb();

let must_get_eq = |key: &[u8], ts: u64, value| {
let mut old_value_reader = OldValueReader::new(Arc::new(kv_engine.snapshot()));
let old_value_reader = OldValueReader::new(Arc::new(kv_engine.snapshot()));
let mut statistics = Statistics::default();
assert_eq!(
old_value_reader
Expand Down
Loading

0 comments on commit 95ebc5c

Please sign in to comment.