From 435cf0a36dbb1026c32f757f94bf86f7e713fbb1 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 25 Nov 2023 23:29:35 +0800 Subject: [PATCH 01/14] reproduce issue Signed-off-by: Ping Yu --- Makefile | 4 ++-- tests/integration_tests.rs | 36 +++++++++++++++++++++++------------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index 8014352d..aef0ad45 100644 --- a/Makefile +++ b/Makefile @@ -2,8 +2,8 @@ export RUSTFLAGS=-Dwarnings .PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all -PD_ADDRS ?= "127.0.0.1:2379" -MULTI_REGION ?= 1 +export PD_ADDRS ?= 127.0.0.1:2379 +export MULTI_REGION ?= 1 ALL_FEATURES := integration-tests diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 71d48283..24556836 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1028,27 +1028,37 @@ async fn txn_scan_reverse() -> Result<()> { init().await?; let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; - let k1 = b"a1".to_vec(); - let k2 = b"a2".to_vec(); - let v1 = b"b1".to_vec(); - let v2 = b"b2".to_vec(); - - let reverse_resp = vec![ - (Key::from(k2.clone()), v2.clone()), - (Key::from(k1.clone()), v1.clone()), - ]; + // Keys in `keys` should locate in different regions. See `init()` for boundary of regions. + let keys: Vec = vec![ + 0x00000000_u32.to_be_bytes().to_vec(), + 0x40000000_u32.to_be_bytes().to_vec(), + b"a1".to_vec(), // 0x6149 + b"a2".to_vec(), // 0x614A + 0x80000000_u32.to_be_bytes().to_vec(), + 0xC0000000_u32.to_be_bytes().to_vec(), + ] + .into_iter() + .map(Into::into) + .collect(); + let values: Vec> = (0..keys.len()) + .map(|i| format!("v{}", i).into_bytes()) + .collect(); + let bound_range: BoundRange = + (keys.first().unwrap().clone()..=keys.last().unwrap().clone()).into(); // Pessimistic option is not stable in this case. Use optimistic options instead. let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn); let mut t = client.begin_with_options(option.clone()).await?; - t.put(k1.clone(), v1).await?; - t.put(k2.clone(), v2).await?; + let mut reverse_resp = Vec::with_capacity(keys.len()); + for (k, v) in keys.into_iter().zip(values.into_iter()).rev() { + t.put(k.clone(), v.clone()).await?; + reverse_resp.push((k, v)); + } t.commit().await?; let mut t2 = client.begin_with_options(option).await?; - let bound_range: BoundRange = (k1..=k2).into(); let resp = t2 - .scan_reverse(bound_range, 2) + .scan_reverse(bound_range, 100) .await? .map(|kv| (kv.0, kv.1)) .collect::)>>(); From 69862f0c37c26be8a67feeb9c7f73cb89ca96406 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sun, 26 Nov 2023 17:12:32 +0800 Subject: [PATCH 02/14] fix reverse range Signed-off-by: Ping Yu --- src/raw/requests.rs | 4 ++ src/request/mod.rs | 1 + src/request/shard.rs | 41 +++++++++++++++++++- src/transaction/requests.rs | 3 ++ tests/integration_tests.rs | 75 ++++++++++++++++++++++++++++++++++++- 5 files changed, 120 insertions(+), 4 deletions(-) diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 23bfce73..0be733cf 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -16,6 +16,7 @@ use crate::proto::kvrpcpb; use crate::proto::kvrpcpb::ApiVersion; use crate::proto::metapb; use crate::proto::tikvpb::tikv_client::TikvClient; +use crate::range_request; use crate::request::plan::ResponseWithShard; use crate::request::Collect; use crate::request::CollectSingle; @@ -23,6 +24,7 @@ use crate::request::DefaultProcessor; use crate::request::KvRequest; use crate::request::Merge; use crate::request::Process; +use crate::request::RangeRequest; use crate::request::Shardable; use crate::request::SingleKey; use crate::shardable_key; @@ -227,6 +229,7 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest { type Response = kvrpcpb::RawDeleteRangeResponse; } +range_request!(kvrpcpb::RawDeleteRangeRequest); shardable_range!(kvrpcpb::RawDeleteRangeRequest); pub fn new_raw_scan_request( @@ -250,6 +253,7 @@ impl KvRequest for kvrpcpb::RawScanRequest { type Response = kvrpcpb::RawScanResponse; } +range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan. shardable_range!(kvrpcpb::RawScanRequest); impl Merge for Collect { diff --git a/src/request/mod.rs b/src/request/mod.rs index aecaf26d..8c3a45cb 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -23,6 +23,7 @@ pub use self::plan_builder::SingleKey; pub use self::shard::Batchable; pub use self::shard::HasNextBatch; pub use self::shard::NextBatch; +pub use self::shard::RangeRequest; pub use self::shard::Shardable; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; diff --git a/src/request/shard.rs b/src/request/shard.rs index 7c78743d..ec234239 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -12,6 +12,7 @@ use crate::request::KvRequest; use crate::request::Plan; use crate::request::ResolveLock; use crate::store::RegionStore; +use crate::store::Request; use crate::Result; macro_rules! impl_inner_shardable { @@ -204,6 +205,32 @@ macro_rules! shardable_keys { }; } +pub trait RangeRequest: Request { + fn is_reverse(&self) -> bool { + false + } +} + +#[doc(hidden)] +#[macro_export] +macro_rules! range_request { + ($type_: ty) => { + impl RangeRequest for $type_ {} + }; +} + +#[doc(hidden)] +#[macro_export] +macro_rules! reversible_range_request { + ($type_: ty) => { + impl RangeRequest for $type_ { + fn is_reverse(&self) -> bool { + self.reverse + } + } + }; +} + #[doc(hidden)] #[macro_export] macro_rules! shardable_range { @@ -215,8 +242,13 @@ macro_rules! shardable_range { &self, pd_client: &Arc, ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> { - let start_key = self.start_key.clone().into(); - let end_key = self.end_key.clone().into(); + let mut start_key = self.start_key.clone().into(); + let mut end_key = self.end_key.clone().into(); + // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. + // Therefore, before fetching the regions from PD, it is necessary to swap the values of start_key and end_key. + if self.is_reverse() { + std::mem::swap(&mut start_key, &mut end_key); + } $crate::store::store_stream_for_range((start_key, end_key), pd_client.clone()) } @@ -227,8 +259,13 @@ macro_rules! shardable_range { ) -> $crate::Result<()> { self.set_context(store.region_with_leader.context()?); + // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. + // As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request. self.start_key = shard.0.into(); self.end_key = shard.1.into(); + if self.is_reverse() { + std::mem::swap(&mut self.start_key, &mut self.end_key); + } Ok(()) } } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 6d3c0999..4f3e1b93 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -28,10 +28,12 @@ use crate::request::KvRequest; use crate::request::Merge; use crate::request::NextBatch; use crate::request::Process; +use crate::request::RangeRequest; use crate::request::ResponseWithShard; use crate::request::Shardable; use crate::request::SingleKey; use crate::request::{Batchable, StoreRequest}; +use crate::reversible_range_request; use crate::shardable_key; use crate::shardable_keys; use crate::shardable_range; @@ -170,6 +172,7 @@ impl KvRequest for kvrpcpb::ScanRequest { type Response = kvrpcpb::ScanResponse; } +reversible_range_request!(kvrpcpb::ScanRequest); shardable_range!(kvrpcpb::ScanRequest); impl Merge for Collect { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 24556836..82442c4b 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1028,12 +1028,83 @@ async fn txn_scan_reverse() -> Result<()> { init().await?; let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let k1 = b"k1".to_vec(); + let k2 = b"k2".to_vec(); + let k3 = b"k3".to_vec(); + + let v1 = b"v1".to_vec(); + let v2 = b"v2".to_vec(); + let v3 = b"v3".to_vec(); + + // Pessimistic option is not stable in this case. Use optimistic options instead. + let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn); + let mut t = client.begin_with_options(option.clone()).await?; + t.put(k1.clone(), v1.clone()).await?; + t.put(k2.clone(), v2.clone()).await?; + t.put(k3.clone(), v3.clone()).await?; + t.commit().await?; + + let mut t2 = client.begin_with_options(option).await?; + { + // For [k1, k3]: + let bound_range: BoundRange = (k1.clone()..=k3.clone()).into(); + let resp = t2 + .scan_reverse(bound_range, 3) + .await? + .map(|kv| (kv.0, kv.1)) + .collect::)>>(); + assert_eq!( + resp, + vec![ + (Key::from(k3.clone()), v3.clone()), + (Key::from(k2.clone()), v2.clone()), + (Key::from(k1.clone()), v1.clone()), + ] + ); + } + { + // For [k1, k3): + let bound_range: BoundRange = (k1.clone()..k3.clone()).into(); + let resp = t2 + .scan_reverse(bound_range, 3) + .await? + .map(|kv| (kv.0, kv.1)) + .collect::)>>(); + assert_eq!( + resp, + vec![ + (Key::from(k2.clone()), v2.clone()), + (Key::from(k1.clone()), v1), + ] + ); + } + { + // For (k1, k3): + let mut start_key = k1.clone(); + start_key.push(0); + let bound_range: BoundRange = (start_key..k3).into(); + let resp = t2 + .scan_reverse(bound_range, 3) + .await? + .map(|kv| (kv.0, kv.1)) + .collect::)>>(); + assert_eq!(resp, vec![(Key::from(k2), v2),]); + } + t2.commit().await?; + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn txn_scan_reverse_multi_regions() -> Result<()> { + init().await?; + let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + // Keys in `keys` should locate in different regions. See `init()` for boundary of regions. let keys: Vec = vec![ 0x00000000_u32.to_be_bytes().to_vec(), 0x40000000_u32.to_be_bytes().to_vec(), - b"a1".to_vec(), // 0x6149 - b"a2".to_vec(), // 0x614A 0x80000000_u32.to_be_bytes().to_vec(), 0xC0000000_u32.to_be_bytes().to_vec(), ] From bd65277b26434b61a19ca276ab65a1571e661577 Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Tue, 30 Jul 2024 14:51:50 +0100 Subject: [PATCH 03/14] set max decoding message size to unlimited --- src/store/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/client.rs b/src/store/client.rs index 363d4137..7423d385 100644 --- a/src/store/client.rs +++ b/src/store/client.rs @@ -35,7 +35,7 @@ impl KvConnect for TikvConnect { self.security_mgr .connect(address, TikvClient::new) .await - .map(|c| KvRpcClient::new(c, self.timeout)) + .map(|c| KvRpcClient::new(c.max_decoding_message_size(usize::MAX), self.timeout)) } } From 3b66727f345967b44bd8fe70709e9035b0e2abb0 Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Sat, 3 Aug 2024 13:31:12 +0100 Subject: [PATCH 04/14] config commit ttl calculation --- src/transaction/transaction.rs | 67 ++++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e984f153..764a1bf1 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1046,6 +1046,47 @@ pub enum TransactionKind { Pessimistic(Timestamp), } +#[derive(Clone, PartialEq, Debug)] +pub struct CommitTTLParameters { + max_ttl: u64, + min_ttl: u64, + txn_commit_batch_size: u64, + ttl_factor: f64, +} + +impl Default for CommitTTLParameters { + fn default() -> Self { + Self { + max_ttl: MAX_TTL, + min_ttl: DEFAULT_LOCK_TTL, + txn_commit_batch_size: TXN_COMMIT_BATCH_SIZE, + ttl_factor: TTL_FACTOR, + } + } +} + +impl CommitTTLParameters { + pub fn max_ttl(mut self, millis: u64) -> Self { + self.max_ttl = millis; + self + } + + pub fn min_ttl(mut self, millis: u64) -> Self { + self.min_ttl = millis; + self + } + + pub fn txn_commit_batch_size(mut self, size: u64) -> Self { + self.txn_commit_batch_size = size; + self + } + + pub fn ttl_factor(mut self, factor: f64) -> Self { + self.ttl_factor = factor; + self + } +} + /// Options for configuring a transaction. /// /// `TransactionOptions` has a builder-style API. @@ -1063,6 +1104,8 @@ pub struct TransactionOptions { retry_options: RetryOptions, /// What to do if the transaction is dropped without an attempt to commit or rollback check_level: CheckLevel, + /// Variables related to commit TTL calculation + ttl_parameters: CommitTTLParameters, #[doc(hidden)] heartbeat_option: HeartbeatOption, } @@ -1089,6 +1132,7 @@ impl TransactionOptions { read_only: false, retry_options: RetryOptions::default_optimistic(), check_level: CheckLevel::Panic, + ttl_parameters: Default::default(), heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL), } } @@ -1102,6 +1146,7 @@ impl TransactionOptions { read_only: false, retry_options: RetryOptions::default_pessimistic(), check_level: CheckLevel::Panic, + ttl_parameters: Default::default(), heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL), } } @@ -1155,6 +1200,13 @@ impl TransactionOptions { self } + /// Set Commit TTL parameters. + #[must_use] + pub fn ttl_parameters(mut self, options: CommitTTLParameters) -> TransactionOptions { + self.ttl_parameters = options; + self + } + fn push_for_update_ts(&mut self, for_update_ts: Timestamp) { match &mut self.kind { TransactionKind::Optimistic => unreachable!(), @@ -1447,11 +1499,18 @@ impl Committer { } fn calc_txn_lock_ttl(&mut self) -> u64 { - let mut lock_ttl = DEFAULT_LOCK_TTL; - if self.write_size > TXN_COMMIT_BATCH_SIZE { + let CommitTTLParameters { + max_ttl, + min_ttl, + txn_commit_batch_size, + ttl_factor, + } = self.options.ttl_parameters; + + let mut lock_ttl = min_ttl; + if self.write_size > txn_commit_batch_size { let size_mb = self.write_size as f64 / 1024.0 / 1024.0; - lock_ttl = (TTL_FACTOR * size_mb.sqrt()) as u64; - lock_ttl = lock_ttl.clamp(DEFAULT_LOCK_TTL, MAX_TTL); + lock_ttl = (ttl_factor * size_mb.sqrt()) as u64; + lock_ttl = lock_ttl.clamp(min_ttl, max_ttl); } lock_ttl } From f81a7f81c8e9d131bd4687dc9b1c6d31085c5dcc Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Sat, 3 Aug 2024 13:40:05 +0100 Subject: [PATCH 05/14] expose ttl params --- src/transaction/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 81a290fa..4b1f3149 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -13,6 +13,7 @@ pub(crate) use lock::resolve_locks; pub(crate) use lock::HasLocks; pub use snapshot::Snapshot; pub use transaction::CheckLevel; +pub use transaction::CommitTTLParameters; #[doc(hidden)] pub use transaction::HeartbeatOption; pub use transaction::Transaction; From 5aef357ae69bce81ca846ca21fbb7dfc9e042e3b Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Sat, 3 Aug 2024 15:54:03 +0100 Subject: [PATCH 06/14] expose ttl params --- src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index a2acf57b..bb454dde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -156,6 +156,8 @@ pub use crate::timestamp::TimestampExt; #[doc(inline)] pub use crate::transaction::lowering as transaction_lowering; #[doc(inline)] +pub use crate::transaction::CommitTTLParameters; +#[doc(inline)] pub use crate::transaction::CheckLevel; #[doc(inline)] pub use crate::transaction::Client as TransactionClient; From d152e2e38f1d551d151ee22d56aef737846a9ede Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Wed, 20 Nov 2024 14:52:26 +0000 Subject: [PATCH 07/14] use config commit ttl everywhere --- src/lib.rs | 4 ++-- src/transaction/transaction.rs | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bb454dde..6b6623bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -156,12 +156,12 @@ pub use crate::timestamp::TimestampExt; #[doc(inline)] pub use crate::transaction::lowering as transaction_lowering; #[doc(inline)] -pub use crate::transaction::CommitTTLParameters; -#[doc(inline)] pub use crate::transaction::CheckLevel; #[doc(inline)] pub use crate::transaction::Client as TransactionClient; #[doc(inline)] +pub use crate::transaction::CommitTTLParameters; +#[doc(inline)] pub use crate::transaction::Snapshot; #[doc(inline)] pub use crate::transaction::Transaction; diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 764a1bf1..e1b8ab15 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -743,7 +743,7 @@ impl> Transaction { let request = new_heart_beat_request( self.timestamp.clone(), primary_key, - self.start_instant.elapsed().as_millis() as u64 + MAX_TTL, + self.start_instant.elapsed().as_millis() as u64 + self.options.ttl_parameters.max_ttl, ); let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) @@ -828,7 +828,7 @@ impl> Transaction { keys.clone().into_iter(), primary_lock, self.timestamp.clone(), - MAX_TTL, + self.options.ttl_parameters.max_ttl, for_update_ts.clone(), need_value, ); @@ -923,6 +923,7 @@ impl> Transaction { return; } self.is_heartbeat_started = true; + let max_ttl = self.options.ttl_parameters.max_ttl; let status = self.status.clone(); let primary_key = self @@ -955,7 +956,7 @@ impl> Transaction { let request = new_heart_beat_request( start_ts.clone(), primary_key.clone(), - start_instant.elapsed().as_millis() as u64 + MAX_TTL, + start_instant.elapsed().as_millis() as u64 + max_ttl, ); let encoded_req = EncodedRequest::new(request, rpc.get_codec()); let plan = PlanBuilder::new(rpc.clone(), encoded_req) From b8b22833bdeed110f4aa54e901f2899a5998bd29 Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Thu, 30 Jan 2025 15:30:33 +0000 Subject: [PATCH 08/14] introduce legacy gc/cleanup locks --- src/transaction/client.rs | 74 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 4bcb16d9..24e03be7 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -1,14 +1,17 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use std::ops::Bound; use std::sync::Arc; use log::debug; use log::info; +use log::warn; use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF}; use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; +use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest}; use crate::request::plan::CleanupLocksResult; @@ -25,6 +28,8 @@ use crate::Backoff; use crate::BoundRange; use crate::Result; +use super::resolve_locks; + // FIXME: cargo-culted value const SCAN_LOCK_BATCH_SIZE: u32 = 1024; @@ -290,6 +295,75 @@ impl Client { plan.execute().await } + /// GC function from older version of library: + /// https://github.com/tikv/client-rust/blob/9ca9aa79c6e28a878e9ee9574fd96bbc2688ccea/src/transaction/client.rs + /// + /// Cleans stale MVCC records in TiKV. + /// + /// It is done by: + /// 1. resolve all locks with ts <= safepoint + /// 2. update safepoint to PD + /// + /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). + /// We omit the second step "delete ranges" which is an optimization for TiDB. + pub async fn legacy_gc(&self, safepoint: Timestamp) -> Result { + let (resolved, live) = self.legacy_cleanup_locks((..).into(), &safepoint).await?; + + info!("resolved {resolved} locks ({live} live), sending new safepoint to PD..."); + + // update safepoint to PD + let res: bool = self + .pd + .clone() + .update_safepoint(safepoint.version()) + .await?; + if !res { + warn!("new safepoint != user-specified safepoint"); + } + + Ok(res) + } + + pub async fn legacy_cleanup_locks( + &self, + mut range: BoundRange, + safepoint: &Timestamp, + ) -> Result<(usize, usize)> { + // resolved locks, live locks + // scan all locks with ts <= safepoint + let mut locks: Vec = vec![]; + loop { + let req = new_scan_lock_request(range.clone(), &safepoint, SCAN_LOCK_BATCH_SIZE); + + let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); + let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(crate::request::Collect) + .plan(); + let res: Vec = plan.execute().await?; + + if res.is_empty() { + break; + } + + let mut start_key = res.last().unwrap().key.clone(); + start_key.push(0); + + range.from = Bound::Included(start_key.into()); + + locks.extend(res); + } + + let to_resolve = locks.len(); + + // resolve locks + let live_locks = resolve_locks(locks, self.pd.clone()).await?.len(); + + let resolved_locks = to_resolve - live_locks; + + Ok((resolved_locks, live_locks)) + } + // For test. // Note: `batch_size` must be >= expected number of locks. #[cfg(feature = "integration-tests")] From 5ea349904a382f803155f1a5c462657d05850810 Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Thu, 30 Jan 2025 17:06:49 +0000 Subject: [PATCH 09/14] logging in cleanuplocks --- src/transaction/client.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 24e03be7..21c5e54b 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -348,6 +348,8 @@ impl Client { let mut start_key = res.last().unwrap().key.clone(); start_key.push(0); + + info!("scanned {} keys, new start: {:?}", res.len(), start_key); range.from = Bound::Included(start_key.into()); From 36c42f9c57f620affbacc5bc2f40cdf3c054e8cc Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Thu, 30 Jan 2025 17:07:35 +0000 Subject: [PATCH 10/14] logging in cleanuplocks --- src/transaction/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 21c5e54b..209e1830 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -348,10 +348,10 @@ impl Client { let mut start_key = res.last().unwrap().key.clone(); start_key.push(0); - - info!("scanned {} keys, new start: {:?}", res.len(), start_key); range.from = Bound::Included(start_key.into()); + + info!("scanned {} keys, new range: {:?}", res.len(), range); locks.extend(res); } From ec9b434199c56b8928106b46e36baa3c78ed31ae Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Thu, 30 Jan 2025 17:10:27 +0000 Subject: [PATCH 11/14] incremental cleanup locks --- src/transaction/client.rs | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 209e1830..3d2726c5 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -307,9 +307,9 @@ impl Client { /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). /// We omit the second step "delete ranges" which is an optimization for TiDB. pub async fn legacy_gc(&self, safepoint: Timestamp) -> Result { - let (resolved, live) = self.legacy_cleanup_locks((..).into(), &safepoint).await?; + let resolved = self.legacy_cleanup_locks((..).into(), &safepoint).await?; - info!("resolved {resolved} locks ({live} live), sending new safepoint to PD..."); + info!("resolved {resolved} locks, sending new safepoint to PD..."); // update safepoint to PD let res: bool = self @@ -328,10 +328,9 @@ impl Client { &self, mut range: BoundRange, safepoint: &Timestamp, - ) -> Result<(usize, usize)> { - // resolved locks, live locks - // scan all locks with ts <= safepoint - let mut locks: Vec = vec![]; + ) -> Result { + let mut total_resolved = 0; + loop { let req = new_scan_lock_request(range.clone(), &safepoint, SCAN_LOCK_BATCH_SIZE); @@ -350,20 +349,15 @@ impl Client { start_key.push(0); range.from = Bound::Included(start_key.into()); - - info!("scanned {} keys, new range: {:?}", res.len(), range); - locks.extend(res); - } - - let to_resolve = locks.len(); + info!("scanned {} keys, new range: {:?}", res.len(), range); - // resolve locks - let live_locks = resolve_locks(locks, self.pd.clone()).await?.len(); + let resolved = resolve_locks(res, self.pd.clone()).await?.len(); - let resolved_locks = to_resolve - live_locks; + total_resolved += resolved; + } - Ok((resolved_locks, live_locks)) + Ok(total_resolved) } // For test. From a9f7945b7188792f346946fa616926a69a8c2b6e Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Thu, 30 Jan 2025 19:39:56 +0000 Subject: [PATCH 12/14] fix logging --- src/transaction/client.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 3d2726c5..8797c879 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -352,7 +352,11 @@ impl Client { info!("scanned {} keys, new range: {:?}", res.len(), range); - let resolved = resolve_locks(res, self.pd.clone()).await?.len(); + let to_resolve = res.len(); + + let not_resolved = resolve_locks(res, self.pd.clone()).await?.len(); + + let resolved = to_resolve - not_resolved; total_resolved += resolved; } From e31b8bf4b6565d3f98121393ca1ee5a43fcf099d Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Thu, 30 Jan 2025 21:02:27 +0000 Subject: [PATCH 13/14] cleanup all locks --- src/transaction/client.rs | 11 +++----- src/transaction/lock.rs | 58 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 8797c879..e82b0dd3 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -17,6 +17,7 @@ use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest} use crate::request::plan::CleanupLocksResult; use crate::request::Plan; use crate::timestamp::TimestampExt; +use crate::transaction::lock::cleanup_locks; use crate::transaction::lock::ResolveLocksOptions; use crate::transaction::lowering::new_scan_lock_request; use crate::transaction::lowering::new_unsafe_destroy_range_request; @@ -28,10 +29,8 @@ use crate::Backoff; use crate::BoundRange; use crate::Result; -use super::resolve_locks; - // FIXME: cargo-culted value -const SCAN_LOCK_BATCH_SIZE: u32 = 1024; +const SCAN_LOCK_BATCH_SIZE: u32 = 8192; /// The TiKV transactional `Client` is used to interact with TiKV using transactional requests. /// @@ -354,11 +353,9 @@ impl Client { let to_resolve = res.len(); - let not_resolved = resolve_locks(res, self.pd.clone()).await?.len(); - - let resolved = to_resolve - not_resolved; + cleanup_locks(res, self.pd.clone()).await?; - total_resolved += resolved; + total_resolved += to_resolve; } Ok(total_resolved) diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index afb1d6c4..4f7ac809 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -104,6 +104,64 @@ pub async fn resolve_locks( Ok(live_locks) } +/// Resolve all locks, regardless of if they are expired or not +pub async fn cleanup_locks( + locks: Vec, + pd_client: Arc, +) -> Result<()> { + debug!("cleaning up locks"); + let expired_locks = locks; // assume all expired + + // records the commit version of each primary lock (representing the status of the transaction) + let mut commit_versions: HashMap = HashMap::new(); + let mut clean_regions: HashMap> = HashMap::new(); + for lock in expired_locks { + let region_ver_id = pd_client + .region_for_key(&lock.primary_lock.clone().into()) + .await? + .ver_id(); + // skip if the region is cleaned + if clean_regions + .get(&lock.lock_version) + .map(|regions| regions.contains(®ion_ver_id)) + .unwrap_or(false) + { + continue; + } + + let commit_version = match commit_versions.get(&lock.lock_version) { + Some(&commit_version) => commit_version, + None => { + let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version); + let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); + let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + .resolve_lock(OPTIMISTIC_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) + .post_process_default() + .plan(); + let commit_version = plan.execute().await?; + commit_versions.insert(lock.lock_version, commit_version); + commit_version + } + }; + + let cleaned_region = resolve_lock_with_retry( + &lock.key, + lock.lock_version, + commit_version, + pd_client.clone(), + ) + .await?; + clean_regions + .entry(lock.lock_version) + .or_default() + .insert(cleaned_region); + } + + Ok(()) +} + async fn resolve_lock_with_retry( #[allow(clippy::ptr_arg)] key: &Vec, start_version: u64, From 2b08a22d11d2ccd6a1527546cef633e6c61a5449 Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Thu, 30 Jan 2025 23:45:17 +0000 Subject: [PATCH 14/14] optional cleanup locks in gc --- src/transaction/client.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index e82b0dd3..ff865f6e 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -305,10 +305,12 @@ impl Client { /// /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). /// We omit the second step "delete ranges" which is an optimization for TiDB. - pub async fn legacy_gc(&self, safepoint: Timestamp) -> Result { - let resolved = self.legacy_cleanup_locks((..).into(), &safepoint).await?; + pub async fn legacy_gc(&self, safepoint: Timestamp, cleanup_locks: bool) -> Result { + if cleanup_locks { + let resolved = self.legacy_cleanup_locks((..).into(), &safepoint).await?; - info!("resolved {resolved} locks, sending new safepoint to PD..."); + info!("resolved {resolved} locks, sending new safepoint to PD..."); + } // update safepoint to PD let res: bool = self