From dac55a7879a6af604d5368e0b2cff16be0889db5 Mon Sep 17 00:00:00 2001 From: Rahul Rane Date: Thu, 8 Feb 2024 13:59:50 -0800 Subject: [PATCH] Adding a reverse scan API for raw client Signed-off-by: Rahul Rane --- src/raw/client.rs | 47 +++++++++++++++++++++++++++++++--- src/raw/lowering.rs | 2 ++ src/raw/requests.rs | 2 ++ tests/integration_tests.rs | 52 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 4 deletions(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index 109008ab..4d025a10 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -500,7 +500,40 @@ impl Client { /// ``` pub async fn scan(&self, range: impl Into, limit: u32) -> Result> { debug!("invoking raw scan request"); - self.scan_inner(range.into(), limit, false).await + self.scan_inner(range.into(), limit, false, false).await + } + + // Create a new 'scan' request but scans in "reverse" direction. + /// + /// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range. + /// + /// If the number of eligible key-value pairs are greater than `limit`, + /// only the first `limit` pairs are returned, ordered by the key. + /// + /// + /// Reverse Scan queries continuous kv pairs in range (endKey, startKey], + /// from startKey(upperBound) to endKey(lowerBound), up to limit pairs. + /// The returned keys are in reversed lexicographical order. + /// If you want to include the endKey or exclude the startKey, push a '\0' to the key. + /// It doesn't support Scanning from "", because locating the last Region is not yet implemented. + /// # Examples + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange}; + /// # use futures::prelude::*; + /// # futures::executor::block_on(async { + /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap(); + /// let inclusive_range = "TiDB"..="TiKV"; + /// let req = client.scan_reverse(inclusive_range.into_owned(), 2); + /// let result: Vec = req.await.unwrap(); + /// # }); + /// ``` + pub async fn scan_reverse( + &self, + range: impl Into, + limit: u32, + ) -> Result> { + debug!("invoking raw reverse scan request"); + self.scan_inner(range.into(), limit, false, true).await } /// Create a new 'scan' request that only returns the keys. @@ -525,7 +558,7 @@ impl Client { pub async fn scan_keys(&self, range: impl Into, limit: u32) -> Result> { debug!("invoking raw scan_keys request"); Ok(self - .scan_inner(range, limit, true) + .scan_inner(range, limit, true, false) .await? .into_iter() .map(KvPair::into_key) @@ -682,6 +715,7 @@ impl Client { range: impl Into, limit: u32, key_only: bool, + reverse: bool, ) -> Result> { if limit > MAX_RAW_KV_SCAN_LIMIT { return Err(Error::MaxScanLimitExceeded { @@ -703,8 +737,13 @@ impl Client { let mut cur_limit = limit; while cur_limit > 0 { - let request = - new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone()); + let request = new_raw_scan_request( + cur_range.clone(), + cur_limit, + key_only, + reverse, + self.cf.clone(), + ); let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .single_region_with_store(region_store.clone()) .await? diff --git a/src/raw/lowering.rs b/src/raw/lowering.rs index 4fd35477..3065401f 100644 --- a/src/raw/lowering.rs +++ b/src/raw/lowering.rs @@ -84,6 +84,7 @@ pub fn new_raw_scan_request( range: BoundRange, limit: u32, key_only: bool, + reverse: bool, cf: Option, ) -> kvrpcpb::RawScanRequest { let (start_key, end_key) = range.into_keys(); @@ -92,6 +93,7 @@ pub fn new_raw_scan_request( end_key.unwrap_or_default().into(), limit, key_only, + reverse, cf, ) } diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 65165927..af59b770 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -278,6 +278,7 @@ pub fn new_raw_scan_request( end_key: Vec, limit: u32, key_only: bool, + reverse: bool, cf: Option, ) -> kvrpcpb::RawScanRequest { let mut req = kvrpcpb::RawScanRequest::default(); @@ -285,6 +286,7 @@ pub fn new_raw_scan_request( req.end_key = end_key; req.limit = limit; req.key_only = key_only; + req.reverse = reverse; req.maybe_set_cf(cf); req diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 7244ed6f..cc3d09b9 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -561,6 +561,58 @@ async fn raw_req() -> Result<()> { assert_eq!(res[5].1, "v4".as_bytes()); assert_eq!(res[6].1, "v5".as_bytes()); + // reverse scan + // By default end key is exclusive, so k5 is not included and start key in included + + let res = client + .scan_reverse("k5".to_owned().."k2".to_owned(), 5) + .await?; + assert_eq!(res.len(), 3); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + assert_eq!(res[2].1, "v2".as_bytes()); + + // by default end key in exclusive and start key is inclusive but now exclude start key + let res = client + .scan_reverse("k5".to_owned()..="k2".to_owned(), 5) + .await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + // reverse scan + // by default end key is exclusive and start key is inclusive but now include end key + let res = client + .scan_reverse("k5\0".to_owned().."k2".to_owned(), 5) + .await?; + assert_eq!(res.len(), 4); + assert_eq!(res[0].1, "v5".as_bytes()); + assert_eq!(res[1].1, "v4".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + assert_eq!(res[3].1, "v2".as_bytes()); + + // by default end key is exclusive and start key is inclusive but now include end key and exclude start key + let res = client + .scan_reverse("k5\0".to_owned()..="k2".to_owned(), 5) + .await?; + assert_eq!(res.len(), 3); + assert_eq!(res[0].1, "v5".as_bytes()); + assert_eq!(res[1].1, "v4".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + + // limit results to first 2 + let res = client + .scan_reverse("k5".to_owned().."k2".to_owned(), 2) + .await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + //let range = "k5"..; // Upperbound (k5, +inf). This is NOT SUPPORTED by TiKV. + let range = BoundRange::range_from(Key::from("k5".to_owned())); + let res = client.scan_reverse(range, 20).await?; + assert_eq!(res.len(), 0); + Ok(()) }