Skip to content

Commit

Permalink
feat: setup in different deployment mode (apache#190)
Browse files Browse the repository at this point in the history
* refactor: support build TableId from raw u64

* refactor: make Manager as trait object

* refactor: fix test modules for compatibility

* feat: support building server in cluster deloy mode

* refactor: setup server for different mode

* refactor: simplify ClusterConfig

* refactor: separate id alloc implemenatations from setup.rs

* chore: update ceresdbproto crate

* refactor: avoid creating defualt schema when create catalog_manager

* refactor: remove Node struct

* fix: remove http prefix for default meta_addr

* fix: start the cluster first

* fix: wrong endpoint formatting

* chore: fix lint issues

* chore: fix clippy complaints

* refactor: address CR issues

* refactor: remove ResponseHeader in the returned value of MetaClientV2

* refactor: address CR issues

* refactor: rename TableId building method

* fix: update table id formatted string

* fix: TableId debug formatted text
  • Loading branch information
ShiKaiWi authored Aug 17, 2022
1 parent 8348d80 commit eaa4588
Show file tree
Hide file tree
Showing 52 changed files with 692 additions and 572 deletions.
14 changes: 9 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@ path = "src/bin/ceresdb-server.rs"
[dependencies]
# Workspace dependencies, in alphabetical order
analytic_engine = { path = "analytic_engine" }
async-trait = "0.1.53"
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
clap = "2.0"
cluster = { path = "cluster" }
common_util = { path = "common_util" }
df_operator = { path = "df_operator" }
log = "0.4"
logger = { path = "components/logger" }
meta_client_v2 = { path = "meta_client_v2" }
query_engine = { path = "query_engine" }
server = { path = "server" }
table_engine = { path = "table_engine" }
Expand Down
27 changes: 14 additions & 13 deletions analytic_engine/src/meta/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,10 +726,11 @@ mod tests {
}

fn alloc_table_id(&self) -> TableId {
TableId::new(
TableId::with_seq(
self.schema_id,
self.table_seq_gen.alloc_table_seq().unwrap(),
)
.unwrap()
}

fn table_name_from_id(table_id: TableId) -> String {
Expand Down Expand Up @@ -934,7 +935,7 @@ mod tests {

#[test]
fn test_manifest_add_table() {
let ctx = TestContext::new("add_table", SchemaId::new(0).unwrap());
let ctx = TestContext::new("add_table", SchemaId::from_u32(0));
run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| {
Box::pin(async move {
ctx.add_table(table_id, manifest_data_builder).await;
Expand All @@ -944,7 +945,7 @@ mod tests {

#[test]
fn test_manifest_drop_table() {
let ctx = TestContext::new("drop_table", SchemaId::new(0).unwrap());
let ctx = TestContext::new("drop_table", SchemaId::from_u32(0));
run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| {
Box::pin(async move {
ctx.add_table(table_id, manifest_data_builder).await;
Expand All @@ -955,7 +956,7 @@ mod tests {

#[test]
fn test_manifest_version_edit() {
let ctx = TestContext::new("version_edit", SchemaId::new(0).unwrap());
let ctx = TestContext::new("version_edit", SchemaId::from_u32(0));
run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| {
Box::pin(async move {
ctx.add_table(table_id, manifest_data_builder).await;
Expand All @@ -967,7 +968,7 @@ mod tests {

#[test]
fn test_manifest_alter_options() {
let ctx = TestContext::new("version_edit", SchemaId::new(0).unwrap());
let ctx = TestContext::new("version_edit", SchemaId::from_u32(0));
run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| {
Box::pin(async move {
ctx.add_table(table_id, manifest_data_builder).await;
Expand All @@ -979,7 +980,7 @@ mod tests {

#[test]
fn test_manifest_alter_schema() {
let ctx = TestContext::new("version_edit", SchemaId::new(0).unwrap());
let ctx = TestContext::new("version_edit", SchemaId::from_u32(0));
run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| {
Box::pin(async move {
ctx.add_table(table_id, manifest_data_builder).await;
Expand All @@ -991,7 +992,7 @@ mod tests {

#[test]
fn test_manifest_snapshot_one_table() {
let ctx = TestContext::new("snapshot_one_table", SchemaId::new(0).unwrap());
let ctx = TestContext::new("snapshot_one_table", SchemaId::from_u32(0));
let runtime = ctx.runtime.clone();
runtime.block_on(async move {
let table_id = ctx.alloc_table_id();
Expand Down Expand Up @@ -1020,7 +1021,7 @@ mod tests {

#[test]
fn test_manifest_snapshot_one_table_massive_logs() {
let ctx = TestContext::new("snapshot_one_table_massive_logs", SchemaId::new(0).unwrap());
let ctx = TestContext::new("snapshot_one_table_massive_logs", SchemaId::from_u32(0));
let runtime = ctx.runtime.clone();
runtime.block_on(async move {
let table_id = ctx.alloc_table_id();
Expand Down Expand Up @@ -1207,7 +1208,7 @@ mod tests {
fn test_no_snapshot_logs_merge() {
let ctx = Arc::new(TestContext::new(
"snapshot_merge_no_snapshot",
SchemaId::new(0).unwrap(),
SchemaId::from_u32(0),
));
let table_id = ctx.alloc_table_id();
let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![
Expand Down Expand Up @@ -1262,7 +1263,7 @@ mod tests {
fn test_multiple_snapshot_merge_normal() {
let ctx = Arc::new(TestContext::new(
"snapshot_merge_normal",
SchemaId::new(0).unwrap(),
SchemaId::from_u32(0),
));
let table_id = ctx.alloc_table_id();
let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![
Expand Down Expand Up @@ -1349,7 +1350,7 @@ mod tests {
fn test_multiple_snapshot_merge_interleaved_snapshot() {
let ctx = Arc::new(TestContext::new(
"snapshot_merge_interleaved",
SchemaId::new(0).unwrap(),
SchemaId::from_u32(0),
));
let table_id = ctx.alloc_table_id();
let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![
Expand Down Expand Up @@ -1439,7 +1440,7 @@ mod tests {
fn test_multiple_snapshot_merge_sneaked_update() {
let ctx = Arc::new(TestContext::new(
"snapshot_merge_sneaked_update",
SchemaId::new(0).unwrap(),
SchemaId::from_u32(0),
));
let table_id = ctx.alloc_table_id();
let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![
Expand Down Expand Up @@ -1502,7 +1503,7 @@ mod tests {
fn test_multiple_snapshot_drop_table() {
let ctx = Arc::new(TestContext::new(
"snapshot_drop_table",
SchemaId::new(0).unwrap(),
SchemaId::from_u32(0),
));
let table_id = ctx.alloc_table_id();
let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ pub mod tests {
let create_request = CreateTableRequest {
catalog_name: "test_catalog".to_string(),
schema_name: "public".to_string(),
schema_id: SchemaId::new(DEFAULT_SPACE_ID).unwrap(),
schema_id: SchemaId::from_u32(DEFAULT_SPACE_ID),
table_id: self.table_id,
table_name: self.table_name,
table_schema,
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/tests/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use table_engine::{
use crate::{table_options, tests::row_util};

pub fn new_table_id(schema_id: u16, table_seq: u32) -> TableId {
TableId::new(SchemaId::from(schema_id), TableSeq::from(table_seq))
TableId::with_seq(SchemaId::from(schema_id), TableSeq::from(table_seq)).unwrap()
}

pub type RowTuple<'a> = (&'a str, Timestamp, &'a str, f64, f64, &'a str);
Expand Down Expand Up @@ -298,7 +298,7 @@ impl Default for Builder {
create_request: CreateTableRequest {
catalog_name: "ceresdb".to_string(),
schema_name: "public".to_string(),
schema_id: SchemaId::new(2).unwrap(),
schema_id: SchemaId::from_u32(2),
table_id: new_table_id(2, 1),
table_name: "test_table".to_string(),
table_schema: FixedSchemaTable::default_schema(),
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ impl TestEnv {
runtimes: self.runtimes.clone(),
builder: T::default(),
engine: None,
schema_id: SchemaId::new(100).unwrap(),
schema_id: SchemaId::from_u32(100),
last_table_seq: 1,
name_to_tables: HashMap::new(),
}
Expand Down
12 changes: 11 additions & 1 deletion catalog/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

//! Catalog manager
use std::sync::Arc;

use snafu::Snafu;

use crate::{schema::NameRef, CatalogRef};
Expand All @@ -17,11 +19,17 @@ define_result!(Error);
// TODO(yingwen): Maybe use async trait?
// TODO(yingwen): Provide a context

pub trait Manager: Clone + Send + Sync {
pub trait Manager: Send + Sync {
/// Get the default catalog name
///
/// Default catalog is ensured created because no method to create catalog
/// is provided.
fn default_catalog_name(&self) -> NameRef;

/// Get the default schema name
///
/// Default schema may be not created by the implementation and the caller
/// may need to create that by itself.
fn default_schema_name(&self) -> NameRef;

/// Find the catalog by name
Expand All @@ -30,3 +38,5 @@ pub trait Manager: Clone + Send + Sync {
/// All catalogs
fn all_catalogs(&self) -> Result<Vec<CatalogRef>>;
}

pub type ManagerRef = Arc<dyn Manager>;
33 changes: 19 additions & 14 deletions catalog_impls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
use std::sync::Arc;

use async_trait::async_trait;
use catalog::{consts::SYSTEM_CATALOG, manager::Manager, schema::NameRef, CatalogRef};
use catalog::{
consts::SYSTEM_CATALOG,
manager::{Manager, ManagerRef},
schema::NameRef,
CatalogRef,
};
use system_catalog::{tables::Tables, SystemTableAdapter};
use table_engine::table::{SchemaId, TableId};

Expand All @@ -15,13 +20,13 @@ pub mod volatile;

/// CatalogManagerImpl is a wrapper for system and user tables
#[derive(Clone)]
pub struct CatalogManagerImpl<M> {
pub struct CatalogManagerImpl {
system_tables: SystemTables,
user_catalog_manager: M,
user_catalog_manager: ManagerRef,
}

impl<M: Manager + 'static> CatalogManagerImpl<M> {
pub fn new(manager: M) -> Self {
impl CatalogManagerImpl {
pub fn new(manager: ManagerRef) -> Self {
let mut system_tables_builder = SystemTablesBuilder::new();
system_tables_builder = system_tables_builder
.insert_table(SystemTableAdapter::new(Tables::new(manager.clone())));
Expand All @@ -32,7 +37,7 @@ impl<M: Manager + 'static> CatalogManagerImpl<M> {
}
}

impl<M: Manager> Manager for CatalogManagerImpl<M> {
impl Manager for CatalogManagerImpl {
fn default_catalog_name(&self) -> NameRef {
self.user_catalog_manager.default_catalog_name()
}
Expand All @@ -56,25 +61,25 @@ impl<M: Manager> Manager for CatalogManagerImpl<M> {
#[async_trait]
pub trait SchemaIdAlloc: Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
async fn alloc_schema_id(
async fn alloc_schema_id<'a>(
&self,
schema_name: NameRef,
schema_name: NameRef<'a>,
) -> std::result::Result<SchemaId, Self::Error>;
}

#[async_trait]
pub trait TableIdAlloc: Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
async fn alloc_table_id(
async fn alloc_table_id<'a>(
&self,
schema_name: NameRef,
table_name: NameRef,
schema_name: NameRef<'a>,
table_name: NameRef<'a>,
) -> std::result::Result<TableId, Self::Error>;

async fn invalidate_table_id(
async fn invalidate_table_id<'a>(
&self,
schema_name: NameRef,
table_name: NameRef,
schema_name: NameRef<'a>,
table_name: NameRef<'a>,
table_id: TableId,
) -> std::result::Result<(), Self::Error>;
}
Loading

0 comments on commit eaa4588

Please sign in to comment.