Skip to content

Commit

Permalink
cdc: Remove useless variable assignments and refine typos (tikv#11014)
Browse files Browse the repository at this point in the history
* cdc: Remove useless variable assignments and refine typos

Signed-off-by: hi-rustin <[email protected]>

* docs: fix typo

Signed-off-by: hi-rustin <[email protected]>

Co-authored-by: qupeng <[email protected]>
  • Loading branch information
Rustin170506 and hicqu authored Oct 13, 2021
1 parent ba3a2e4 commit bcee15b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
4 changes: 2 additions & 2 deletions components/cdc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ impl CdcEvent {
// See https://play.golang.org/p/GFA9S-z_kUt
let approximate_region_id_bytes = 4;
let approximate_tso_bytes = 9;
// Protobuf encoding adds a tag to every varints.
// Protobuf encoding adds a tag to every Uvarint.
// protobuf::rt::tag_size(1 /* or 2, field number*/) yields 1.
let tag_bytes = 1;

// Byets of an array of region id.
// Bytes of an array of region id.
r.regions.len() as u32 * (tag_bytes + approximate_region_id_bytes)
// Bytes of a TSO.
+ (tag_bytes + approximate_tso_bytes)
Expand Down
28 changes: 14 additions & 14 deletions components/cdc/tests/integrations/test_cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn test_cdc_basic() {
let mut req = suite.new_changedata_request(1);
req.set_region_epoch(Default::default()); // Zero region epoch.
let (mut req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap();
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
event_feed_wrap.replace(Some(resp_rx));
let mut events = receive_event(false).events.to_vec();
assert_eq!(events.len(), 1);
Expand Down Expand Up @@ -277,7 +277,7 @@ fn test_cdc_not_leader() {
rx.recv_timeout(Duration::from_millis(200)).unwrap();

// Try to subscribe again.
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
let mut events = receive_event(false).events.to_vec();
assert_eq!(events.len(), 1);
// Should failed with not leader error.
Expand Down Expand Up @@ -485,7 +485,7 @@ fn test_cdc_scan() {
req.checkpoint_ts = checkpoint_ts.into_inner();
let (mut req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap();
event_feed_wrap.replace(Some(resp_rx));
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
let mut events = receive_event(false).events.to_vec();
if events.len() == 1 {
events.extend(receive_event(false).events.to_vec());
Expand Down Expand Up @@ -687,7 +687,7 @@ fn test_duplicate_subscribe() {
other => panic!("unknown event {:?}", other),
}
// Try to subscribe again.
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
let mut events = receive_event(false).events.to_vec();
assert_eq!(events.len(), 1);
// Should receive duplicate request error.
Expand Down Expand Up @@ -800,7 +800,7 @@ fn test_old_value_basic() {
req.set_extra_op(ExtraOp::ReadOldValue);
let (mut req_tx, event_feed_wrap, receive_event) =
new_event_feed(suite.get_region_cdc_client(1));
let _req_tx = block_on(req_tx.send((req.clone(), WriteFlags::default()))).unwrap();
block_on(req_tx.send((req.clone(), WriteFlags::default()))).unwrap();
sleep_ms(1000);

// Insert value
Expand Down Expand Up @@ -912,7 +912,7 @@ fn test_old_value_basic() {

let (mut req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap();
event_feed_wrap.replace(Some(resp_rx));
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
let mut event_count = 0;
loop {
let event = receive_event(false);
Expand Down Expand Up @@ -957,12 +957,12 @@ fn test_old_value_multi_changefeeds() {
req.set_extra_op(ExtraOp::ReadOldValue);
let (mut req_tx_1, event_feed_wrap_1, receive_event_1) =
new_event_feed(suite.get_region_cdc_client(1));
let _req_tx_1 = block_on(req_tx_1.send((req.clone(), WriteFlags::default()))).unwrap();
block_on(req_tx_1.send((req.clone(), WriteFlags::default()))).unwrap();

req.set_extra_op(ExtraOp::Noop);
let (mut req_tx_2, event_feed_wrap_2, receive_event_2) =
new_event_feed(suite.get_region_cdc_client(1));
let _req_tx_2 = block_on(req_tx_2.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx_2.send((req, WriteFlags::default()))).unwrap();

sleep_ms(1000);
// Insert value
Expand Down Expand Up @@ -1065,7 +1065,7 @@ fn test_cdc_resolve_ts_checking_concurrency_manager() {
req.set_checkpoint_ts(100);
let (mut req_tx, event_feed_wrap, receive_event) =
new_event_feed(suite.get_region_cdc_client(1));
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
// Make sure region 1 is registered.
let mut events = receive_event(false).events;
assert_eq!(events.len(), 1);
Expand Down Expand Up @@ -1222,7 +1222,7 @@ fn test_old_value_1pc() {
let mut req = suite.new_changedata_request(1);
req.set_extra_op(ExtraOp::ReadOldValue);
let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1));
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();

// Insert value
let mut m1 = Mutation::default();
Expand Down Expand Up @@ -1283,7 +1283,7 @@ fn test_old_value_cache_hit() {
req.set_extra_op(ExtraOp::ReadOldValue);
let (mut req_tx, event_feed_wrap, receive_event) =
new_event_feed(suite.get_region_cdc_client(1));
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
let mut events = receive_event(false).events.to_vec();
match events.remove(0).event.unwrap() {
Event_oneof_event::Entries(mut es) => {
Expand Down Expand Up @@ -1432,7 +1432,7 @@ fn test_old_value_cache_hit_pessimistic() {
req.set_extra_op(ExtraOp::ReadOldValue);
let (mut req_tx, event_feed_wrap, receive_event) =
new_event_feed(suite.get_region_cdc_client(1));
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
let mut events = receive_event(false).events.to_vec();
match events.remove(0).event.unwrap() {
Event_oneof_event::Entries(mut es) => {
Expand Down Expand Up @@ -1942,7 +1942,7 @@ fn test_cdc_no_write_corresponding_to_lock() {
let mut req = suite.new_changedata_request(1);
req.set_extra_op(ExtraOp::ReadOldValue);
let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1));
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();

// Txn1 commit_ts = 15
let mut m1 = Mutation::default();
Expand Down Expand Up @@ -1987,7 +1987,7 @@ fn test_cdc_write_rollback_when_no_lock() {
let mut req = suite.new_changedata_request(1);
req.set_extra_op(ExtraOp::ReadOldValue);
let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1));
let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();

// Txn1 commit_ts = 15
let mut m1 = Mutation::default();
Expand Down

0 comments on commit bcee15b

Please sign in to comment.