Skip to content

Commit 6960a01

Browse files
authored
refactor: add support for batch region upgrade operations part1 (#7155)
* refactor: convert UpgradeRegion instruction to batch operation Signed-off-by: WenyXu <[email protected]> * feat: introduce `handle_batch_catchup_requests` fn for mito engine Signed-off-by: WenyXu <[email protected]> * test: add tests Signed-off-by: WenyXu <[email protected]> * feat: introduce `handle_batch_catchup_requests` fn for metric engine Signed-off-by: WenyXu <[email protected]> * chore: suggestion and add ser/de tests Signed-off-by: WenyXu <[email protected]> * chore: add comments Signed-off-by: WenyXu <[email protected]> * fix: fix unit tests Signed-off-by: WenyXu <[email protected]> --------- Signed-off-by: WenyXu <[email protected]>
1 parent 30894d7 commit 6960a01

File tree

16 files changed

+886
-177
lines changed

16 files changed

+886
-177
lines changed

src/common/meta/src/instruction.rs

Lines changed: 129 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -507,13 +507,14 @@ pub enum Instruction {
507507
/// Closes regions.
508508
#[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
509509
CloseRegions(Vec<RegionIdent>),
510-
/// Upgrades a region.
511-
UpgradeRegion(UpgradeRegion),
510+
/// Upgrades regions.
511+
#[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
512+
UpgradeRegions(Vec<UpgradeRegion>),
512513
#[serde(
513514
deserialize_with = "single_or_multiple_from",
514515
alias = "DowngradeRegion"
515516
)]
516-
/// Downgrades a region.
517+
/// Downgrades regions.
517518
DowngradeRegions(Vec<DowngradeRegion>),
518519
/// Invalidates batch cache.
519520
InvalidateCaches(Vec<CacheIdent>),
@@ -559,9 +560,9 @@ impl Instruction {
559560
}
560561

561562
/// Converts the instruction into a [UpgradeRegion].
562-
pub fn into_upgrade_regions(self) -> Option<UpgradeRegion> {
563+
pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
563564
match self {
564-
Self::UpgradeRegion(upgrade_region) => Some(upgrade_region),
565+
Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
565566
_ => None,
566567
}
567568
}
@@ -584,6 +585,10 @@ impl Instruction {
584585
/// The reply of [UpgradeRegion].
585586
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
586587
pub struct UpgradeRegionReply {
588+
/// The [RegionId].
589+
/// For compatibility, it is defaulted to [RegionId::new(0, 0)].
590+
#[serde(default)]
591+
pub region_id: RegionId,
587592
/// Returns true if `last_entry_id` has been replayed to the latest.
588593
pub ready: bool,
589594
/// Indicates whether the region exists.
@@ -635,14 +640,51 @@ where
635640
})
636641
}
637642

643+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
644+
pub struct UpgradeRegionsReply {
645+
pub replies: Vec<UpgradeRegionReply>,
646+
}
647+
648+
impl UpgradeRegionsReply {
649+
pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
650+
Self { replies }
651+
}
652+
653+
pub fn single(reply: UpgradeRegionReply) -> Self {
654+
Self::new(vec![reply])
655+
}
656+
}
657+
658+
#[derive(Deserialize)]
659+
#[serde(untagged)]
660+
enum UpgradeRegionsCompat {
661+
Single(UpgradeRegionReply),
662+
Multiple(UpgradeRegionsReply),
663+
}
664+
665+
fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
666+
where
667+
D: Deserializer<'de>,
668+
{
669+
let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
670+
Ok(match helper {
671+
UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
672+
UpgradeRegionsCompat::Multiple(reply) => reply,
673+
})
674+
}
675+
638676
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
639677
#[serde(tag = "type", rename_all = "snake_case")]
640678
pub enum InstructionReply {
641679
#[serde(alias = "open_region")]
642680
OpenRegions(SimpleReply),
643681
#[serde(alias = "close_region")]
644682
CloseRegions(SimpleReply),
645-
UpgradeRegion(UpgradeRegionReply),
683+
#[serde(
684+
deserialize_with = "upgrade_regions_compat_from",
685+
alias = "upgrade_region"
686+
)]
687+
UpgradeRegions(UpgradeRegionsReply),
646688
#[serde(
647689
alias = "downgrade_region",
648690
deserialize_with = "downgrade_regions_compat_from"
@@ -658,9 +700,11 @@ impl Display for InstructionReply {
658700
match self {
659701
Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
660702
Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
661-
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
703+
Self::UpgradeRegions(reply) => {
704+
write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
705+
}
662706
Self::DowngradeRegions(reply) => {
663-
write!(f, "InstructionReply::DowngradeRegions({:?})", reply)
707+
write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
664708
}
665709
Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
666710
Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
@@ -685,9 +729,9 @@ impl InstructionReply {
685729
}
686730
}
687731

688-
pub fn expect_upgrade_region_reply(self) -> UpgradeRegionReply {
732+
pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
689733
match self {
690-
Self::UpgradeRegion(reply) => reply,
734+
Self::UpgradeRegions(reply) => reply.replies,
691735
_ => panic!("Expected UpgradeRegion reply"),
692736
}
693737
}
@@ -749,25 +793,58 @@ mod tests {
749793
serialized
750794
);
751795

752-
let downgrade_region = InstructionReply::DowngradeRegions(DowngradeRegionsReply::single(
753-
DowngradeRegionReply {
796+
let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
797+
region_id: RegionId::new(1024, 1),
798+
last_entry_id: None,
799+
metadata_last_entry_id: None,
800+
replay_timeout: Some(Duration::from_millis(1000)),
801+
location_id: None,
802+
replay_entry_id: None,
803+
metadata_replay_entry_id: None,
804+
}]);
805+
806+
let serialized = serde_json::to_string(&upgrade_region).unwrap();
807+
assert_eq!(
808+
r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
809+
serialized
810+
);
811+
}
812+
813+
#[test]
814+
fn test_serialize_instruction_reply() {
815+
let downgrade_region_reply = InstructionReply::DowngradeRegions(
816+
DowngradeRegionsReply::single(DowngradeRegionReply {
754817
region_id: RegionId::new(1024, 1),
755818
last_entry_id: None,
756819
metadata_last_entry_id: None,
757820
exists: true,
758821
error: None,
759-
},
760-
));
822+
}),
823+
);
761824

