From ea8943efba92993f0193c471218d0c1e4dc8867c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 23 Aug 2022 17:09:09 +0800 Subject: [PATCH] feat: `WalReplicator` skeleton implementation (#179) * feat: draft WalReplicator Signed-off-by: Ruihang Xia * add document Signed-off-by: Ruihang Xia * fix clippy warnings Signed-off-by: Ruihang Xia * simple start stop test Signed-off-by: Ruihang Xia * chore: CR namings Signed-off-by: Ruihang Xia * rename Replicator to Synchronizer Signed-off-by: Ruihang Xia * remove redundent cfg Signed-off-by: Ruihang Xia Signed-off-by: Ruihang Xia --- analytic_engine/Cargo.toml | 1 + analytic_engine/src/instance/mod.rs | 14 +- analytic_engine/src/instance/open.rs | 8 +- analytic_engine/src/lib.rs | 1 + analytic_engine/src/wal_synchronizer.rs | 380 ++++++++++++++++++++++++ wal/Cargo.toml | 5 + wal/src/lib.rs | 4 +- wal/src/tests/mod.rs | 1 + 8 files changed, 410 insertions(+), 4 deletions(-) create mode 100644 analytic_engine/src/wal_synchronizer.rs diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml index e70a0a3f35..02feaaad68 100644 --- a/analytic_engine/Cargo.toml +++ b/analytic_engine/Cargo.toml @@ -44,3 +44,4 @@ common_types = { path = "../common_types", features = ["test"] } common_util = { path = "../common_util", features = ["test"] } env_logger = "0.6" tempfile = "3.1.0" +wal = { path = "../wal", features = ["test"] } diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index d23180f818..0259df5fd2 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -37,9 +37,11 @@ use crate::{ space::{SpaceId, SpaceRef}, sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger}, table::data::TableDataRef, + wal_synchronizer::WalSynchronizer, TableOptions, }; +#[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Failed to stop file purger, err:{}", source))] @@ -49,6 +51,11 @@ pub enum Error { StopScheduler { source: crate::compaction::scheduler::Error, }, + + #[snafu(display("Failed to stop WAL Synchronizer, err:{}", source))] + StopWalSynchronizer { + source: crate::wal_synchronizer::Error, + }, } define_result!(Error); @@ -141,7 +148,6 @@ impl SpaceStore { /// Table engine instance /// /// Manages all spaces, also contains needed resources shared across all table -// TODO(yingwen): Track memory usage of all tables (or tables of space) pub struct Instance { /// Space storage space_store: Arc, @@ -157,6 +163,7 @@ pub struct Instance { // End of write group options. compaction_scheduler: CompactionSchedulerRef, file_purger: FilePurger, + wal_synchronizer: WalSynchronizer, meta_cache: Option, data_cache: Option, @@ -175,6 +182,11 @@ impl Instance { pub async fn close(&self) -> Result<()> { self.file_purger.stop().await.context(StopFilePurger)?; + self.wal_synchronizer + .stop() + .await + .context(StopWalSynchronizer)?; + self.space_store.close().await?; self.compaction_scheduler diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 37e435f5ac..cc3f9f2e56 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -37,6 +37,7 @@ use crate::{ space::{Space, SpaceId, SpaceRef}, sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger}, table::data::{TableData, TableDataRef}, + wal_synchronizer::{WalSynchronizer, WalSynchronizerConfig}, }; impl Instance { @@ -51,7 +52,7 @@ impl Instance { let space_store = Arc::new(SpaceStore { spaces: RwLock::new(Spaces::default()), manifest, - wal_manager, + wal_manager: wal_manager.clone(), store: store.clone(), sst_factory, meta_cache: ctx.meta_cache.clone(), @@ -68,6 +69,10 @@ impl Instance { let file_purger = FilePurger::start(&bg_runtime, store); + let mut wal_synchronizer = + WalSynchronizer::new(WalSynchronizerConfig::default(), wal_manager); + wal_synchronizer.start(&bg_runtime).await; + let instance = Arc::new(Instance { space_store, runtimes: ctx.runtimes.clone(), @@ -76,6 +81,7 @@ impl Instance { write_group_command_channel_cap: ctx.config.write_group_command_channel_cap, compaction_scheduler, file_purger, + wal_synchronizer, meta_cache: ctx.meta_cache.clone(), data_cache: ctx.data_cache.clone(), mem_usage_collector: Arc::new(MemUsageCollector::default()), diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index ae562770ef..444fb2ab2c 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -18,6 +18,7 @@ pub mod sst; mod storage_options; pub mod table; pub mod table_options; +mod wal_synchronizer; #[cfg(any(test, feature = "test"))] pub mod tests; diff --git a/analytic_engine/src/wal_synchronizer.rs b/analytic_engine/src/wal_synchronizer.rs new file mode 100644 index 0000000000..42c94c804e --- /dev/null +++ b/analytic_engine/src/wal_synchronizer.rs @@ -0,0 +1,380 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! WAL Synchronizer implementation. + +use std::{ + collections::{BTreeMap, VecDeque}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; + +use common_types::SequenceNumber; +use common_util::{ + define_result, + runtime::{JoinHandle, Runtime}, +}; +use log::{debug, error, info}; +use snafu::{ResultExt, Snafu}; +use table_engine::table::WriteRequest; +use tokio::{ + sync::{ + mpsc::{self, Receiver, Sender}, + Mutex, RwLock, + }, + time, +}; +use wal::{ + log_batch::LogEntry, + manager::{ + BatchLogIterator, BatchLogIteratorAdapter, ReadBoundary, ReadContext, ReadRequest, + RegionId, WalManagerRef, + }, +}; + +use self::role_table::ReaderTable; +use crate::payload::{ReadPayload, WalDecoder}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Failed to write to wal, err:{}", source))] + WriteLogBatch { + source: Box, + }, + + #[snafu(display("Failed to read wal, err:{}", source))] + ReadWal { source: wal::manager::Error }, + + #[snafu(display("Encounter invalid table state, err:{}", source))] + InvalidTableState { + source: Box, + }, + + #[snafu(display("Failed to stop synchronizer, err:{}", source))] + StopSynchronizer { + source: Box, + }, +} + +define_result!(Error); + +pub struct WalSynchronizerConfig { + /// Interval between two syncs + interval: Duration, + /// Used as WAL's read batch size + batch_size: usize, +} + +impl Default for WalSynchronizerConfig { + fn default() -> Self { + Self { + interval: Duration::from_secs(30), + batch_size: 128, + } + } +} + +/// A background synchronizer that keep polling WAL update. +/// +/// This [WalSynchronizer] has a queue of [RegionId]s that need synchronization. +/// Others can register new region with [register_table] method. And invalid +/// table will be removed automatically. The workflow looks like: +/// +/// ```plaintext +/// register IDs +/// need synchronization +/// ┌─────────────────────┐ +/// │ │ +/// │ ┌──────▼─────────┐ +/// ┌────┴─────┐ │ background │ +/// │Role Table│ │WAL Synchronizer│ +/// └────▲─────┘ └──────┬─────────┘ +/// │ │ +/// └─────────────────────┘ +/// synchronize log +/// to table +/// ``` +pub struct WalSynchronizer { + inner: Arc, + stop_sender: Sender<()>, + join_handle: Mutex>>, + stop_receiver: Option>, +} + +impl WalSynchronizer { + pub fn new(config: WalSynchronizerConfig, wal: WalManagerRef) -> Self { + let (tx, rx) = mpsc::channel(1); + let inner = Inner { + wal, + config, + tables: RwLock::default(), + }; + Self { + inner: Arc::new(inner), + stop_sender: tx, + stop_receiver: Some(rx), + join_handle: Mutex::new(None), + } + } + + pub async fn stop(&self) -> Result<()> { + let _ = self.stop_sender.send(()).await; + if let Some(handle) = self.join_handle.lock().await.take() { + handle + .await + .map_err(|e| Box::new(e) as _) + .context(StopSynchronizer)?; + } + + Ok(()) + } + + #[allow(dead_code)] + pub async fn register_table(&self, region_id: RegionId, table: ReaderTable) { + self.inner.register_table(region_id, table).await; + } + + pub async fn start(&mut self, runtime: &Runtime) { + let join_handle = runtime.spawn( + self.inner + .clone() + .start_synchronize(self.stop_receiver.take().unwrap()), + ); + *self.join_handle.lock().await = Some(join_handle); + } +} + +pub struct Inner { + wal: WalManagerRef, + config: WalSynchronizerConfig, + tables: RwLock>, +} + +impl Inner { + #[allow(dead_code)] + pub async fn register_table(&self, region_id: RegionId, table: ReaderTable) { + let state = SynchronizeState { + region_id, + table, + last_synced_seq: AtomicU64::new(SequenceNumber::MIN), + }; + self.tables.write().await.insert(region_id, state); + } + + pub async fn start_synchronize(self: Arc, mut stop_listener: Receiver<()>) { + info!("Wal Synchronizer Started"); + + // constants + let read_context = ReadContext { + batch_size: self.config.batch_size, + ..Default::default() + }; + + loop { + let mut invalid_regions = vec![]; + let tables = self.tables.read().await; + // todo: consider clone [SynchronizeState] out to release the read lock. + let states = tables.values().collect::>(); + + // Poll WAL region by region. + for state in states { + // check state before polling WAL + if !state.check_state() { + invalid_regions.push(state.region_id); + continue; + } + + // build wal iterator + let req = state.read_req(); + let mut iter = match self.wal.read_batch(&read_context, &req).await { + Err(e) => { + error!( + "Failed to read from WAL, read request: {:?}, error: {:?}", + req, e + ); + // Failed to read from wal cannot indicate the region is invalid. Just + // ignore it. + continue; + } + Ok(iter) => iter, + }; + + // double check state before writing to table. Error due to + // state changed after this check will be treat as normal error. + if !state.check_state() { + invalid_regions.push(state.region_id); + continue; + } + + // read logs from iterator + if let Err(e) = self.consume_logs(&mut iter, state).await { + error!("Failed to consume WAL, error: {:?}", e); + } + } + + drop(tables); + self.purge_invalid_region(&mut invalid_regions).await; + + if time::timeout(self.config.interval, stop_listener.recv()) + .await + .is_ok() + { + info!("WAL Synchronizer stopped"); + break; + } + } + } + + async fn consume_logs( + &self, + iter: &mut BatchLogIteratorAdapter, + synchronize_state: &SynchronizeState, + ) -> Result<()> { + let mut buf = VecDeque::with_capacity(self.config.batch_size); + let mut should_continue = true; + let mut max_seq = SequenceNumber::MIN; + + while should_continue { + // fetch entries + buf = iter + .next_log_entries(WalDecoder::default(), buf) + .await + .context(ReadWal)?; + if buf.len() <= self.config.batch_size { + should_continue = false; + } + + // record max sequence number + max_seq = max_seq.max( + buf.back() + .map(|entry| entry.sequence) + .unwrap_or(SequenceNumber::MIN), + ); + + self.replay_logs(&mut buf, synchronize_state).await?; + } + + // update sequence number in state + synchronize_state + .last_synced_seq + .fetch_max(max_seq, Ordering::Relaxed); + + Ok(()) + } + + async fn replay_logs( + &self, + logs: &mut VecDeque>, + synchronize_state: &SynchronizeState, + ) -> Result<()> { + for entry in logs.drain(..) { + match entry.payload { + ReadPayload::Write { row_group } => { + let write_req = WriteRequest { row_group }; + synchronize_state + .table + .write(write_req) + .await + .map_err(|e| Box::new(e) as _) + .context(WriteLogBatch)?; + } + ReadPayload::AlterSchema { schema: _schema } => todo!(), + ReadPayload::AlterOptions { options: _options } => todo!(), + } + } + Ok(()) + } + + /// Remove invalid regions from poll list. This function will clear the + /// `invalid_region` vec. + async fn purge_invalid_region(&self, invalid_regions: &mut Vec) { + if invalid_regions.is_empty() { + return; + } + debug!( + "Removing invalid region from WAL Synchronizer: {:?}", + invalid_regions + ); + + let mut tables = self.tables.write().await; + for region in invalid_regions.drain(..) { + tables.remove(®ion); + } + } +} + +struct SynchronizeState { + region_id: RegionId, + table: ReaderTable, + /// Atomic version of [SequenceNumber] + last_synced_seq: AtomicU64, +} + +impl SynchronizeState { + pub fn read_req(&self) -> ReadRequest { + ReadRequest { + region_id: self.region_id, + start: ReadBoundary::Excluded(self.last_synced_seq.load(Ordering::Relaxed)), + end: ReadBoundary::Max, + } + } + + /// Check whether the underlying table is outdated. + pub fn check_state(&self) -> bool { + self.table.check_state() + } +} + +// todo: remove this mock mod +#[allow(unused)] +mod role_table { + use std::convert::Infallible; + + use table_engine::table::WriteRequest; + + pub struct ReaderTable {} + + impl ReaderTable { + pub async fn write(&self, request: WriteRequest) -> Result<(), Infallible> { + Ok(()) + } + + pub fn check_state(&self) -> bool { + true + } + } +} + +#[cfg(test)] +mod test { + use tokio::time::sleep; + use wal::tests::util::{MemoryTableWalBuilder, TableKvTestEnv}; + + use super::*; + + fn build_env() -> TableKvTestEnv { + TableKvTestEnv::new(1, MemoryTableWalBuilder::default()) + } + + fn build_synchronizer(env: &TableKvTestEnv) -> WalSynchronizer { + env.runtime.block_on(async { + let wal = env.build_wal().await; + WalSynchronizer::new(WalSynchronizerConfig::default(), wal) + }) + } + + #[test] + fn synchronizer_start_stop() { + let env = build_env(); + let runtime = env.runtime.clone(); + let mut synchronizer = build_synchronizer(&env); + + runtime.clone().block_on(async move { + synchronizer.start(&runtime).await; + sleep(Duration::from_secs(1)).await; + synchronizer.stop().await.unwrap(); + }); + } +} diff --git a/wal/Cargo.toml b/wal/Cargo.toml index 1dfd478b42..a72567f180 100644 --- a/wal/Cargo.toml +++ b/wal/Cargo.toml @@ -6,6 +6,9 @@ authors = ["CeresDB Authors "] [package.edition] workspace = true +[features] +test = ["tempfile", "futures"] + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] @@ -13,12 +16,14 @@ async-trait = "0.1.53" common_util = {path = "../common_util"} common_types = {path = "../common_types"} chrono = "0.4" +futures = { version = "0.3", features = ["async-await"], optional = true } log = "0.4" serde = "1.0" serde_derive = "1.0" serde_json = "1.0.60" snafu = { version ="0.6.10", features = ["backtraces"] } table_kv = { path = "../components/table_kv" } +tempfile = { version = "3.1.0", optional = true } tokio = { version = "1.0", features = ["sync"] } [dev-dependencies] diff --git a/wal/src/lib.rs b/wal/src/lib.rs index a72e404ebe..91336b6232 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -8,5 +8,5 @@ pub mod manager; pub mod rocks_impl; pub mod table_kv_impl; -#[cfg(test)] -mod tests; +#[cfg(any(test, feature = "test"))] +pub mod tests; diff --git a/wal/src/tests/mod.rs b/wal/src/tests/mod.rs index c52a689521..05c72b43d5 100644 --- a/wal/src/tests/mod.rs +++ b/wal/src/tests/mod.rs @@ -2,5 +2,6 @@ //! integration tests for wal +#[cfg_attr(feature = "test", allow(dead_code, unused_imports))] mod read_write; pub mod util;