diff --git a/Cargo.lock b/Cargo.lock index ff581229fe7..4216f9371a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2128,7 +2128,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git#0f5764a128ad77ccf0a5b0ce0d6e2bfa50a108ce" +source = "git+https://github.com/pingcap/kvproto.git#df38c15b57b3049293c71ca828a42d83faffe221" dependencies = [ "futures 0.3.15", "grpcio", diff --git a/components/cdc/src/endpoint.rs b/components/cdc/src/endpoint.rs index 6b68d59834a..ef8164da785 100644 --- a/components/cdc/src/endpoint.rs +++ b/components/cdc/src/endpoint.rs @@ -20,8 +20,9 @@ use kvproto::cdcpb::{ }; #[cfg(not(feature = "prost-codec"))] use kvproto::cdcpb::{ - ChangeDataRequest, DuplicateRequest as ErrorDuplicateRequest, Error as EventError, Event, - Event_oneof_event, ResolvedTs, + ChangeDataRequest, ClusterIdMismatch as ErrorClusterIdMismatch, + DuplicateRequest as ErrorDuplicateRequest, Error as EventError, Event, Event_oneof_event, + ResolvedTs, }; use kvproto::kvrpcpb::{CheckLeaderRequest, ExtraOp as TxnExtraOp, LeaderInfo}; use kvproto::metapb::{PeerRole, Region, RegionEpoch}; @@ -220,6 +221,8 @@ impl fmt::Debug for Task { const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s pub struct Endpoint { + cluster_id: u64, + capture_regions: HashMap, connections: HashMap, scheduler: Scheduler, @@ -261,6 +264,7 @@ pub struct Endpoint { impl, E: KvEngine> Endpoint { pub fn new( + cluster_id: u64, config: &CdcConfig, pd_client: Arc, scheduler: Scheduler, @@ -301,6 +305,7 @@ impl, E: KvEngine> Endpoint { let max_scan_batch_size = 1024; let ep = Endpoint { + cluster_id, env, security_mgr, capture_regions: HashMap::default(), @@ -500,6 +505,20 @@ impl, E: KvEngine> Endpoint { }; downstream.set_sink(conn.get_sink().clone()); + // Check if the cluster id matches. + let request_cluster_id = request.get_header().get_cluster_id(); + if version >= FeatureGate::validate_cluster_id() && self.cluster_id != request_cluster_id { + let mut err_event = EventError::default(); + let mut err = ErrorClusterIdMismatch::default(); + + err.set_current(self.cluster_id); + err.set_request(request_cluster_id); + err_event.set_cluster_id_mismatch(err); + + let _ = downstream.sink_error_event(region_id, err_event); + return; + } + // TODO: Add a new task to close incompatible features. if let Some(e) = conn.check_version_and_set_feature(version) { // The downstream has not registered yet, send error right away. @@ -1497,6 +1516,7 @@ mod tests { use tempfile::TempDir; use test_raftstore::MockRaftStoreRouter; use test_raftstore::TestPdClient; + use tikv::server::DEFAULT_CLUSTER_ID; use tikv::storage::kv::Engine; use tikv::storage::txn::tests::{must_acquire_pessimistic_lock, must_prewrite_put}; use tikv::storage::TestEngineBuilder; @@ -1605,6 +1625,7 @@ mod tests { let env = Arc::new(Environment::new(1)); let security_mgr = Arc::new(SecurityManager::default()); let ep = Endpoint::new( + DEFAULT_CLUSTER_ID, cfg, pd_client, task_sched, diff --git a/components/cdc/src/service.rs b/components/cdc/src/service.rs index c81a425d844..53f471c1939 100644 --- a/components/cdc/src/service.rs +++ b/components/cdc/src/service.rs @@ -198,6 +198,11 @@ impl FeatureGate { pub fn batch_resolved_ts() -> semver::Version { semver::Version::new(4, 0, 8) } + + // Returns the first version (v5.3.0) that supports validate cluster id. + pub(crate) fn validate_cluster_id() -> semver::Version { + semver::Version::new(5, 3, 0) + } } pub struct Conn { diff --git a/components/cdc/tests/integrations/test_cdc.rs b/components/cdc/tests/integrations/test_cdc.rs index 3876d707c2f..fa9c5f2d748 100644 --- a/components/cdc/tests/integrations/test_cdc.rs +++ b/components/cdc/tests/integrations/test_cdc.rs @@ -19,6 +19,7 @@ use kvproto::kvrpcpb::*; use pd_client::PdClient; use raft::eraftpb::MessageType; use test_raftstore::*; +use tikv::server::DEFAULT_CLUSTER_ID; use tikv_util::HandyRwLock; use txn_types::{Key, Lock, LockType}; @@ -299,6 +300,51 @@ fn test_cdc_not_leader() { suite.stop(); } +#[test] +fn test_cdc_cluster_id_mismatch() { + let mut suite = TestSuite::new(3); + + // Send request with mismatched cluster id. + let mut req = suite.new_changedata_request(1); + req.mut_header().set_ticdc_version("5.3.0".into()); + req.mut_header().set_cluster_id(DEFAULT_CLUSTER_ID + 1); + let (mut req_tx, event_feed_wrap, receive_event) = + new_event_feed(suite.get_region_cdc_client(1)); + block_on(req_tx.send((req.clone(), WriteFlags::default()))).unwrap(); + + // Assert mismatch. + let mut events = receive_event(false).events.to_vec(); + assert_eq!(events.len(), 1); + match events.pop().unwrap().event.unwrap() { + Event_oneof_event::Error(err) => { + assert!(err.has_cluster_id_mismatch(), "{:?}", err); + } + other => panic!("unknown event {:?}", other), + } + + // Low version request. + req.mut_header().set_ticdc_version("4.0.8".into()); + req.mut_header().set_cluster_id(DEFAULT_CLUSTER_ID + 1); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + let mut events = receive_event(false).events.to_vec(); + assert_eq!(events.len(), 1); + + // Should without error. + match events.pop().unwrap().event.unwrap() { + // Even if there is no write, + // it should always outputs an Initialized event. + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Initialized, "{:?}", es); + } + other => panic!("unknown event {:?}", other), + } + + event_feed_wrap.replace(None); + suite.stop(); +} + #[test] fn test_cdc_stale_epoch_after_region_ready() { let mut suite = TestSuite::new(3); diff --git a/components/cdc/tests/mod.rs b/components/cdc/tests/mod.rs index b91cba8c582..2f0129d188d 100644 --- a/components/cdc/tests/mod.rs +++ b/components/cdc/tests/mod.rs @@ -15,6 +15,7 @@ use online_config::OnlineConfig; use raftstore::coprocessor::CoprocessorHost; use test_raftstore::*; use tikv::config::CdcConfig; +use tikv::server::DEFAULT_CLUSTER_ID; use tikv_util::config::ReadableDuration; use tikv_util::worker::{LazyWorker, Runnable}; use tikv_util::HandyRwLock; @@ -161,6 +162,7 @@ impl TestSuiteBuilder { let env = Arc::new(Environment::new(1)); let cfg = CdcConfig::default(); let mut cdc_endpoint = cdc::Endpoint::new( + DEFAULT_CLUSTER_ID, &cfg, pd_cli.clone(), worker.scheduler(), diff --git a/components/server/src/server.rs b/components/server/src/server.rs index bb810b8482e..8a61aa782de 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -851,6 +851,7 @@ impl TiKVServer { // Start CDC. let cdc_memory_quota = MemoryQuota::new(self.config.cdc.sink_memory_quota.0 as _); let cdc_endpoint = cdc::Endpoint::new( + self.config.server.cluster_id, &self.config.cdc, self.pd_client.clone(), cdc_scheduler.clone(),