Skip to content

Commit

Permalink
feat: RoleTable trait and LeaderTable implementation (apache#188)
Browse files Browse the repository at this point in the history
* draft trait definition

Signed-off-by: Ruihang Xia <[email protected]>

* draft two impl (write and flush) as examples

Signed-off-by: Ruihang Xia <[email protected]>

* impl alter_schema, write and flush in LeaderTable

Signed-off-by: Ruihang Xia <[email protected]>

* impl RoleTable for LeaderTable

Signed-off-by: Ruihang Xia <[email protected]>

* add alter_option fn

Signed-off-by: Ruihang Xia <[email protected]>

* finish definition

Signed-off-by: Ruihang Xia <[email protected]>

* temporary workaround default policy for test

Signed-off-by: Ruihang Xia <[email protected]>

* address CR issues

Signed-off-by: Ruihang Xia <[email protected]>

* refine snafu result backtrace

Signed-off-by: Ruihang Xia <[email protected]>

* add comment for unknown policy

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Aug 22, 2022
1 parent f892ca9 commit 7ddc262
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 73 deletions.
52 changes: 32 additions & 20 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ use crate::{
table_options,
};

/// Policy of how to perform flush operation.
#[derive(Default, Debug, Clone, Copy)]
pub enum TableAlterSchemaPolicy {
/// Unknown flush policy, this is the default value.
#[default]
Unknown,
/// Perform Alter Schema operation.
Alter,
// TODO: use this policy and remove "allow(dead_code)"
/// Ignore this operation.
#[allow(dead_code)]
Noop,
}

impl Instance {
// Alter schema need to be handled by write worker.
pub async fn alter_schema_of_table(
Expand All @@ -44,7 +58,8 @@ impl Instance {
// Create a oneshot channel to send/receive alter schema result.
let (tx, rx) = oneshot::channel();
let cmd = AlterSchemaCommand {
space_table: space_table.clone(),
// space_table: space_table.clone(),
table_data: space_table.table_data().clone(),
request,
tx,
};
Expand All @@ -69,10 +84,10 @@ impl Instance {
pub(crate) async fn process_alter_schema_command(
self: &Arc<Self>,
worker_local: &mut WorkerLocal,
space_table: &SpaceAndTable,
table_data: &TableDataRef,
request: AlterSchemaRequest,
#[allow(unused_variables)] policy: TableAlterSchemaPolicy,
) -> Result<()> {
let table_data = space_table.table_data();
// Validate alter schema request.
self.validate_before_alter(table_data, &request)?;

Expand All @@ -89,14 +104,14 @@ impl Instance {
self.flush_table_in_worker(worker_local, table_data, opts)
.await
.context(FlushTable {
space_id: space_table.space().id,
space_id: table_data.space_id,
table: &table_data.name,
table_id: table_data.id,
})?;

// Build alter op
let manifest_update = AlterSchemaMeta {
space_id: space_table.space().id,
space_id: table_data.space_id,
table_id: table_data.id,
schema: request.schema.clone(),
pre_schema_version: request.pre_schema_version,
Expand All @@ -107,7 +122,7 @@ impl Instance {
let payload = WritePayload::AlterSchema(&alter_schema_pb);

// Encode payloads
let region_id = space_table.table_data().wal_region_id();
let region_id = table_data.wal_region_id();
let log_batch_encoder =
self.space_store
.wal_manager
Expand All @@ -130,7 +145,7 @@ impl Instance {
.await
.map_err(|e| Box::new(e) as _)
.context(WriteWal {
space_id: space_table.space().id,
space_id: table_data.space_id,
table: &table_data.name,
table_id: table_data.id,
})?;
Expand All @@ -147,7 +162,7 @@ impl Instance {
.store_update(update)
.await
.context(WriteManifest {
space_id: space_table.space().id,
space_id: table_data.space_id,
table: &table_data.name,
table_id: table_data.id,
})?;
Expand Down Expand Up @@ -197,6 +212,7 @@ impl Instance {
pub async fn alter_options_of_table(
&self,
space_table: &SpaceAndTable,
// todo: encapsulate this into a struct like other functions.
options: HashMap<String, String>,
) -> Result<()> {
info!(
Expand All @@ -207,7 +223,7 @@ impl Instance {
// Create a oneshot channel to send/receive alter options result.
let (tx, rx) = oneshot::channel();
let cmd = AlterOptionsCommand {
space_table: space_table.clone(),
table_data: space_table.table_data().clone(),
options,
tx,
};
Expand All @@ -232,10 +248,9 @@ impl Instance {
pub(crate) async fn process_alter_options_command(
self: &Arc<Self>,
worker_local: &mut WorkerLocal,
space_table: &SpaceAndTable,
table_data: &TableDataRef,
options: HashMap<String, String>,
) -> Result<()> {
let table_data = space_table.table_data();
ensure!(
!table_data.is_dropped(),
AlterDroppedTable {
Expand All @@ -249,22 +264,19 @@ impl Instance {
let current_table_options = table_data.table_options();
info!(
"Instance alter options, space_id:{}, tables:{:?}, old_table_opts:{:?}, options:{:?}",
space_table.space().id,
space_table.table_data().name,
current_table_options,
options
table_data.space_id, table_data.name, current_table_options, options
);
let mut table_opts =
table_options::merge_table_options_for_alter(&options, &current_table_options)
.map_err(|e| Box::new(e) as _)
.context(InvalidOptions {
space_id: space_table.space().id,
space_id: table_data.space_id,
table: &table_data.name,
table_id: table_data.id,
})?;
table_opts.sanitize();
let manifest_update = AlterOptionsMeta {
space_id: space_table.space().id,
space_id: table_data.space_id,
table_id: table_data.id,
options: table_opts.clone(),
};
Expand All @@ -278,7 +290,7 @@ impl Instance {
let payload = WritePayload::AlterOption(&alter_options_pb);

// Encode payload
let region_id = space_table.table_data().wal_region_id();
let region_id = table_data.wal_region_id();
let log_batch_encoder =
self.space_store
.wal_manager
Expand All @@ -301,7 +313,7 @@ impl Instance {
.await
.map_err(|e| Box::new(e) as _)
.context(WriteWal {
space_id: space_table.space().id,
space_id: table_data.space_id,
table: &table_data.name,
table_id: table_data.id,
})?;
Expand All @@ -313,7 +325,7 @@ impl Instance {
.store_update(meta_update)
.await
.context(WriteManifest {
space_id: space_table.space().id,
space_id: table_data.space_id,
table: &table_data.name,
table_id: table_data.id,
})?;
Expand Down
35 changes: 25 additions & 10 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ pub enum Error {

#[snafu(display("Runtime join error, source:{}", source))]
RuntimeJoin { source: common_util::runtime::Error },

#[snafu(display("Unknown flush policy"))]
UnknownPolicy { backtrace: Backtrace },
}

define_result!(Error);
Expand All @@ -145,6 +148,8 @@ pub struct TableFlushOptions {
///
/// Default is false.
pub block_on_write_thread: bool,
/// Flush policy
pub policy: TableFlushPolicy,
}

impl Default for TableFlushOptions {
Expand All @@ -153,6 +158,7 @@ impl Default for TableFlushOptions {
res_sender: None,
compact_after_flush: true,
block_on_write_thread: false,
policy: TableFlushPolicy::Dump,
}
}
}
Expand All @@ -163,14 +169,18 @@ pub struct TableFlushRequest {
pub table_data: TableDataRef,
/// Max sequence number to flush (inclusive).
pub max_sequence: SequenceNumber,
/// Flush policy.
pub policy: TableFlushPolicy,
}

/// Policy of how to perform flush operation.
#[derive(Debug, Clone, Copy)]
#[derive(Default, Debug, Clone, Copy)]
pub enum TableFlushPolicy {
/// Unknown policy, this is the default value and operation will report
/// error for it. Others except `RoleTable` should set policy to this
/// variant.
Unknown,
/// Dump memtable to sst file.
// todo: the default value should be [Unknown].
#[default]
Dump,
// TODO: use this policy and remove "allow(dead_code)"
/// Drop memtables.
Expand All @@ -193,7 +203,7 @@ impl Instance {
// Create a oneshot channel to send/receive flush result.
let (tx, rx) = oneshot::channel();
let cmd = FlushTableCommand {
space_table: space_table.clone(),
table_data: space_table.table_data().clone(),
flush_opts,
tx,
};
Expand All @@ -217,7 +227,7 @@ impl Instance {
let (compact_tx, compact_rx) = oneshot::channel();
// Create a oneshot channel to send/receive compaction result.
let cmd = CompactTableCommand {
space_table: space_table.clone(),
table_data: space_table.table_data().clone(),
waiter: Some(compact_tx),
tx,
};
Expand All @@ -240,7 +250,7 @@ impl Instance {
}

/// Flush given table in write worker thread.
pub async fn flush_table_in_worker(
pub(crate) async fn flush_table_in_worker(
self: &Arc<Self>,
worker_local: &mut WorkerLocal,
table_data: &TableDataRef,
Expand Down Expand Up @@ -300,7 +310,6 @@ impl Instance {
Ok(TableFlushRequest {
table_data: table_data.clone(),
max_sequence: last_sequence,
policy: TableFlushPolicy::Dump,
})
}

Expand All @@ -317,7 +326,7 @@ impl Instance {
let table = table_data.name.clone();

let instance = self.clone();
let flush_job = async move { instance.flush_memtables(&flush_req).await };
let flush_job = async move { instance.flush_memtables(&flush_req, opts.policy).await };

let compact_req = TableCompactionRequest::no_waiter(
table_data.clone(),
Expand Down Expand Up @@ -358,11 +367,14 @@ impl Instance {
}

/// Each table can only have one running flush job.
async fn flush_memtables(&self, flush_req: &TableFlushRequest) -> Result<()> {
async fn flush_memtables(
&self,
flush_req: &TableFlushRequest,
policy: TableFlushPolicy,
) -> Result<()> {
let TableFlushRequest {
table_data,
max_sequence,
policy,
} = flush_req;

let current_version = table_data.current_version();
Expand All @@ -384,6 +396,9 @@ impl Instance {
let _timer = local_metrics.flush_duration_histogram.start_timer();

match policy {
TableFlushPolicy::Unknown => {
return UnknownPolicy {}.fail();
}
TableFlushPolicy::Dump => {
self.dump_memtables(table_data, request_id, &mems_to_flush)
.await?
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! The root mod only contains common functions of instance, other logics are
//! divided into the sub crates
mod alter;
pub(crate) mod alter;
mod close;
mod create;
mod drop;
Expand All @@ -14,7 +14,7 @@ pub mod flush_compaction;
pub(crate) mod mem_collector;
pub mod open;
mod read;
mod write;
pub(crate) mod write;
pub mod write_worker;

use std::{
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
ApplyMemTable, FlushTable, OperateByWriteWorker, ReadMetaUpdate, ReadWal,
RecoverTableData, Result,
},
flush_compaction::TableFlushOptions,
flush_compaction::{TableFlushOptions, TableFlushPolicy},
mem_collector::MemUsageCollector,
write_worker,
write_worker::{RecoverTableCommand, WorkerLocal, WriteGroup},
Expand Down Expand Up @@ -376,6 +376,7 @@ impl Instance {
res_sender: None,
compact_after_flush: false,
block_on_write_thread: false,
policy: TableFlushPolicy::Dump,
};
self.flush_table_in_worker(worker_local, table_data, opts)
.await
Expand Down
Loading

0 comments on commit 7ddc262

Please sign in to comment.