diff --git a/components/cdc/src/service.rs b/components/cdc/src/service.rs index 53f471c1939..2ce8f2c218e 100644 --- a/components/cdc/src/service.rs +++ b/components/cdc/src/service.rs @@ -59,11 +59,11 @@ impl CdcEvent { // See https://play.golang.org/p/GFA9S-z_kUt let approximate_region_id_bytes = 4; let approximate_tso_bytes = 9; - // Protobuf encoding adds a tag to every varints. + // Protobuf encoding adds a tag to every Uvarint. // protobuf::rt::tag_size(1 /* or 2, field number*/) yields 1. let tag_bytes = 1; - // Byets of an array of region id. + // Bytes of an array of region id. r.regions.len() as u32 * (tag_bytes + approximate_region_id_bytes) // Bytes of a TSO. + (tag_bytes + approximate_tso_bytes) diff --git a/components/cdc/tests/integrations/test_cdc.rs b/components/cdc/tests/integrations/test_cdc.rs index fa9c5f2d748..d2aa2f70c82 100644 --- a/components/cdc/tests/integrations/test_cdc.rs +++ b/components/cdc/tests/integrations/test_cdc.rs @@ -171,7 +171,7 @@ fn test_cdc_basic() { let mut req = suite.new_changedata_request(1); req.set_region_epoch(Default::default()); // Zero region epoch. let (mut req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap(); - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); event_feed_wrap.replace(Some(resp_rx)); let mut events = receive_event(false).events.to_vec(); assert_eq!(events.len(), 1); @@ -277,7 +277,7 @@ fn test_cdc_not_leader() { rx.recv_timeout(Duration::from_millis(200)).unwrap(); // Try to subscribe again. - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); let mut events = receive_event(false).events.to_vec(); assert_eq!(events.len(), 1); // Should failed with not leader error. @@ -485,7 +485,7 @@ fn test_cdc_scan() { req.checkpoint_ts = checkpoint_ts.into_inner(); let (mut req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap(); event_feed_wrap.replace(Some(resp_rx)); - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); let mut events = receive_event(false).events.to_vec(); if events.len() == 1 { events.extend(receive_event(false).events.to_vec()); @@ -687,7 +687,7 @@ fn test_duplicate_subscribe() { other => panic!("unknown event {:?}", other), } // Try to subscribe again. - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); let mut events = receive_event(false).events.to_vec(); assert_eq!(events.len(), 1); // Should receive duplicate request error. @@ -800,7 +800,7 @@ fn test_old_value_basic() { req.set_extra_op(ExtraOp::ReadOldValue); let (mut req_tx, event_feed_wrap, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx = block_on(req_tx.send((req.clone(), WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req.clone(), WriteFlags::default()))).unwrap(); sleep_ms(1000); // Insert value @@ -912,7 +912,7 @@ fn test_old_value_basic() { let (mut req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap(); event_feed_wrap.replace(Some(resp_rx)); - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); let mut event_count = 0; loop { let event = receive_event(false); @@ -957,12 +957,12 @@ fn test_old_value_multi_changefeeds() { req.set_extra_op(ExtraOp::ReadOldValue); let (mut req_tx_1, event_feed_wrap_1, receive_event_1) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx_1 = block_on(req_tx_1.send((req.clone(), WriteFlags::default()))).unwrap(); + block_on(req_tx_1.send((req.clone(), WriteFlags::default()))).unwrap(); req.set_extra_op(ExtraOp::Noop); let (mut req_tx_2, event_feed_wrap_2, receive_event_2) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx_2 = block_on(req_tx_2.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx_2.send((req, WriteFlags::default()))).unwrap(); sleep_ms(1000); // Insert value @@ -1065,7 +1065,7 @@ fn test_cdc_resolve_ts_checking_concurrency_manager() { req.set_checkpoint_ts(100); let (mut req_tx, event_feed_wrap, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); // Make sure region 1 is registered. let mut events = receive_event(false).events; assert_eq!(events.len(), 1); @@ -1222,7 +1222,7 @@ fn test_old_value_1pc() { let mut req = suite.new_changedata_request(1); req.set_extra_op(ExtraOp::ReadOldValue); let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); // Insert value let mut m1 = Mutation::default(); @@ -1283,7 +1283,7 @@ fn test_old_value_cache_hit() { req.set_extra_op(ExtraOp::ReadOldValue); let (mut req_tx, event_feed_wrap, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); let mut events = receive_event(false).events.to_vec(); match events.remove(0).event.unwrap() { Event_oneof_event::Entries(mut es) => { @@ -1432,7 +1432,7 @@ fn test_old_value_cache_hit_pessimistic() { req.set_extra_op(ExtraOp::ReadOldValue); let (mut req_tx, event_feed_wrap, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); let mut events = receive_event(false).events.to_vec(); match events.remove(0).event.unwrap() { Event_oneof_event::Entries(mut es) => { @@ -1942,7 +1942,7 @@ fn test_cdc_no_write_corresponding_to_lock() { let mut req = suite.new_changedata_request(1); req.set_extra_op(ExtraOp::ReadOldValue); let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); // Txn1 commit_ts = 15 let mut m1 = Mutation::default(); @@ -1987,7 +1987,7 @@ fn test_cdc_write_rollback_when_no_lock() { let mut req = suite.new_changedata_request(1); req.set_extra_op(ExtraOp::ReadOldValue); let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); // Txn1 commit_ts = 15 let mut m1 = Mutation::default();