Skip to content

Commit

Permalink
cdc: validate request's cluster ID (tikv#10907)
Browse files Browse the repository at this point in the history
* cdc:  validate request's cluster ID

Signed-off-by: hi-rustin <[email protected]>

* cdc: validate cluster id

Signed-off-by: hi-rustin <[email protected]>

* test: add test_cdc_cluster_id_mismatch

Signed-off-by: hi-rustin <[email protected]>

* cdc: add version check

Signed-off-by: hi-rustin <[email protected]>

* docs: add comments

Signed-off-by: hi-rustin <[email protected]>

* Fix typo

Signed-off-by: hi-rustin <[email protected]>
  • Loading branch information
Rustin170506 authored Oct 8, 2021
1 parent f13318c commit 05767ac
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 23 additions & 2 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use kvproto::cdcpb::{
};
#[cfg(not(feature = "prost-codec"))]
use kvproto::cdcpb::{
ChangeDataRequest, DuplicateRequest as ErrorDuplicateRequest, Error as EventError, Event,
Event_oneof_event, ResolvedTs,
ChangeDataRequest, ClusterIdMismatch as ErrorClusterIdMismatch,
DuplicateRequest as ErrorDuplicateRequest, Error as EventError, Event, Event_oneof_event,
ResolvedTs,
};
use kvproto::kvrpcpb::{CheckLeaderRequest, ExtraOp as TxnExtraOp, LeaderInfo};
use kvproto::metapb::{PeerRole, Region, RegionEpoch};
Expand Down Expand Up @@ -220,6 +221,8 @@ impl fmt::Debug for Task {
const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s

pub struct Endpoint<T, E> {
cluster_id: u64,

capture_regions: HashMap<u64, Delegate>,
connections: HashMap<ConnID, Conn>,
scheduler: Scheduler<Task>,
Expand Down Expand Up @@ -261,6 +264,7 @@ pub struct Endpoint<T, E> {

impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
pub fn new(
cluster_id: u64,
config: &CdcConfig,
pd_client: Arc<dyn PdClient>,
scheduler: Scheduler<Task>,
Expand Down Expand Up @@ -301,6 +305,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
let max_scan_batch_size = 1024;

let ep = Endpoint {
cluster_id,
env,
security_mgr,
capture_regions: HashMap::default(),
Expand Down Expand Up @@ -500,6 +505,20 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
};
downstream.set_sink(conn.get_sink().clone());

// Check if the cluster id matches.
let request_cluster_id = request.get_header().get_cluster_id();
if version >= FeatureGate::validate_cluster_id() && self.cluster_id != request_cluster_id {
let mut err_event = EventError::default();
let mut err = ErrorClusterIdMismatch::default();

err.set_current(self.cluster_id);
err.set_request(request_cluster_id);
err_event.set_cluster_id_mismatch(err);

let _ = downstream.sink_error_event(region_id, err_event);
return;
}

// TODO: Add a new task to close incompatible features.
if let Some(e) = conn.check_version_and_set_feature(version) {
// The downstream has not registered yet, send error right away.
Expand Down Expand Up @@ -1497,6 +1516,7 @@ mod tests {
use tempfile::TempDir;
use test_raftstore::MockRaftStoreRouter;
use test_raftstore::TestPdClient;
use tikv::server::DEFAULT_CLUSTER_ID;
use tikv::storage::kv::Engine;
use tikv::storage::txn::tests::{must_acquire_pessimistic_lock, must_prewrite_put};
use tikv::storage::TestEngineBuilder;
Expand Down Expand Up @@ -1605,6 +1625,7 @@ mod tests {
let env = Arc::new(Environment::new(1));
let security_mgr = Arc::new(SecurityManager::default());
let ep = Endpoint::new(
DEFAULT_CLUSTER_ID,
cfg,
pd_client,
task_sched,
Expand Down
5 changes: 5 additions & 0 deletions components/cdc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ impl FeatureGate {
pub fn batch_resolved_ts() -> semver::Version {
semver::Version::new(4, 0, 8)
}

// Returns the first version (v5.3.0) that supports validate cluster id.
pub(crate) fn validate_cluster_id() -> semver::Version {
semver::Version::new(5, 3, 0)
}
}

pub struct Conn {
Expand Down
46 changes: 46 additions & 0 deletions components/cdc/tests/integrations/test_cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use kvproto::kvrpcpb::*;
use pd_client::PdClient;
use raft::eraftpb::MessageType;
use test_raftstore::*;
use tikv::server::DEFAULT_CLUSTER_ID;
use tikv_util::HandyRwLock;
use txn_types::{Key, Lock, LockType};

Expand Down Expand Up @@ -299,6 +300,51 @@ fn test_cdc_not_leader() {
suite.stop();
}

#[test]
fn test_cdc_cluster_id_mismatch() {
let mut suite = TestSuite::new(3);

// Send request with mismatched cluster id.
let mut req = suite.new_changedata_request(1);
req.mut_header().set_ticdc_version("5.3.0".into());
req.mut_header().set_cluster_id(DEFAULT_CLUSTER_ID + 1);
let (mut req_tx, event_feed_wrap, receive_event) =
new_event_feed(suite.get_region_cdc_client(1));
block_on(req_tx.send((req.clone(), WriteFlags::default()))).unwrap();

// Assert mismatch.
let mut events = receive_event(false).events.to_vec();
assert_eq!(events.len(), 1);
match events.pop().unwrap().event.unwrap() {
Event_oneof_event::Error(err) => {
assert!(err.has_cluster_id_mismatch(), "{:?}", err);
}
other => panic!("unknown event {:?}", other),
}

// Low version request.
req.mut_header().set_ticdc_version("4.0.8".into());
req.mut_header().set_cluster_id(DEFAULT_CLUSTER_ID + 1);
block_on(req_tx.send((req, WriteFlags::default()))).unwrap();
let mut events = receive_event(false).events.to_vec();
assert_eq!(events.len(), 1);

// Should without error.
match events.pop().unwrap().event.unwrap() {
// Even if there is no write,
// it should always outputs an Initialized event.
Event_oneof_event::Entries(es) => {
assert!(es.entries.len() == 1, "{:?}", es);
let e = &es.entries[0];
assert_eq!(e.get_type(), EventLogType::Initialized, "{:?}", es);
}
other => panic!("unknown event {:?}", other),
}

event_feed_wrap.replace(None);
suite.stop();
}

#[test]
fn test_cdc_stale_epoch_after_region_ready() {
let mut suite = TestSuite::new(3);
Expand Down
2 changes: 2 additions & 0 deletions components/cdc/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use online_config::OnlineConfig;
use raftstore::coprocessor::CoprocessorHost;
use test_raftstore::*;
use tikv::config::CdcConfig;
use tikv::server::DEFAULT_CLUSTER_ID;
use tikv_util::config::ReadableDuration;
use tikv_util::worker::{LazyWorker, Runnable};
use tikv_util::HandyRwLock;
Expand Down Expand Up @@ -161,6 +162,7 @@ impl TestSuiteBuilder {
let env = Arc::new(Environment::new(1));
let cfg = CdcConfig::default();
let mut cdc_endpoint = cdc::Endpoint::new(
DEFAULT_CLUSTER_ID,
&cfg,
pd_cli.clone(),
worker.scheduler(),
Expand Down
1 change: 1 addition & 0 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
// Start CDC.
let cdc_memory_quota = MemoryQuota::new(self.config.cdc.sink_memory_quota.0 as _);
let cdc_endpoint = cdc::Endpoint::new(
self.config.server.cluster_id,
&self.config.cdc,
self.pd_client.clone(),
cdc_scheduler.clone(),
Expand Down

0 comments on commit 05767ac

Please sign in to comment.