Skip to content

Commit

Permalink
feat: WalReplicator skeleton implementation (apache#179)
Browse files Browse the repository at this point in the history
* feat: draft WalReplicator

Signed-off-by: Ruihang Xia <[email protected]>

* add document

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy warnings

Signed-off-by: Ruihang Xia <[email protected]>

* simple start stop test

Signed-off-by: Ruihang Xia <[email protected]>

* chore: CR namings

Signed-off-by: Ruihang Xia <[email protected]>

* rename Replicator to Synchronizer

Signed-off-by: Ruihang Xia <[email protected]>

* remove redundent cfg

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Aug 23, 2022
1 parent 7ddc262 commit ea8943e
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 4 deletions.
1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ common_types = { path = "../common_types", features = ["test"] }
common_util = { path = "../common_util", features = ["test"] }
env_logger = "0.6"
tempfile = "3.1.0"
wal = { path = "../wal", features = ["test"] }
14 changes: 13 additions & 1 deletion analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ use crate::{
space::{SpaceId, SpaceRef},
sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger},
table::data::TableDataRef,
wal_synchronizer::WalSynchronizer,
TableOptions,
};

#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to stop file purger, err:{}", source))]
Expand All @@ -49,6 +51,11 @@ pub enum Error {
StopScheduler {
source: crate::compaction::scheduler::Error,
},

#[snafu(display("Failed to stop WAL Synchronizer, err:{}", source))]
StopWalSynchronizer {
source: crate::wal_synchronizer::Error,
},
}

define_result!(Error);
Expand Down Expand Up @@ -141,7 +148,6 @@ impl SpaceStore {
/// Table engine instance
///
/// Manages all spaces, also contains needed resources shared across all table
// TODO(yingwen): Track memory usage of all tables (or tables of space)
pub struct Instance {
/// Space storage
space_store: Arc<SpaceStore>,
Expand All @@ -157,6 +163,7 @@ pub struct Instance {
// End of write group options.
compaction_scheduler: CompactionSchedulerRef,
file_purger: FilePurger,
wal_synchronizer: WalSynchronizer,

meta_cache: Option<MetaCacheRef>,
data_cache: Option<DataCacheRef>,
Expand All @@ -175,6 +182,11 @@ impl Instance {
pub async fn close(&self) -> Result<()> {
self.file_purger.stop().await.context(StopFilePurger)?;

self.wal_synchronizer
.stop()
.await
.context(StopWalSynchronizer)?;

self.space_store.close().await?;

self.compaction_scheduler
Expand Down
8 changes: 7 additions & 1 deletion analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
space::{Space, SpaceId, SpaceRef},
sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger},
table::data::{TableData, TableDataRef},
wal_synchronizer::{WalSynchronizer, WalSynchronizerConfig},
};

impl Instance {
Expand All @@ -51,7 +52,7 @@ impl Instance {
let space_store = Arc::new(SpaceStore {
spaces: RwLock::new(Spaces::default()),
manifest,
wal_manager,
wal_manager: wal_manager.clone(),
store: store.clone(),
sst_factory,
meta_cache: ctx.meta_cache.clone(),
Expand All @@ -68,6 +69,10 @@ impl Instance {

let file_purger = FilePurger::start(&bg_runtime, store);

let mut wal_synchronizer =
WalSynchronizer::new(WalSynchronizerConfig::default(), wal_manager);
wal_synchronizer.start(&bg_runtime).await;

let instance = Arc::new(Instance {
space_store,
runtimes: ctx.runtimes.clone(),
Expand All @@ -76,6 +81,7 @@ impl Instance {
write_group_command_channel_cap: ctx.config.write_group_command_channel_cap,
compaction_scheduler,
file_purger,
wal_synchronizer,
meta_cache: ctx.meta_cache.clone(),
data_cache: ctx.data_cache.clone(),
mem_usage_collector: Arc::new(MemUsageCollector::default()),
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod sst;
mod storage_options;
pub mod table;
pub mod table_options;
mod wal_synchronizer;

#[cfg(any(test, feature = "test"))]
pub mod tests;
Expand Down
Loading

0 comments on commit ea8943e

Please sign in to comment.