762-
let serialized = serde_json::to_string(&downgrade_region).unwrap();
825+
let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
763826
assert_eq!(
764827
r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
765828
serialized
766-
)
829+
);
830+
831+
let upgrade_region_reply =
832+
InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
833+
region_id: RegionId::new(1024, 1),
834+
ready: true,
835+
exists: true,
836+
error: None,
837+
}));
838+
let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
839+
assert_eq!(
840+
r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
841+
serialized
842+
);
767843
}
768844

769845
#[test]
770846
fn test_deserialize_instruction() {
847+
// legacy open region instruction
771848
let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
772849
let open_region_instruction: Instruction =
773850
serde_json::from_str(open_region_instruction).unwrap();
@@ -785,6 +862,7 @@ mod tests {
785862
)]);
786863
assert_eq!(open_region_instruction, open_region);
787864

865+
// legacy close region instruction
788866
let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
789867
let close_region_instruction: Instruction =
790868
serde_json::from_str(close_region_instruction).unwrap();
@@ -796,6 +874,7 @@ mod tests {
796874
}]);
797875
assert_eq!(close_region_instruction, close_region);
798876

877+
// legacy downgrade region instruction
799878
let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
800879
let downgrade_region_instruction: Instruction =
801880
serde_json::from_str(downgrade_region_instruction).unwrap();
@@ -805,6 +884,25 @@ mod tests {
805884
}]);
806885
assert_eq!(downgrade_region_instruction, downgrade_region);
807886

887+
// legacy upgrade region instruction
888+
let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
889+
let upgrade_region_instruction: Instruction =
890+
serde_json::from_str(upgrade_region_instruction).unwrap();
891+
let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
892+
region_id: RegionId::new(1024, 1),
893+
last_entry_id: None,
894+
metadata_last_entry_id: None,
895+
replay_timeout: Some(Duration::from_millis(1000)),
896+
location_id: None,
897+
replay_entry_id: None,
898+
metadata_replay_entry_id: None,
899+
}]);
900+
assert_eq!(upgrade_region_instruction, upgrade_region);
901+
}
902+
903+
#[test]
904+
fn test_deserialize_instruction_reply() {
905+
// legacy close region reply
808906
let close_region_instruction_reply =
809907
r#"{"result":true,"error":null,"type":"close_region"}"#;
810908
let close_region_instruction_reply: InstructionReply =
@@ -815,6 +913,7 @@ mod tests {
815913
});
816914
assert_eq!(close_region_instruction_reply, close_region_reply);
817915

916+
// legacy open region reply
818917
let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
819918
let open_region_instruction_reply: InstructionReply =
820919
serde_json::from_str(open_region_instruction_reply).unwrap();
@@ -824,6 +923,7 @@ mod tests {
824923
});
825924
assert_eq!(open_region_instruction_reply, open_region_reply);
826925

926+
// legacy downgrade region reply
827927
let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
828928
let downgrade_region_instruction_reply: InstructionReply =
829929
serde_json::from_str(downgrade_region_instruction_reply).unwrap();
@@ -837,6 +937,19 @@ mod tests {
837937
}),
838938
);
839939
assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
940+
941+
// legacy upgrade region reply
942+
let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
943+
let upgrade_region_instruction_reply: InstructionReply =
944+
serde_json::from_str(upgrade_region_instruction_reply).unwrap();
945+
let upgrade_region_reply =
946+
InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
947+
region_id: RegionId::new(1024, 1),
948+
ready: true,
949+
exists: true,
950+
error: None,
951+
}));
952+
assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
840953
}
841954

842955
#[derive(Debug, Clone, Serialize, Deserialize)]

src/datanode/src/heartbeat/handler.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl RegionHeartbeatResponseHandler {
114114
)),
115115
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
116116
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
117-
Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler.into())),
117+
Instruction::UpgradeRegions(_) => Ok(Box::new(UpgradeRegionsHandler.into())),
118118
Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
119119
Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
120120
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
@@ -194,7 +194,7 @@ dispatch_instr!(
194194
OpenRegions => OpenRegions,
195195
FlushRegions => FlushRegions,
196196
DowngradeRegions => DowngradeRegions,
197-
UpgradeRegion => UpgradeRegions,
197+
UpgradeRegions => UpgradeRegions,
198198
GetFileRefs => GetFileRefs,
199199
GcRegions => GcRegions,
200200
);
@@ -334,10 +334,10 @@ mod tests {
334334
);
335335

336336
// Upgrade region
337-
let instruction = Instruction::UpgradeRegion(UpgradeRegion {
337+
let instruction = Instruction::UpgradeRegions(vec![UpgradeRegion {
338338
region_id,
339339
..Default::default()
340-
});
340+
}]);
341341
assert!(
342342
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
343343
);

0 commit comments

Comments
 (0)