Skip to content

Commit 0ed902d

Browse files
committed
introduce codec type parameter for Snapshot and Transcation
Signed-off-by: iosmanthus <[email protected]>
1 parent 07c7af7 commit 0ed902d

File tree

4 files changed

+40
-30
lines changed

4 files changed

+40
-30
lines changed

src/transaction/client.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ where
158158
/// transaction.commit().await.unwrap();
159159
/// # });
160160
/// ```
161-
pub async fn begin_optimistic(&self) -> Result<Transaction<PdRpcClient<C>>> {
161+
pub async fn begin_optimistic(&self) -> Result<Transaction<C>> {
162162
debug!(self.logger, "creating new optimistic transaction");
163163
let timestamp = self.current_timestamp().await?;
164164
Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
@@ -184,7 +184,7 @@ where
184184
/// transaction.commit().await.unwrap();
185185
/// # });
186186
/// ```
187-
pub async fn begin_pessimistic(&self) -> Result<Transaction<PdRpcClient<C>>> {
187+
pub async fn begin_pessimistic(&self) -> Result<Transaction<C>> {
188188
debug!(self.logger, "creating new pessimistic transaction");
189189
let timestamp = self.current_timestamp().await?;
190190
Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
@@ -210,21 +210,14 @@ where
210210
/// transaction.commit().await.unwrap();
211211
/// # });
212212
/// ```
213-
pub async fn begin_with_options(
214-
&self,
215-
options: TransactionOptions,
216-
) -> Result<Transaction<PdRpcClient<C>>> {
213+
pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction<C>> {
217214
debug!(self.logger, "creating new customized transaction");
218215
let timestamp = self.current_timestamp().await?;
219216
Ok(self.new_transaction(timestamp, options))
220217
}
221218

222219
/// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
223-
pub fn snapshot(
224-
&self,
225-
timestamp: Timestamp,
226-
options: TransactionOptions,
227-
) -> Snapshot<PdRpcClient<C>> {
220+
pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot<C> {
228221
debug!(self.logger, "creating new snapshot");
229222
let logger = self.logger.new(o!("child" => 1));
230223
Snapshot::new(self.new_transaction(timestamp, options.read_only()), logger)
@@ -306,11 +299,7 @@ where
306299
Ok(res)
307300
}
308301

309-
fn new_transaction(
310-
&self,
311-
timestamp: Timestamp,
312-
options: TransactionOptions,
313-
) -> Transaction<PdRpcClient<C>> {
302+
fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction<C> {
314303
let logger = self.logger.new(o!("child" => 1));
315304
Transaction::new(timestamp, self.pd.clone(), options, logger)
316305
}

src/transaction/snapshot.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use crate::{pd::PdClient, BoundRange, Key, KvPair, Result, Transaction, Value};
3+
use crate::{
4+
pd::{PdClient, PdRpcClient},
5+
request::codec::RequestCodec,
6+
BoundRange, Key, KvPair, Result, Transaction, Value,
7+
};
48
use derive_new::new;
59
use slog::Logger;
10+
use std::marker::PhantomData;
611

712
/// A read-only transaction which reads at the given timestamp.
813
///
@@ -12,12 +17,17 @@ use slog::Logger;
1217
///
1318
/// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
1419
#[derive(new)]
15-
pub struct Snapshot<PdC: PdClient> {
16-
transaction: Transaction<PdC>,
20+
pub struct Snapshot<C: RequestCodec, PdC: PdClient<RequestCodec = C> = PdRpcClient<C>> {
21+
transaction: Transaction<C, PdC>,
1722
logger: Logger,
23+
_phantom: PhantomData<C>,
1824
}
1925

20-
impl<PdC: PdClient> Snapshot<PdC> {
26+
impl<C, PdC> Snapshot<C, PdC>
27+
where
28+
C: RequestCodec,
29+
PdC: PdClient<RequestCodec = C>,
30+
{
2131
/// Get the value associated with the given key.
2232
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
2333
debug!(self.logger, "invoking get request on snapshot");

src/transaction/transaction.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::{iter, sync::Arc, time::Instant};
3+
use std::{iter, marker::PhantomData, sync::Arc, time::Instant};
44

55
use derive_new::new;
66
use fail::fail_point;
@@ -12,9 +12,10 @@ use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
1212

1313
use crate::{
1414
backoff::{Backoff, DEFAULT_REGION_BACKOFF},
15-
pd::PdClient,
15+
pd::{PdClient, PdRpcClient},
1616
request::{
17-
Collect, CollectError, CollectSingle, CollectWithShard, Plan, PlanBuilder, RetryOptions,
17+
codec::RequestCodec, Collect, CollectError, CollectSingle, CollectWithShard, Plan,
18+
PlanBuilder, RetryOptions,
1819
},
1920
timestamp::TimestampExt,
2021
transaction::{buffer::Buffer, lowering::*},
@@ -62,7 +63,7 @@ use crate::{
6263
/// txn.commit().await.unwrap();
6364
/// # });
6465
/// ```
65-
pub struct Transaction<PdC: PdClient> {
66+
pub struct Transaction<C: RequestCodec, PdC: PdClient<RequestCodec = C> = PdRpcClient<C>> {
6667
status: Arc<RwLock<TransactionStatus>>,
6768
timestamp: Timestamp,
6869
buffer: Buffer,
@@ -71,15 +72,20 @@ pub struct Transaction<PdC: PdClient> {
7172
is_heartbeat_started: bool,
7273
start_instant: Instant,
7374
logger: Logger,
75+
_phantom: PhantomData<C>,
7476
}
7577

76-
impl<PdC: PdClient> Transaction<PdC> {
78+
impl<C, PdC> Transaction<C, PdC>
79+
where
80+
C: RequestCodec,
81+
PdC: PdClient<RequestCodec = C>,
82+
{
7783
pub(crate) fn new(
7884
timestamp: Timestamp,
7985
rpc: Arc<PdC>,
8086
options: TransactionOptions,
8187
logger: Logger,
82-
) -> Transaction<PdC> {
88+
) -> Transaction<C, PdC> {
8389
let status = if options.read_only {
8490
TransactionStatus::ReadOnly
8591
} else {
@@ -94,6 +100,7 @@ impl<PdC: PdClient> Transaction<PdC> {
94100
is_heartbeat_started: false,
95101
start_instant: std::time::Instant::now(),
96102
logger,
103+
_phantom: PhantomData,
97104
}
98105
}
99106

@@ -931,7 +938,11 @@ impl<PdC: PdClient> Transaction<PdC> {
931938
}
932939
}
933940

934-
impl<PdC: PdClient> Drop for Transaction<PdC> {
941+
impl<C, PdC> Drop for Transaction<C, PdC>
942+
where
943+
C: RequestCodec,
944+
PdC: PdClient<RequestCodec = C>,
945+
{
935946
fn drop(&mut self) {
936947
debug!(self.logger, "dropping transaction");
937948
if std::thread::panicking() {

tests/integration_tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ use std::{
2323
};
2424
use tikv_client::{
2525
raw,
26-
request::codec::RawCodec,
26+
request::codec::{RawCodec, RequestCodec},
2727
transaction::{self, HeartbeatOption},
28-
BoundRange, Error, Key, KvPair, PdClient, RawClient, Result, Transaction, TransactionClient,
28+
BoundRange, Error, Key, KvPair, RawClient, Result, Transaction, TransactionClient,
2929
TransactionOptions, Value,
3030
};
3131

@@ -973,7 +973,7 @@ async fn get_u32<C: RawCodec>(client: &RawClient<C>, key: Vec<u8>) -> Result<u32
973973
}
974974

975975
// helper function
976-
async fn get_txn_u32<C: PdClient>(txn: &mut Transaction<C>, key: Vec<u8>) -> Result<u32> {
976+
async fn get_txn_u32<C: RequestCodec>(txn: &mut Transaction<C>, key: Vec<u8>) -> Result<u32> {
977977
let x = txn.get(key).await?.unwrap();
978978
let boxed_slice = x.into_boxed_slice();
979979
let array: Box<[u8; 4]> = boxed_slice

0 commit comments

Comments
 (0)