From eaa458853a0b48e503237cb928547a90b0137eca Mon Sep 17 00:00:00 2001
From: WEI Xikai
Date: Wed, 17 Aug 2022 10:23:49 +0800
Subject: [PATCH] feat: setup in different deployment mode (#190)

* refactor: support building TableId from raw u64
* refactor: make Manager a trait object
* refactor: fix test modules for compatibility
* feat: support building server in cluster deploy mode
* refactor: setup server for different mode
* refactor: simplify ClusterConfig
* refactor: separate id alloc implementations from setup.rs
* chore: update ceresdbproto crate
* refactor: avoid creating default schema when creating catalog_manager
* refactor: remove Node struct
* fix: remove http prefix for default meta_addr
* fix: start the cluster first
* fix: wrong endpoint formatting
* chore: fix lint issues
* chore: fix clippy complaints
* refactor: address CR issues
* refactor: remove ResponseHeader in the returned value of MetaClientV2
* refactor: address CR issues
* refactor: rename TableId building method
* fix: update table id formatted string
* fix: TableId debug formatted text
---
 Cargo.lock | 14 ++-
 Cargo.toml | 3 +
 analytic_engine/src/meta/details.rs | 27 +++--
 analytic_engine/src/table/data.rs | 2 +-
 analytic_engine/src/tests/table.rs | 4 +-
 analytic_engine/src/tests/util.rs | 2 +-
 catalog/src/manager.rs | 12 +-
 catalog_impls/src/lib.rs | 33 +++---
 catalog_impls/src/table_based.rs | 85 ++++++-------
 catalog_impls/src/volatile.rs | 103 ++--------------
 ceresdbproto_deps/Cargo.toml | 2 +-
 cluster/src/cluster_impl.rs | 4 +-
 cluster/src/config.rs | 14 +--
 cluster/src/lib.rs | 2 +-
 interpreters/src/create.rs | 14 +--
 interpreters/src/drop.rs | 14 +--
 interpreters/src/factory.rs | 14 ++-
 interpreters/src/show.rs | 26 ++--
 interpreters/src/tests.rs | 10 +-
 meta_client/src/lib.rs | 4 +-
 meta_client_v2/src/lib.rs | 19 ++-
 meta_client_v2/src/meta_impl.rs | 45 ++++---
 meta_client_v2/src/types.rs | 74 ++----------
 server/Cargo.toml | 3 +-
 server/src/config.rs | 15 ++-
 server/src/grpc/mod.rs | 53 ++++-----
 server/src/grpc/prom_query.rs | 10 +-
 server/src/grpc/query.rs | 13 +-
 server/src/grpc/route.rs | 7 +-
 server/src/grpc/write.rs | 21 ++--
 server/src/handlers/admin.rs | 4 +-
 server/src/handlers/sql.rs | 6 +-
 server/src/http.rs | 25 ++--
 server/src/instance.rs | 8 +-
 server/src/mysql/builder.rs | 13 +-
 server/src/mysql/service.rs | 15 ++-
 server/src/mysql/worker.rs | 16 +--
 server/src/router.rs | 8 +-
 server/src/server.rs | 74 ++++++++++--
 sql/Cargo.toml | 2 +-
 sql/src/frontend.rs | 2 +-
 sql/src/planner.rs | 20 +++-
 sql/src/promql/convert.rs | 2 +-
 sql/src/provider.rs | 8 +-
 src/adapter.rs | 151 ++++++++++++++++++++++++
 src/lib.rs | 3 +-
 src/setup.rs | 118 ++++++++++++------
 src/signal_handler.rs | 6 +-
 system_catalog/src/lib.rs | 9 +-
 system_catalog/src/sys_catalog_table.rs | 16 +--
 system_catalog/src/tables.rs | 14 +--
 table_engine/src/table.rs | 95 ++++++---------
 52 files changed, 692 insertions(+), 572 deletions(-)
 create mode 100644 src/adapter.rs

diff --git a/Cargo.lock b/Cargo.lock
index b3051b3ddf..373c4eeb98 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -757,13 +757,16 @@ name = "ceresdb"
 version = "0.1.0"
 dependencies = [
  "analytic_engine",
+ "async-trait",
  "catalog",
  "catalog_impls",
  "clap",
+ "cluster",
  "common_util",
  "df_operator",
  "log",
  "logger",
+ "meta_client_v2",
  "query_engine",
  "server",
  "signal-hook",
@@ -798,7 +801,7 @@ dependencies = [
 [[package]]
 name = "ceresdbproto"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/ceresdbproto.git?rev=2c10152d021cd5a26b9c870cdede6a0317adca3d#2c10152d021cd5a26b9c870cdede6a0317adca3d" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=12a0ee8e45883056f275c707518fa9ed508676d1#12a0ee8e45883056f275c707518fa9ed508676d1" dependencies = [ "futures 0.3.21", "grpcio 0.9.1", @@ -809,7 +812,7 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=ec5404e0f60fc748d3e8b68a592e2926ac1e84de#ec5404e0f60fc748d3e8b68a592e2926ac1e84de" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=2c10152d021cd5a26b9c870cdede6a0317adca3d#2c10152d021cd5a26b9c870cdede6a0317adca3d" dependencies = [ "futures 0.3.21", "grpcio 0.9.1", @@ -821,7 +824,7 @@ dependencies = [ name = "ceresdbproto_deps" version = "0.1.0" dependencies = [ - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=ec5404e0f60fc748d3e8b68a592e2926ac1e84de)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=12a0ee8e45883056f275c707518fa9ed508676d1)", ] [[package]] @@ -4831,7 +4834,8 @@ dependencies = [ "async-trait", "avro-rs", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2c10152d021cd5a26b9c870cdede6a0317adca3d)", + "ceresdbproto_deps", + "cluster", "common_types 0.1.0", "common_util", "df_operator", @@ -5148,7 +5152,7 @@ version = "0.1.0" dependencies = [ "arrow_deps 0.1.0", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2c10152d021cd5a26b9c870cdede6a0317adca3d)", + "ceresdbproto_deps", "common_types 0.1.0", "common_util", "df_operator", diff --git a/Cargo.toml b/Cargo.toml index 11c59507e6..d13557863b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,13 +51,16 @@ path = "src/bin/ceresdb-server.rs" [dependencies] # Workspace dependencies, in alphabetical order analytic_engine = { path = "analytic_engine" } +async-trait = "0.1.53" catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } clap = "2.0" +cluster = { path = "cluster" } common_util = { path = "common_util" } df_operator = { path = "df_operator" } log = "0.4" logger = { path = "components/logger" } +meta_client_v2 = { path = "meta_client_v2" } query_engine = { path = "query_engine" } server = { path = "server" } table_engine = { path = "table_engine" } diff --git a/analytic_engine/src/meta/details.rs b/analytic_engine/src/meta/details.rs index ff03a177ec..bd9bdc4e06 100644 --- a/analytic_engine/src/meta/details.rs +++ b/analytic_engine/src/meta/details.rs @@ -726,10 +726,11 @@ mod tests { } fn alloc_table_id(&self) -> TableId { - TableId::new( + TableId::with_seq( self.schema_id, self.table_seq_gen.alloc_table_seq().unwrap(), ) + .unwrap() } fn table_name_from_id(table_id: TableId) -> String { @@ -934,7 +935,7 @@ mod tests { #[test] fn test_manifest_add_table() { - let ctx = TestContext::new("add_table", SchemaId::new(0).unwrap()); + let ctx = TestContext::new("add_table", SchemaId::from_u32(0)); run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| { Box::pin(async move { ctx.add_table(table_id, manifest_data_builder).await; @@ -944,7 +945,7 @@ mod tests { #[test] fn test_manifest_drop_table() { - let ctx = TestContext::new("drop_table", SchemaId::new(0).unwrap()); + let ctx = TestContext::new("drop_table", SchemaId::from_u32(0)); run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| { Box::pin(async move { ctx.add_table(table_id, manifest_data_builder).await; @@ -955,7 
+956,7 @@ mod tests { #[test] fn test_manifest_version_edit() { - let ctx = TestContext::new("version_edit", SchemaId::new(0).unwrap()); + let ctx = TestContext::new("version_edit", SchemaId::from_u32(0)); run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| { Box::pin(async move { ctx.add_table(table_id, manifest_data_builder).await; @@ -967,7 +968,7 @@ mod tests { #[test] fn test_manifest_alter_options() { - let ctx = TestContext::new("version_edit", SchemaId::new(0).unwrap()); + let ctx = TestContext::new("version_edit", SchemaId::from_u32(0)); run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| { Box::pin(async move { ctx.add_table(table_id, manifest_data_builder).await; @@ -979,7 +980,7 @@ mod tests { #[test] fn test_manifest_alter_schema() { - let ctx = TestContext::new("version_edit", SchemaId::new(0).unwrap()); + let ctx = TestContext::new("version_edit", SchemaId::from_u32(0)); run_basic_manifest_test(ctx, |ctx, table_id, manifest_data_builder| { Box::pin(async move { ctx.add_table(table_id, manifest_data_builder).await; @@ -991,7 +992,7 @@ mod tests { #[test] fn test_manifest_snapshot_one_table() { - let ctx = TestContext::new("snapshot_one_table", SchemaId::new(0).unwrap()); + let ctx = TestContext::new("snapshot_one_table", SchemaId::from_u32(0)); let runtime = ctx.runtime.clone(); runtime.block_on(async move { let table_id = ctx.alloc_table_id(); @@ -1020,7 +1021,7 @@ mod tests { #[test] fn test_manifest_snapshot_one_table_massive_logs() { - let ctx = TestContext::new("snapshot_one_table_massive_logs", SchemaId::new(0).unwrap()); + let ctx = TestContext::new("snapshot_one_table_massive_logs", SchemaId::from_u32(0)); let runtime = ctx.runtime.clone(); runtime.block_on(async move { let table_id = ctx.alloc_table_id(); @@ -1207,7 +1208,7 @@ mod tests { fn test_no_snapshot_logs_merge() { let ctx = Arc::new(TestContext::new( "snapshot_merge_no_snapshot", - SchemaId::new(0).unwrap(), + SchemaId::from_u32(0), )); let table_id = ctx.alloc_table_id(); let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![ @@ -1262,7 +1263,7 @@ mod tests { fn test_multiple_snapshot_merge_normal() { let ctx = Arc::new(TestContext::new( "snapshot_merge_normal", - SchemaId::new(0).unwrap(), + SchemaId::from_u32(0), )); let table_id = ctx.alloc_table_id(); let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![ @@ -1349,7 +1350,7 @@ mod tests { fn test_multiple_snapshot_merge_interleaved_snapshot() { let ctx = Arc::new(TestContext::new( "snapshot_merge_interleaved", - SchemaId::new(0).unwrap(), + SchemaId::from_u32(0), )); let table_id = ctx.alloc_table_id(); let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![ @@ -1439,7 +1440,7 @@ mod tests { fn test_multiple_snapshot_merge_sneaked_update() { let ctx = Arc::new(TestContext::new( "snapshot_merge_sneaked_update", - SchemaId::new(0).unwrap(), + SchemaId::from_u32(0), )); let table_id = ctx.alloc_table_id(); let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![ @@ -1502,7 +1503,7 @@ mod tests { fn test_multiple_snapshot_drop_table() { let ctx = Arc::new(TestContext::new( "snapshot_drop_table", - SchemaId::new(0).unwrap(), + SchemaId::from_u32(0), )); let table_id = ctx.alloc_table_id(); let logs: Vec<(&str, MetaUpdateLogEntry)> = vec![ diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index c71d9d1440..74e88dc96b 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -621,7 +621,7 @@ pub mod tests { let create_request = CreateTableRequest { catalog_name: 
"test_catalog".to_string(), schema_name: "public".to_string(), - schema_id: SchemaId::new(DEFAULT_SPACE_ID).unwrap(), + schema_id: SchemaId::from_u32(DEFAULT_SPACE_ID), table_id: self.table_id, table_name: self.table_name, table_schema, diff --git a/analytic_engine/src/tests/table.rs b/analytic_engine/src/tests/table.rs index 13c706fbb2..63d88b8d51 100644 --- a/analytic_engine/src/tests/table.rs +++ b/analytic_engine/src/tests/table.rs @@ -25,7 +25,7 @@ use table_engine::{ use crate::{table_options, tests::row_util}; pub fn new_table_id(schema_id: u16, table_seq: u32) -> TableId { - TableId::new(SchemaId::from(schema_id), TableSeq::from(table_seq)) + TableId::with_seq(SchemaId::from(schema_id), TableSeq::from(table_seq)).unwrap() } pub type RowTuple<'a> = (&'a str, Timestamp, &'a str, f64, f64, &'a str); @@ -298,7 +298,7 @@ impl Default for Builder { create_request: CreateTableRequest { catalog_name: "ceresdb".to_string(), schema_name: "public".to_string(), - schema_id: SchemaId::new(2).unwrap(), + schema_id: SchemaId::from_u32(2), table_id: new_table_id(2, 1), table_name: "test_table".to_string(), table_schema: FixedSchemaTable::default_schema(), diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index ef48a8f4c1..f60c4d170c 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -374,7 +374,7 @@ impl TestEnv { runtimes: self.runtimes.clone(), builder: T::default(), engine: None, - schema_id: SchemaId::new(100).unwrap(), + schema_id: SchemaId::from_u32(100), last_table_seq: 1, name_to_tables: HashMap::new(), } diff --git a/catalog/src/manager.rs b/catalog/src/manager.rs index fb10637750..21a8b83816 100644 --- a/catalog/src/manager.rs +++ b/catalog/src/manager.rs @@ -2,6 +2,8 @@ //! Catalog manager +use std::sync::Arc; + use snafu::Snafu; use crate::{schema::NameRef, CatalogRef}; @@ -17,11 +19,17 @@ define_result!(Error); // TODO(yingwen): Maybe use async trait? // TODO(yingwen): Provide a context -pub trait Manager: Clone + Send + Sync { +pub trait Manager: Send + Sync { /// Get the default catalog name + /// + /// Default catalog is ensured created because no method to create catalog + /// is provided. fn default_catalog_name(&self) -> NameRef; /// Get the default schema name + /// + /// Default schema may be not created by the implementation and the caller + /// may need to create that by itself. 
fn default_schema_name(&self) -> NameRef; /// Find the catalog by name @@ -30,3 +38,5 @@ pub trait Manager: Clone + Send + Sync { /// All catalogs fn all_catalogs(&self) -> Result>; } + +pub type ManagerRef = Arc; diff --git a/catalog_impls/src/lib.rs b/catalog_impls/src/lib.rs index ed2b0b77a2..bb4adc0af7 100644 --- a/catalog_impls/src/lib.rs +++ b/catalog_impls/src/lib.rs @@ -3,7 +3,12 @@ use std::sync::Arc; use async_trait::async_trait; -use catalog::{consts::SYSTEM_CATALOG, manager::Manager, schema::NameRef, CatalogRef}; +use catalog::{ + consts::SYSTEM_CATALOG, + manager::{Manager, ManagerRef}, + schema::NameRef, + CatalogRef, +}; use system_catalog::{tables::Tables, SystemTableAdapter}; use table_engine::table::{SchemaId, TableId}; @@ -15,13 +20,13 @@ pub mod volatile; /// CatalogManagerImpl is a wrapper for system and user tables #[derive(Clone)] -pub struct CatalogManagerImpl { +pub struct CatalogManagerImpl { system_tables: SystemTables, - user_catalog_manager: M, + user_catalog_manager: ManagerRef, } -impl CatalogManagerImpl { - pub fn new(manager: M) -> Self { +impl CatalogManagerImpl { + pub fn new(manager: ManagerRef) -> Self { let mut system_tables_builder = SystemTablesBuilder::new(); system_tables_builder = system_tables_builder .insert_table(SystemTableAdapter::new(Tables::new(manager.clone()))); @@ -32,7 +37,7 @@ impl CatalogManagerImpl { } } -impl Manager for CatalogManagerImpl { +impl Manager for CatalogManagerImpl { fn default_catalog_name(&self) -> NameRef { self.user_catalog_manager.default_catalog_name() } @@ -56,25 +61,25 @@ impl Manager for CatalogManagerImpl { #[async_trait] pub trait SchemaIdAlloc: Send + Sync { type Error: std::error::Error + Send + Sync + 'static; - async fn alloc_schema_id( + async fn alloc_schema_id<'a>( &self, - schema_name: NameRef, + schema_name: NameRef<'a>, ) -> std::result::Result; } #[async_trait] pub trait TableIdAlloc: Send + Sync { type Error: std::error::Error + Send + Sync + 'static; - async fn alloc_table_id( + async fn alloc_table_id<'a>( &self, - schema_name: NameRef, - table_name: NameRef, + schema_name: NameRef<'a>, + table_name: NameRef<'a>, ) -> std::result::Result; - async fn invalidate_table_id( + async fn invalidate_table_id<'a>( &self, - schema_name: NameRef, - table_name: NameRef, + schema_name: NameRef<'a>, + table_name: NameRef<'a>, table_id: TableId, ) -> std::result::Result<(), Self::Error>; } diff --git a/catalog_impls/src/table_based.rs b/catalog_impls/src/table_based.rs index 714cae455a..f549667839 100644 --- a/catalog_impls/src/table_based.rs +++ b/catalog_impls/src/table_based.rs @@ -12,9 +12,9 @@ use catalog::{ self, consts, manager::{self, Manager}, schema::{ - self, CatalogMismatch, CloseOptions, CloseTableRequest, CreateExistTable, CreateOptions, - CreateTable, CreateTableRequest, DropOptions, DropTable, DropTableRequest, NameRef, - OpenOptions, OpenTableRequest, Schema, SchemaMismatch, SchemaRef, TooManyTable, + self, AllocateTableId, CatalogMismatch, CloseOptions, CloseTableRequest, CreateExistTable, + CreateOptions, CreateTable, CreateTableRequest, DropOptions, DropTable, DropTableRequest, + NameRef, OpenOptions, OpenTableRequest, Schema, SchemaMismatch, SchemaRef, TooManyTable, WriteTableMeta, }, Catalog, CatalogRef, @@ -29,7 +29,8 @@ use system_catalog::sys_catalog_table::{ use table_engine::{ engine::{TableEngineRef, TableState}, table::{ - ReadOptions, SchemaId, SchemaIdGenerator, TableId, TableInfo, TableRef, TableSeqGenerator, + ReadOptions, SchemaId, SchemaIdGenerator, TableId, TableInfo, 
TableRef, TableSeq, + TableSeqGenerator, }, }; use tokio::sync::Mutex; @@ -70,14 +71,31 @@ pub enum Error { schema: String, source: system_catalog::sys_catalog_table::Error, }, + + #[snafu(display( + "Invalid schema id and table seq, schema_id:{:?}, table_seq:{:?}.\nBacktrace:\n{}", + schema_id, + table_seq, + backtrace, + ))] + InvalidSchemaIdAndTableSeq { + schema_id: SchemaId, + table_seq: TableSeq, + backtrace: Backtrace, + }, } define_result!(Error); /// Table based catalog manager -#[derive(Clone)] pub struct TableBasedManager { - inner: Arc, + /// Sys catalog table + catalog_table: Arc, + catalogs: CatalogMap, + /// Table engine proxy + engine_proxy: TableEngineRef, + /// Global schema id generator, Each schema has a unique schema id. + schema_id_generator: Arc, } impl Manager for TableBasedManager { @@ -90,17 +108,12 @@ impl Manager for TableBasedManager { } fn catalog_by_name(&self, name: NameRef) -> manager::Result> { - let catalog = self.inner.catalogs.get(name).cloned().map(|v| v as _); + let catalog = self.catalogs.get(name).cloned().map(|v| v as _); Ok(catalog) } fn all_catalogs(&self) -> manager::Result> { - Ok(self - .inner - .catalogs - .iter() - .map(|(_, v)| v.clone() as _) - .collect()) + Ok(self.catalogs.iter().map(|(_, v)| v.clone() as _).collect()) } } @@ -114,40 +127,23 @@ impl TableBasedManager { .await .context(BuildSysCatalog)?; - let mut inner = Inner { + let mut manager = Self { catalog_table: Arc::new(catalog_table), catalogs: HashMap::new(), engine_proxy, schema_id_generator: Arc::new(SchemaIdGenerator::default()), }; - inner.init().await?; + manager.init().await?; - Ok(Self { - inner: Arc::new(inner), - }) + Ok(manager) } #[cfg(test)] pub fn get_engine_proxy(&self) -> TableEngineRef { - self.inner.engine_proxy.clone() + self.engine_proxy.clone() } -} - -type CatalogMap = HashMap>; - -/// Inner state of TableBasedManager -struct Inner { - /// Sys catalog table - catalog_table: Arc, - catalogs: CatalogMap, - /// Table engine proxy - engine_proxy: TableEngineRef, - /// Global schema id generator, Each schema has a unique schema id. - schema_id_generator: Arc, -} -impl Inner { /// Load all data from sys catalog table. async fn init(&mut self) -> Result<()> { // The system catalog and schema in it is not persisted, so we add it manually. @@ -307,6 +303,8 @@ impl Inner { } } +type CatalogMap = HashMap>; + /// Sys catalog visitor implementation, used to load catalog info struct VisitorImpl<'a> { catalog_table: Arc, @@ -385,7 +383,7 @@ impl<'a> Visitor for VisitorImpl<'a> { // Update max table sequence of the schema. 
let table_id = table_info.table_id; - let table_seq = table_id.table_seq(); + let table_seq = TableSeq::from(table_id); if table_seq.as_u64() >= schema.table_seq_generator.last_table_seq_u64() { schema.table_seq_generator.set_last_table_seq(table_seq); } @@ -631,7 +629,7 @@ impl SchemaImpl { .cloned() } - async fn alloc_table_id<'a>(&'a self, name: NameRef<'a>) -> schema::Result { + async fn alloc_table_id<'a>(&self, name: NameRef<'a>) -> schema::Result { let table_seq = self .table_seq_generator .alloc_table_seq() @@ -640,7 +638,16 @@ impl SchemaImpl { table: name, })?; - Ok(TableId::new(self.schema_id, table_seq)) + TableId::with_seq(self.schema_id, table_seq) + .context(InvalidSchemaIdAndTableSeq { + schema_id: self.schema_id, + table_seq, + }) + .map_err(|e| Box::new(e) as _) + .context(AllocateTableId { + schema: &self.schema_name, + table: name, + }) } } @@ -893,9 +900,7 @@ mod tests { // Create catalog manager, use analytic table as backend TableBasedManager::new(analytic.clone(), engine_proxy.clone()) .await - .unwrap_or_else(|e| { - panic!("Failed to create catalog manager, err:{}", e); - }) + .expect("Failed to create catalog manager") } async fn build_default_schema_with_catalog(catalog_manager: &TableBasedManager) -> SchemaRef { diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs index 82829e2387..5e4e379168 100644 --- a/catalog_impls/src/volatile.rs +++ b/catalog_impls/src/volatile.rs @@ -29,18 +29,9 @@ use crate::{SchemaIdAlloc, TableIdAlloc}; /// ManagerImpl manages multiple volatile catalogs. pub struct ManagerImpl { - inner: Arc>, -} - -// Note: The way deriving [`Clone`] to make [`ManagerImpl`] clonable doesn't -// work for the type paramenters `S` and `T` without [`Clone`] trait bound so we -// need a manual implementation. -impl Clone for ManagerImpl { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } + catalogs: HashMap>>, + schema_id_alloc: Arc, + table_id_alloc: Arc, } impl ManagerImpl @@ -49,17 +40,15 @@ where T: TableIdAlloc + 'static, { pub async fn new(schema_id_alloc: S, table_id_alloc: T) -> Self { - let mut inner = ManagerImplInner { + let mut manager = ManagerImpl { catalogs: HashMap::new(), table_id_alloc: Arc::new(table_id_alloc), schema_id_alloc: Arc::new(schema_id_alloc), }; - inner.maybe_create_default_catalog().await; + manager.maybe_create_default_catalog().await; - Self { - inner: Arc::new(inner), - } + manager } } @@ -77,17 +66,12 @@ where } fn catalog_by_name(&self, name: NameRef) -> manager::Result> { - let catalog = self - .inner - .catalogs - .get(name) - .map(|v| v.clone() as CatalogRef); + let catalog = self.catalogs.get(name).map(|v| v.clone() as CatalogRef); Ok(catalog) } fn all_catalogs(&self) -> manager::Result> { Ok(self - .inner .catalogs .iter() .map(|(_, v)| v.clone() as CatalogRef) @@ -95,46 +79,18 @@ where } } -struct ManagerImplInner { - catalogs: HashMap>>, - schema_id_alloc: Arc, - table_id_alloc: Arc, -} - -impl ManagerImplInner +impl ManagerImpl where S: SchemaIdAlloc, T: TableIdAlloc + 'static, { async fn maybe_create_default_catalog(&mut self) { // Try to get default catalog, create it if not exists. - let catalog = match self.catalogs.get(consts::DEFAULT_CATALOG) { - Some(v) => v.clone(), - None => { - // Default catalog is not exists, create and store it. - self.create_catalog(consts::DEFAULT_CATALOG.to_string()) - .await - } + if self.catalogs.get(consts::DEFAULT_CATALOG).is_none() { + // Default catalog is not exists, create and store it. 
+ self.create_catalog(consts::DEFAULT_CATALOG.to_string()) + .await; }; - - // Create default schema if not exists. - if !catalog.schema_exists(consts::DEFAULT_SCHEMA) { - // Allocate schema id. - let schema_id = self - .schema_id_alloc - .alloc_schema_id(consts::DEFAULT_SCHEMA) - .await - .expect("Schema id of default catalog should be valid"); - - self.add_schema_to_catalog( - consts::DEFAULT_CATALOG.to_string(), - consts::DEFAULT_SCHEMA.to_string(), - schema_id, - self.table_id_alloc.clone(), - &*catalog, - ) - .await; - } } async fn create_catalog(&mut self, catalog_name: String) -> Arc> { @@ -149,26 +105,6 @@ where catalog } - - async fn add_schema_to_catalog( - &mut self, - catalog_name: String, - schema_name: String, - schema_id: SchemaId, - table_id_alloc: Arc, - catalog: &CatalogImpl, - ) -> Arc> { - let schema = Arc::new(SchemaImpl::new( - catalog_name, - schema_name, - schema_id, - table_id_alloc, - )); - - catalog.add_schema(schema.clone()); - - schema - } } /// A volatile implementation for [`Catalog`]. @@ -185,21 +121,6 @@ struct CatalogImpl { table_id_alloc: Arc, } -impl CatalogImpl -where - T: TableIdAlloc + 'static, -{ - fn add_schema(&self, schema: Arc>) { - let mut schemas = self.schemas.write().unwrap(); - schemas.insert(schema.name().to_string(), schema); - } - - fn schema_exists(&self, schema_name: &str) -> bool { - let schemas = self.schemas.read().unwrap(); - schemas.get(schema_name).is_some() - } -} - #[async_trait] impl Catalog for CatalogImpl where diff --git a/ceresdbproto_deps/Cargo.toml b/ceresdbproto_deps/Cargo.toml index ad2a227ea8..b4ec79bd36 100644 --- a/ceresdbproto_deps/Cargo.toml +++ b/ceresdbproto_deps/Cargo.toml @@ -10,4 +10,4 @@ workspace = true [dependencies.ceresdbproto] git = "https://github.com/CeresDB/ceresdbproto.git" -rev = "ec5404e0f60fc748d3e8b68a592e2926ac1e84de" \ No newline at end of file +rev = "12a0ee8e45883056f275c707518fa9ed508676d1" \ No newline at end of file diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index 766abcb47d..cb75b2aa1e 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -88,11 +88,11 @@ impl ClusterImpl { // Register node every 2/3 lease fn heartbeat_interval(&self) -> Duration { - Duration::from_secs(self.config.meta_client_config.lease.as_millis() * 2 / 3) + Duration::from_secs(self.config.meta_client.lease.as_millis() * 2 / 3) } fn error_wait_lease(&self) -> Duration { - self.config.meta_client_config.lease.0 / 2 + self.config.meta_client.lease.0 / 2 } } diff --git a/cluster/src/config.rs b/cluster/src/config.rs index fdff80dcca..73345a22d2 100644 --- a/cluster/src/config.rs +++ b/cluster/src/config.rs @@ -1,18 +1,12 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use meta_client_v2::meta_impl::MetaClientConfig; +use meta_client_v2::{meta_impl::MetaClientConfig, types::NodeMetaInfo}; use serde_derive::Deserialize; #[derive(Default, Clone, Deserialize, Debug)] +#[serde(default)] pub struct ClusterConfig { - /// Local ip address of this node, used as endpoint ip in meta. - pub node: String, - /// Grpc port of this node, also used as endpoint port in meta. 
- pub port: u16, - pub zone: String, - pub idc: String, - pub binary_version: String, + pub node: NodeMetaInfo, pub cmd_channel_buffer_size: usize, - - pub meta_client_config: MetaClientConfig, + pub meta_client: MetaClientConfig, } diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index ae9132ee38..7ba4624714 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use common_util::define_result; pub use meta_client_v2::types::{ AllocSchemaIdRequest, AllocSchemaIdResponse, AllocTableIdRequest, AllocTableIdResponse, - DropTableRequest, DropTableResponse, GetTablesRequest, + DropTableRequest, GetTablesRequest, }; use meta_client_v2::types::{ShardId, TableId}; use snafu::{Backtrace, Snafu}; diff --git a/interpreters/src/create.rs b/interpreters/src/create.rs index a237793b3a..21e9528caf 100644 --- a/interpreters/src/create.rs +++ b/interpreters/src/create.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use catalog::{ - manager::Manager, + manager::ManagerRef, schema::{CreateOptions, CreateTableRequest}, }; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; @@ -50,18 +50,18 @@ pub enum Error { define_result!(Error); /// Create interpreter -pub struct CreateInterpreter { +pub struct CreateInterpreter { ctx: Context, plan: CreateTablePlan, - catalog_manager: C, + catalog_manager: ManagerRef, table_engine: TableEngineRef, } -impl CreateInterpreter { +impl CreateInterpreter { pub fn create( ctx: Context, plan: CreateTablePlan, - catalog_manager: C, + catalog_manager: ManagerRef, table_engine: TableEngineRef, ) -> InterpreterPtr { Box::new(Self { @@ -73,7 +73,7 @@ impl CreateInterpreter { } } -impl CreateInterpreter { +impl CreateInterpreter { async fn execute_create(self: Box) -> Result { let default_catalog = self.ctx.default_catalog(); let catalog = self @@ -132,7 +132,7 @@ impl CreateInterpreter { // TODO(yingwen): Wrap a method that returns self::Result, simplify some code to // converting self::Error to super::Error #[async_trait] -impl Interpreter for CreateInterpreter { +impl Interpreter for CreateInterpreter { async fn execute(self: Box) -> InterpreterResult { self.execute_create().await.context(Create) } diff --git a/interpreters/src/drop.rs b/interpreters/src/drop.rs index 03ff2217b6..d7c97c34f8 100644 --- a/interpreters/src/drop.rs +++ b/interpreters/src/drop.rs @@ -3,7 +3,7 @@ //! 
Interpreter for drop statements use async_trait::async_trait; -use catalog::{manager::Manager, schema::DropOptions}; +use catalog::{manager::ManagerRef, schema::DropOptions}; use log::warn; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use sql::plan::DropTablePlan; @@ -50,18 +50,18 @@ pub enum Error { define_result!(Error); /// Drop interpreter -pub struct DropInterpreter { +pub struct DropInterpreter { ctx: Context, plan: DropTablePlan, - catalog_manager: C, + catalog_manager: ManagerRef, table_engine: TableEngineRef, } -impl DropInterpreter { +impl DropInterpreter { pub fn create( ctx: Context, plan: DropTablePlan, - catalog_manager: C, + catalog_manager: ManagerRef, table_engine: TableEngineRef, ) -> InterpreterPtr { Box::new(Self { @@ -73,7 +73,7 @@ impl DropInterpreter { } } -impl DropInterpreter { +impl DropInterpreter { async fn execute_drop(self: Box) -> Result { let default_catalog = self.ctx.default_catalog(); let catalog = self @@ -124,7 +124,7 @@ impl DropInterpreter { // TODO(yingwen): Wrap a method that returns self::Result, simplify some code to // converting self::Error to super::Error #[async_trait] -impl Interpreter for DropInterpreter { +impl Interpreter for DropInterpreter { async fn execute(self: Box) -> InterpreterResult { self.execute_drop().await.context(Drop) } diff --git a/interpreters/src/factory.rs b/interpreters/src/factory.rs index c83d39198b..e0f6b726a2 100644 --- a/interpreters/src/factory.rs +++ b/interpreters/src/factory.rs @@ -2,7 +2,7 @@ //! Interpreter factory -use catalog::manager::Manager as CatalogManager; +use catalog::manager::ManagerRef; use query_engine::executor::Executor; use sql::plan::Plan; use table_engine::engine::TableEngineRef; @@ -15,14 +15,18 @@ use crate::{ }; /// A factory to create interpreters -pub struct Factory { +pub struct Factory { query_executor: Q, - catalog_manager: C, + catalog_manager: ManagerRef, table_engine: TableEngineRef, } -impl Factory { - pub fn new(query_executor: Q, catalog_manager: C, table_engine: TableEngineRef) -> Self { +impl Factory { + pub fn new( + query_executor: Q, + catalog_manager: ManagerRef, + table_engine: TableEngineRef, + ) -> Self { Self { query_executor, catalog_manager, diff --git a/interpreters/src/show.rs b/interpreters/src/show.rs index de5a6eb745..e7b4322500 100644 --- a/interpreters/src/show.rs +++ b/interpreters/src/show.rs @@ -8,7 +8,7 @@ use arrow_deps::arrow::{ record_batch::RecordBatch, }; use async_trait::async_trait; -use catalog::{manager::Manager, schema::Schema, Catalog}; +use catalog::{manager::ManagerRef, schema::Schema, Catalog}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use sql::{ ast::ShowCreateObject, @@ -83,14 +83,14 @@ impl From for Error { } } -pub struct ShowInterpreter { +pub struct ShowInterpreter { ctx: Context, plan: ShowPlan, - catalog_manager: C, + catalog_manager: ManagerRef, } -impl ShowInterpreter { - pub fn create(ctx: Context, plan: ShowPlan, catalog_manager: C) -> InterpreterPtr { +impl ShowInterpreter { + pub fn create(ctx: Context, plan: ShowPlan, catalog_manager: ManagerRef) -> InterpreterPtr { Box::new(Self { ctx, plan, @@ -99,13 +99,13 @@ impl ShowInterpreter { } } -impl ShowInterpreter { +impl ShowInterpreter { fn show_create(plan: ShowCreatePlan) -> Result { let show_create = ShowCreateInterpreter::create(plan); show_create.execute_show_create() } - fn show_tables(ctx: Context, catalog_manager: C) -> Result { + fn show_tables(ctx: Context, catalog_manager: ManagerRef) -> Result { let schema = get_default_schema(&ctx, 
&catalog_manager)?; let tables_names = schema @@ -131,7 +131,7 @@ impl ShowInterpreter { Ok(Output::Records(vec![record_batch])) } - fn show_databases(ctx: Context, catalog_manager: C) -> Result { + fn show_databases(ctx: Context, catalog_manager: ManagerRef) -> Result { let catalog = get_default_catalog(&ctx, &catalog_manager)?; let schema_names = catalog .all_schemas() @@ -158,7 +158,7 @@ impl ShowInterpreter { } #[async_trait] -impl Interpreter for ShowInterpreter { +impl Interpreter for ShowInterpreter { async fn execute(self: Box) -> InterpreterResult { match self.plan { ShowPlan::ShowCreatePlan(t) => Self::show_create(t).context(ShowCreateTable), @@ -172,9 +172,9 @@ impl Interpreter for ShowInterpreter { } } -fn get_default_catalog( +fn get_default_catalog( ctx: &Context, - catalog_manager: &C, + catalog_manager: &ManagerRef, ) -> Result> { let default_catalog = ctx.default_catalog(); let catalog = catalog_manager @@ -188,9 +188,9 @@ fn get_default_catalog( Ok(catalog) } -fn get_default_schema( +fn get_default_schema( ctx: &Context, - catalog_manager: &C, + catalog_manager: &ManagerRef, ) -> Result> { let catalog = get_default_catalog(ctx, catalog_manager)?; diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs index 380b298184..f2127b6e89 100644 --- a/interpreters/src/tests.rs +++ b/interpreters/src/tests.rs @@ -1,5 +1,7 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +use std::sync::Arc; + use analytic_engine::{ setup::{EngineBuilder, RocksEngineBuilder}, tests::util::TestEnv, @@ -23,9 +25,7 @@ async fn build_catalog_manager(analytic: TableEngineRef) -> TableBasedManager { // Create catalog manager, use analytic table as backend TableBasedManager::new(analytic.clone(), analytic) .await - .unwrap_or_else(|e| { - panic!("Failed to create catalog manager, err:{}", e); - }) + .expect("Failed to create catalog manager") } fn sql_to_plan(meta_provider: &M, sql: &str) -> Plan { @@ -56,8 +56,8 @@ impl Env where M: MetaProvider, { - async fn build_factory(&self) -> Factory { - let catalog_manager = build_catalog_manager(self.engine()).await; + async fn build_factory(&self) -> Factory { + let catalog_manager = Arc::new(build_catalog_manager(self.engine()).await); Factory::new(ExecutorImpl::new(), catalog_manager, self.engine()) } diff --git a/meta_client/src/lib.rs b/meta_client/src/lib.rs index 155d31ff09..3878293feb 100644 --- a/meta_client/src/lib.rs +++ b/meta_client/src/lib.rs @@ -127,7 +127,7 @@ impl From for SchemaConfig { } } -#[derive(Debug, Default, Clone, Deserialize)] +#[derive(Clone, Debug, Default, Deserialize)] pub struct ClusterView { pub schema_shards: HashMap, pub schema_configs: HashMap, @@ -135,7 +135,7 @@ pub struct ClusterView { pub type ClusterViewRef = Arc; -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] #[serde(default)] pub struct MetaClientConfig { pub cluster: String, diff --git a/meta_client_v2/src/lib.rs b/meta_client_v2/src/lib.rs index f33c8cb25f..bbbfe671fa 100644 --- a/meta_client_v2/src/lib.rs +++ b/meta_client_v2/src/lib.rs @@ -7,8 +7,7 @@ use common_util::define_result; use snafu::{Backtrace, Snafu}; use types::{ ActionCmd, AllocSchemaIdRequest, AllocSchemaIdResponse, AllocTableIdRequest, - AllocTableIdResponse, DropTableRequest, DropTableResponse, GetTablesRequest, GetTablesResponse, - ResponseHeader, ShardInfo, + AllocTableIdResponse, DropTableRequest, GetTablesRequest, GetTablesResponse, ShardInfo, }; pub mod meta_impl; @@ -69,9 +68,15 @@ pub enum Error { source: Box, }, - #[snafu(display("Meta 
rpc error, resp header:{:?}.\nBacktrace:\n{}", header, backtrace))] + #[snafu(display( + "Meta rpc error, resp code:{}, msg:{}.\nBacktrace:\n{}", + code, + msg, + backtrace + ))] MetaRpc { - header: ResponseHeader, + code: u32, + msg: String, backtrace: Backtrace, }, @@ -105,7 +110,7 @@ pub trait EventHandler { /// MetaClient is the abstraction of client used to communicate with CeresMeta /// cluster. #[async_trait] -pub trait MetaClient { +pub trait MetaClient: Send + Sync { /// Start the meta client and the events will occur afterwards. async fn start(&self) -> Result<()>; /// Stop the meta client and release all the resources. @@ -120,9 +125,11 @@ pub trait MetaClient { async fn alloc_table_id(&self, req: AllocTableIdRequest) -> Result; - async fn drop_table(&self, req: DropTableRequest) -> Result; + async fn drop_table(&self, req: DropTableRequest) -> Result<()>; async fn get_tables(&self, req: GetTablesRequest) -> Result; async fn send_heartbeat(&self, req: Vec) -> Result<()>; } + +pub type MetaClientRef = Arc; diff --git a/meta_client_v2/src/meta_impl.rs b/meta_client_v2/src/meta_impl.rs index 8adb2dc17a..f6db434822 100644 --- a/meta_client_v2/src/meta_impl.rs +++ b/meta_client_v2/src/meta_impl.rs @@ -6,7 +6,9 @@ use std::{ }; use async_trait::async_trait; -use ceresdbproto_deps::ceresdbproto::{meta_service, meta_service_grpc::CeresmetaRpcServiceClient}; +use ceresdbproto_deps::ceresdbproto::{ + common::ResponseHeader, meta_service, meta_service_grpc::CeresmetaRpcServiceClient, +}; use common_util::{ config::ReadableDuration, runtime::{JoinHandle, Runtime}, @@ -29,9 +31,8 @@ use tokio::{ use crate::{ types::{ ActionCmd, AllocSchemaIdRequest, AllocSchemaIdResponse, AllocTableIdRequest, - AllocTableIdResponse, CreateTableCmd, DropTableCmd, DropTableRequest, DropTableResponse, - GetTablesRequest, GetTablesResponse, NodeHeartbeatResponse, NodeInfo, NodeMetaInfo, - RequestHeader, ResponseHeader, ShardInfo, + AllocTableIdResponse, CreateTableCmd, DropTableCmd, DropTableRequest, GetTablesRequest, + GetTablesResponse, NodeHeartbeatResponse, NodeInfo, NodeMetaInfo, RequestHeader, ShardInfo, }, EventHandler, EventHandlerRef, FailAllocSchemaId, FailAllocTableId, FailDropTable, FailGetGrpcClient, FailGetTables, FailHandleEvent, FailSendHeartbeat, FetchActionCmd, @@ -52,7 +53,7 @@ impl Default for MetaClientConfig { fn default() -> Self { Self { cluster_name: String::new(), - meta_addr: "http://127.0.0.1:8080".to_string(), + meta_addr: "127.0.0.1:8080".to_string(), lease: ReadableDuration::secs(10), timeout: ReadableDuration::secs(5), cq_count: 8, @@ -96,7 +97,7 @@ impl Inner { fn request_header(&self) -> RequestHeader { RequestHeader { - node: self.node_meta_info.node.to_string(), + node: self.node_meta_info.endpoint(), cluster_name: self.meta_config.cluster_name.clone(), } } @@ -227,11 +228,12 @@ impl Inner { resp, ); - let resp = NodeHeartbeatResponse::from(resp); - if let Err(e) = check_response_header(&resp.header) { + if let Err(e) = check_response_header(resp.get_header()) { error!("Fetch action cmd failed, err:{}", e); continue; } + + let resp = NodeHeartbeatResponse::from(resp); let event = match resp.action_cmd { Some(action_cmd) => action_cmd, None => { @@ -355,10 +357,8 @@ impl MetaClient for MetaClientImpl { pb_req, pb_resp ); - let resp = AllocSchemaIdResponse::from(pb_resp); - - check_response_header(&resp.header)?; - Ok(resp) + check_response_header(pb_resp.get_header())?; + Ok(AllocSchemaIdResponse::from(pb_resp)) } async fn alloc_table_id(&self, req: AllocTableIdRequest) -> 
Result { @@ -381,8 +381,8 @@ impl MetaClient for MetaClientImpl { pb_req, pb_resp ); + check_response_header(pb_resp.get_header())?; let resp = AllocTableIdResponse::from(pb_resp); - check_response_header(&resp.header)?; let add_table_cmd = ActionCmd::CreateTableCmd(CreateTableCmd { schema_name: resp.schema_name.clone(), @@ -396,7 +396,7 @@ impl MetaClient for MetaClientImpl { Ok(resp) } - async fn drop_table(&self, req: DropTableRequest) -> Result { + async fn drop_table(&self, req: DropTableRequest) -> Result<()> { let grpc_client_guard = self.inner.grpc_client.read().await; let grpc_client = grpc_client_guard.as_ref().context(FailGetGrpcClient)?; @@ -416,16 +416,13 @@ impl MetaClient for MetaClientImpl { pb_req, pb_resp ); - let resp = DropTableResponse::from(pb_resp); - check_response_header(&resp.header)?; - + check_response_header(pb_resp.get_header())?; let drop_table_cmd = ActionCmd::DropTableCmd(DropTableCmd { schema_name: req.schema_name.clone(), name: req.name.clone(), }); - self.inner.handle_event(&drop_table_cmd).await?; - Ok(resp) + self.inner.handle_event(&drop_table_cmd).await } async fn get_tables(&self, req: GetTablesRequest) -> Result { @@ -449,10 +446,9 @@ impl MetaClient for MetaClientImpl { pb_req, pb_resp ); - let resp = GetTablesResponse::from(pb_resp); - check_response_header(&resp.header)?; + check_response_header(pb_resp.get_header())?; - Ok(resp) + Ok(GetTablesResponse::from(pb_resp)) } async fn send_heartbeat(&self, shards_info: Vec) -> Result<()> { @@ -496,11 +492,12 @@ impl MetaClient for MetaClientImpl { } fn check_response_header(header: &ResponseHeader) -> Result<()> { - if header.is_success() { + if header.code == 0 { Ok(()) } else { MetaRpc { - header: header.clone(), + code: header.code, + msg: header.get_error(), } .fail() } diff --git a/meta_client_v2/src/types.rs b/meta_client_v2/src/types.rs index 2be4b1fcc1..98aeb62979 100644 --- a/meta_client_v2/src/types.rs +++ b/meta_client_v2/src/types.rs @@ -4,7 +4,6 @@ use std::collections::HashMap; use ceresdbproto_deps::ceresdbproto::{ cluster::ShardRole as PbShardRole, - common::ResponseHeader as PbResponseHeader, meta_service::{self, NodeHeartbeatResponse_oneof_cmd}, }; use common_util::config::ReadableDuration; @@ -20,19 +19,6 @@ pub struct RequestHeader { pub cluster_name: String, } -#[derive(Debug, Clone)] -pub struct ResponseHeader { - pub code: u32, - pub err_msg: String, -} - -impl ResponseHeader { - #[inline] - pub fn is_success(&self) -> bool { - self.code == 0 - } -} - #[derive(Debug)] pub struct AllocSchemaIdRequest { pub name: String, @@ -40,8 +26,6 @@ pub struct AllocSchemaIdRequest { #[derive(Debug)] pub struct AllocSchemaIdResponse { - pub header: ResponseHeader, - pub name: String, pub id: SchemaId, } @@ -54,8 +38,6 @@ pub struct AllocTableIdRequest { #[derive(Debug)] pub struct AllocTableIdResponse { - pub header: ResponseHeader, - pub schema_name: String, pub name: String, pub shard_id: ShardId, @@ -70,11 +52,6 @@ pub struct DropTableRequest { pub id: TableId, } -#[derive(Debug)] -pub struct DropTableResponse { - pub header: ResponseHeader, -} - #[derive(Clone, Debug)] pub struct GetTablesRequest { pub shard_ids: Vec, @@ -82,8 +59,6 @@ pub struct GetTablesRequest { #[derive(Clone, Debug)] pub struct GetTablesResponse { - pub header: ResponseHeader, - pub tables_map: HashMap, } @@ -101,26 +76,22 @@ pub struct ShardTables { pub tables: Vec, } -#[derive(Debug, Clone, Default, Deserialize)] -pub struct Node { +#[derive(Debug, Default, Clone, Deserialize)] +#[serde(default)] +pub struct 
NodeMetaInfo { pub addr: String, pub port: u16, + pub zone: String, + pub idc: String, + pub binary_version: String, } -impl ToString for Node { - fn to_string(&self) -> String { +impl NodeMetaInfo { + pub fn endpoint(&self) -> String { format!("{}:{}", self.addr, self.port) } } -#[derive(Debug, Default, Clone, Deserialize)] -pub struct NodeMetaInfo { - pub node: Node, - pub zone: String, - pub idc: String, - pub binary_version: String, -} - #[derive(Debug, Clone)] pub struct NodeInfo { pub node_meta_info: NodeMetaInfo, @@ -129,8 +100,6 @@ pub struct NodeInfo { #[derive(Debug)] pub struct NodeHeartbeatResponse { - pub header: ResponseHeader, - pub timestamp: u64, pub action_cmd: Option, } @@ -209,7 +178,7 @@ impl Default for MetaClientConfig { fn default() -> Self { Self { cluster_name: String::new(), - meta_addr: "http://127.0.0.1:8080".to_string(), + meta_addr: "127.0.0.1:8080".to_string(), meta_members_url: "ceresmeta/members".to_string(), lease: ReadableDuration::secs(10), timeout: ReadableDuration::secs(5), @@ -221,7 +190,7 @@ impl Default for MetaClientConfig { impl From for meta_service::NodeInfo { fn from(node_info: NodeInfo) -> Self { let mut pb_node_info = meta_service::NodeInfo::new(); - pb_node_info.set_node(node_info.node_meta_info.node.to_string()); + pb_node_info.set_node(node_info.node_meta_info.endpoint()); pb_node_info.set_zone(node_info.node_meta_info.zone); pb_node_info.set_binary_version(node_info.node_meta_info.binary_version); pb_node_info.set_shardsInfo(protobuf::RepeatedField::from_vec( @@ -263,10 +232,9 @@ impl From for ShardRole { } impl From for NodeHeartbeatResponse { - fn from(mut pb: meta_service::NodeHeartbeatResponse) -> Self { + fn from(pb: meta_service::NodeHeartbeatResponse) -> Self { let timestamp = pb.get_timestamp(); NodeHeartbeatResponse { - header: pb.take_header().into(), timestamp, action_cmd: pb.cmd.map(|v| v.into()), } @@ -332,7 +300,6 @@ impl From for meta_service::GetTablesRequest { impl From for GetTablesResponse { fn from(mut pb: meta_service::GetTablesResponse) -> Self { Self { - header: pb.take_header().into(), tables_map: pb .take_tables_map() .into_iter() @@ -371,15 +338,6 @@ impl From for meta_service::RequestHeader { } } -impl From for ResponseHeader { - fn from(mut pb: PbResponseHeader) -> Self { - Self { - code: pb.get_code(), - err_msg: pb.take_error(), - } - } -} - impl From for meta_service::AllocSchemaIdRequest { fn from(req: AllocSchemaIdRequest) -> Self { let mut pb = meta_service::AllocSchemaIdRequest::new(); @@ -391,7 +349,6 @@ impl From for meta_service::AllocSchemaIdRequest { impl From for AllocSchemaIdResponse { fn from(mut pb: meta_service::AllocSchemaIdResponse) -> Self { Self { - header: pb.take_header().into(), name: pb.take_name(), id: pb.get_id(), } @@ -410,7 +367,6 @@ impl From for meta_service::AllocTableIdRequest { impl From for AllocTableIdResponse { fn from(mut pb: meta_service::AllocTableIdResponse) -> Self { Self { - header: pb.take_header().into(), schema_name: pb.take_schema_name(), name: pb.take_name(), shard_id: pb.get_shard_id(), @@ -429,11 +385,3 @@ impl From for meta_service::DropTableRequest { pb } } - -impl From for DropTableResponse { - fn from(mut pb: meta_service::DropTableResponse) -> Self { - Self { - header: pb.take_header().into(), - } - } -} diff --git a/server/Cargo.toml b/server/Cargo.toml index 8538edc027..c6ff1f47fc 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -14,7 +14,8 @@ arrow_deps = { path = "../arrow_deps" } async-trait = "0.1.53" avro-rs = "0.13" catalog = { path = 
"../catalog" } -ceresdbproto = { git = "https://github.com/CeresDB/ceresdbproto.git", rev= "2c10152d021cd5a26b9c870cdede6a0317adca3d" } +ceresdbproto_deps = { path = "../ceresdbproto_deps" } +cluster = { path = "../cluster" } common_types = { path = "../common_types" } common_util = { path = "../common_util" } df_operator = { path = "../df_operator" } diff --git a/server/src/config.rs b/server/src/config.rs index e60857410a..cc291804ef 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -3,12 +3,19 @@ //! Server configs use analytic_engine; +use cluster::config::ClusterConfig; use meta_client::MetaClientConfig; use serde_derive::Deserialize; use crate::router::RuleList; -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Copy, Deserialize)] +pub enum DeployMode { + Standalone, + Cluster, +} + +#[derive(Clone, Debug, Deserialize)] #[serde(default)] pub struct RuntimeConfig { // Runtime for reading data @@ -22,7 +29,7 @@ pub struct RuntimeConfig { } // TODO(yingwen): Split config into several sub configs. -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] #[serde(default)] pub struct Config { /// The address to listen. @@ -52,6 +59,8 @@ pub struct Config { // Analytic engine configs: pub analytic: analytic_engine::Config, + pub cluster: ClusterConfig, + pub deploy_mode: DeployMode, } impl Default for RuntimeConfig { @@ -88,6 +97,8 @@ impl Default for Config { }, route_rules: RuleList::default(), analytic: analytic_engine::Config::default(), + cluster: ClusterConfig::default(), + deploy_mode: DeployMode::Standalone, } } } diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 724c2037ab..02c2e53906 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -9,8 +9,8 @@ use std::{ }; use async_trait::async_trait; -use catalog::{consts as catalogConst, manager::Manager as CatalogManager}; -use ceresdbproto::{ +use catalog::{consts as catalogConst, manager::ManagerRef}; +use ceresdbproto_deps::ceresdbproto::{ common::ResponseHeader, prometheus::{PrometheusQueryRequest, PrometheusQueryResponse}, storage::{ @@ -156,21 +156,21 @@ impl RequestHeader { } } -pub struct HandlerContext<'a, C, Q> { +pub struct HandlerContext<'a, Q> { #[allow(dead_code)] header: RequestHeader, router: RouterRef, - instance: InstanceRef, + instance: InstanceRef, catalog: String, schema: String, schema_config: Option<&'a SchemaConfig>, } -impl<'a, C: CatalogManager, Q> HandlerContext<'a, C, Q> { +impl<'a, Q> HandlerContext<'a, Q> { fn new( header: RequestHeader, router: Arc, - instance: InstanceRef, + instance: InstanceRef, cluster_view: &'a ClusterViewRef, ) -> Result { let default_catalog = instance.catalog_manager.default_catalog_name(); @@ -239,17 +239,17 @@ impl RpcServices { } } -pub struct Builder { +pub struct Builder { bind_addr: String, port: u16, meta_client_config: Option, env: Option>, runtimes: Option>, - instance: Option>, + instance: Option>, route_rules: RuleList, } -impl Builder { +impl Builder { pub fn new() -> Self { Self { bind_addr: String::from("0.0.0.0"), @@ -287,7 +287,7 @@ impl Builder { self } - pub fn instance(mut self, instance: InstanceRef) -> Self { + pub fn instance(mut self, instance: InstanceRef) -> Self { self.instance = Some(instance); self } @@ -298,7 +298,7 @@ impl Builder { } } -impl Builder { +impl Builder { pub fn build(self) -> Result { let meta_client_config = self.meta_client_config.context(MissingMetaClientConfig)?; let runtimes = self.runtimes.context(MissingRuntimes)?; @@ -334,12 +334,12 @@ impl Builder { } } -struct 
SchemaWatcher { - catalog_manager: C, +struct SchemaWatcher { + catalog_manager: ManagerRef, } #[async_trait] -impl MetaWatcher for SchemaWatcher { +impl MetaWatcher for SchemaWatcher { async fn on_change(&self, view: ClusterViewRef) -> meta_client::Result<()> { for schema in view.schema_shards.keys() { let default_catalog = catalogConst::DEFAULT_CATALOG; @@ -380,14 +380,14 @@ fn build_ok_header() -> ResponseHeader { header } -struct StorageServiceImpl { +struct StorageServiceImpl { router: Arc, - instance: InstanceRef, + instance: InstanceRef, runtimes: Arc, meta_client: Arc, } -impl Clone for StorageServiceImpl { +impl Clone for StorageServiceImpl { fn clone(&self) -> Self { Self { router: self.router.clone(), @@ -490,9 +490,7 @@ macro_rules! handle_request { }; } -impl StorageService - for StorageServiceImpl -{ +impl StorageService for StorageServiceImpl { handle_request!(route, handle_route, RouteRequest, RouteResponse); handle_request!(write, handle_write, WriteRequest, WriteResponse); @@ -509,8 +507,8 @@ impl StorageService fn stream_write( &mut self, ctx: RpcContext<'_>, - mut stream_req: RequestStream, - sink: ClientStreamingSink, + mut stream_req: RequestStream, + sink: ClientStreamingSink, ) { let begin_instant = Instant::now(); let router = self.router.clone(); @@ -686,11 +684,8 @@ impl StorageService /// Create CreateTablePlan from a write metric. // The caller must ENSURE that the HandlerContext's schema_config is not None. -pub fn write_metric_to_create_table_plan< - C: CatalogManager + 'static, - Q: QueryExecutor + 'static, ->( - ctx: &HandlerContext, +pub fn write_metric_to_create_table_plan( + ctx: &HandlerContext, write_metric: &WriteMetric, ) -> Result { let schema_config = ctx.schema_config.unwrap(); @@ -889,7 +884,9 @@ fn try_get_data_type_from_value(value: &Value_oneof_value) -> Result #[cfg(test)] mod tests { - use ceresdbproto::storage::{Field, FieldGroup, Tag, Value, WriteEntry, WriteMetric}; + use ceresdbproto_deps::ceresdbproto::storage::{ + Field, FieldGroup, Tag, Value, WriteEntry, WriteMetric, + }; use common_types::datum::DatumKind; use meta_client::SchemaConfig; diff --git a/server/src/grpc/prom_query.rs b/server/src/grpc/prom_query.rs index b7453a903a..a7732e54dd 100644 --- a/server/src/grpc/prom_query.rs +++ b/server/src/grpc/prom_query.rs @@ -5,8 +5,7 @@ use std::{ sync::Arc, }; -use catalog::manager::Manager as CatalogManager; -use ceresdbproto::{ +use ceresdbproto_deps::ceresdbproto::{ common::ResponseHeader, prometheus::{Label, PrometheusQueryRequest, PrometheusQueryResponse, Sample, TimeSeries}, }; @@ -37,12 +36,11 @@ fn is_table_not_found_error(e: &FrontendError) -> bool { if matches!(source, sql::promql::Error::TableNotFound { .. 
}))) } -pub async fn handle_query( - ctx: &HandlerContext<'_, C, Q>, +pub async fn handle_query( + ctx: &HandlerContext<'_, Q>, req: PrometheusQueryRequest, ) -> Result where - C: CatalogManager + 'static, Q: QueryExecutor + 'static, { let request_id = RequestId::next_id(); @@ -60,7 +58,7 @@ where // TODO(yingwen): Privilege check, cannot access data of other tenant // TODO(yingwen): Maybe move MetaProvider to instance let provider = CatalogMetaProvider { - manager: &instance.catalog_manager, + manager: instance.catalog_manager.clone(), default_catalog: ctx.catalog(), default_schema: ctx.tenant(), function_registry: &*instance.function_registry, diff --git a/server/src/grpc/query.rs b/server/src/grpc/query.rs index f6943a4311..3c58bba5e9 100644 --- a/server/src/grpc/query.rs +++ b/server/src/grpc/query.rs @@ -4,8 +4,7 @@ use std::time::Instant; -use catalog::manager::Manager as CatalogManager; -use ceresdbproto::{ +use ceresdbproto_deps::ceresdbproto::{ common::ResponseHeader, storage::{QueryRequest, QueryResponse, QueryResponse_SchemaType}, }; @@ -39,8 +38,8 @@ fn empty_ok_resp() -> QueryResponse { resp } -pub async fn handle_query( - ctx: &HandlerContext<'_, C, Q>, +pub async fn handle_query( + ctx: &HandlerContext<'_, Q>, req: QueryRequest, ) -> Result { let output_result = fetch_query_output(ctx, &req).await?; @@ -56,8 +55,8 @@ pub async fn handle_query( - ctx: &HandlerContext<'_, C, Q>, +pub async fn fetch_query_output( + ctx: &HandlerContext<'_, Q>, req: &QueryRequest, ) -> Result> { let request_id = RequestId::next_id(); @@ -76,7 +75,7 @@ pub async fn fetch_query_output( - ctx: &HandlerContext<'_, C, Q>, +pub async fn handle_route( + ctx: &HandlerContext<'_, Q>, req: RouteRequest, ) -> Result { handle_route_sync(ctx.router.clone(), req, ctx.tenant()) diff --git a/server/src/grpc/write.rs b/server/src/grpc/write.rs index 422796dc43..48b47dda39 100644 --- a/server/src/grpc/write.rs +++ b/server/src/grpc/write.rs @@ -4,8 +4,7 @@ use std::collections::HashMap; -use catalog::manager::Manager as CatalogManager; -use ceresdbproto::storage::{ +use ceresdbproto_deps::ceresdbproto::storage::{ Value_oneof_value, WriteEntry, WriteMetric, WriteRequest, WriteResponse, }; use common_types::{ @@ -28,8 +27,8 @@ use crate::{ grpc::{self, HandlerContext}, }; -pub(crate) async fn handle_write( - ctx: &HandlerContext<'_, C, Q>, +pub(crate) async fn handle_write( + ctx: &HandlerContext<'_, Q>, req: WriteRequest, ) -> Result { let request_id = RequestId::next_id(); @@ -105,8 +104,8 @@ pub(crate) async fn handle_write( - ctx: &HandlerContext<'_, C, Q>, +async fn write_request_to_insert_plan( + ctx: &HandlerContext<'_, Q>, mut write_request: WriteRequest, request_id: RequestId, ) -> Result> { @@ -144,8 +143,8 @@ async fn write_request_to_insert_plan( - ctx: &HandlerContext<'_, C, Q>, +fn try_get_table( + ctx: &HandlerContext<'_, Q>, table_name: &str, ) -> Result> { ctx.instance @@ -178,8 +177,8 @@ fn try_get_table( }) } -async fn create_table( - ctx: &HandlerContext<'_, C, Q>, +async fn create_table( + ctx: &HandlerContext<'_, Q>, write_metric: &WriteMetric, request_id: RequestId, ) -> Result<()> { @@ -432,7 +431,7 @@ fn convert_proto_value_to_datum( #[cfg(test)] mod test { - use ceresdbproto::storage::{Field, FieldGroup, Tag, Value}; + use ceresdbproto_deps::ceresdbproto::storage::{Field, FieldGroup, Tag, Value}; use common_types::{ column_schema::{self, ColumnSchema}, schema::Builder, diff --git a/server/src/handlers/admin.rs b/server/src/handlers/admin.rs index 1779e917c6..c78b4fd1a0 100644 --- 
a/server/src/handlers/admin.rs +++ b/server/src/handlers/admin.rs @@ -24,9 +24,9 @@ pub struct RejectResponse { read_reject_list: BTreeSet, } -pub async fn handle_reject( +pub async fn handle_reject( _ctx: RequestContext, - instance: InstanceRef, + instance: InstanceRef, request: RejectRequest, ) -> Result { match request.operation { diff --git a/server/src/handlers/sql.rs b/server/src/handlers/sql.rs index 9ac007a10f..a78b41f7d0 100644 --- a/server/src/handlers/sql.rs +++ b/server/src/handlers/sql.rs @@ -95,9 +95,9 @@ impl From for Request { } } -pub async fn handle_sql( +pub async fn handle_sql( ctx: RequestContext, - instance: InstanceRef, + instance: InstanceRef, request: Request, ) -> Result { let request_id = RequestId::next_id(); @@ -110,7 +110,7 @@ pub async fn handle_sql // TODO(yingwen): Privilege check, cannot access data of other tenant // TODO(yingwen): Maybe move MetaProvider to instance let provider = CatalogMetaProvider { - manager: &instance.catalog_manager, + manager: instance.catalog_manager.clone(), default_catalog: &ctx.catalog, default_schema: &ctx.tenant, function_registry: &*instance.function_registry, diff --git a/server/src/http.rs b/server/src/http.rs index 05da19d850..6f4a908d41 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -6,7 +6,6 @@ use std::{ collections::HashMap, convert::Infallible, error::Error as StdError, net::IpAddr, sync::Arc, }; -use catalog::manager::Manager as CatalogManager; use log::error; use profile::Profiler; use query_engine::executor::Executor as QueryExecutor; @@ -84,21 +83,21 @@ impl reject::Reject for Error {} /// Http service /// /// Note that the service does not owns the runtime -pub struct Service { +pub struct Service { runtimes: Arc, - instance: InstanceRef, + instance: InstanceRef, profiler: Arc, tx: Sender<()>, } -impl Service { +impl Service { // TODO(yingwen): Maybe log error or return error pub fn stop(self) { let _ = self.tx.send(()); } } -impl Service { +impl Service { fn routes(&self) -> impl Filter + Clone { self.home() .or(self.metrics()) @@ -147,7 +146,7 @@ impl Service { warp::path!("flush_memtable") .and(warp::post()) .and(self.with_instance()) - .and_then(|instance: InstanceRef| async move { + .and_then(|instance: InstanceRef| async move { let get_all_tables = || { let mut tables = Vec::new(); for catalog in instance @@ -265,7 +264,7 @@ impl Service { fn with_instance( &self, - ) -> impl Filter,), Error = Infallible> + Clone { + ) -> impl Filter,), Error = Infallible> + Clone { let instance = self.instance.clone(); warp::any().map(move || instance.clone()) } @@ -296,13 +295,13 @@ impl Service { } /// Service builder -pub struct Builder { +pub struct Builder { config: Config, runtimes: Option>, - instance: Option>, + instance: Option>, } -impl Builder { +impl Builder { pub fn new(config: Config) -> Self { Self { config, @@ -316,15 +315,15 @@ impl Builder { self } - pub fn instance(mut self, instance: InstanceRef) -> Self { + pub fn instance(mut self, instance: InstanceRef) -> Self { self.instance = Some(instance); self } } -impl Builder { +impl Builder { /// Build and start the service - pub fn build(self) -> Result> { + pub fn build(self) -> Result> { let runtimes = self.runtimes.context(MissingRuntimes)?; let instance = self.instance.context(MissingInstance)?; let (tx, rx) = oneshot::channel(); diff --git a/server/src/instance.rs b/server/src/instance.rs index 7c755d9fec..918bc02fca 100644 --- a/server/src/instance.rs +++ b/server/src/instance.rs @@ -4,6 +4,7 @@ use std::sync::Arc; +use 
catalog::manager::ManagerRef; use df_operator::registry::FunctionRegistryRef; use table_engine::engine::TableEngineRef; @@ -11,10 +12,9 @@ use crate::limiter::Limiter; /// A cluster instance. Usually there is only one instance per cluster /// -/// C: catalog::manager::Manager /// Q: query_engine::executor::Executor -pub struct Instance { - pub catalog_manager: C, +pub struct Instance { + pub catalog_manager: ManagerRef, pub query_executor: Q, pub table_engine: TableEngineRef, // User defined functions registry. @@ -23,4 +23,4 @@ pub struct Instance { } /// A reference counted instance pointer -pub type InstanceRef = Arc>; +pub type InstanceRef = Arc>; diff --git a/server/src/mysql/builder.rs b/server/src/mysql/builder.rs index 3610b0aef9..d74ad58187 100644 --- a/server/src/mysql/builder.rs +++ b/server/src/mysql/builder.rs @@ -2,7 +2,6 @@ use std::{net::SocketAddr, sync::Arc}; -use catalog::manager::Manager as CatalogManager; use query_engine::executor::Executor as QueryExecutor; use snafu::{OptionExt, ResultExt}; use table_engine::engine::EngineRuntimes; @@ -15,10 +14,10 @@ use crate::{ }, }; -pub struct Builder { +pub struct Builder { config: Config, runtimes: Option>, - instance: Option>, + instance: Option>, } #[derive(Debug)] @@ -27,7 +26,7 @@ pub struct Config { pub port: u16, } -impl Builder { +impl Builder { pub fn new(config: Config) -> Self { Self { config, @@ -41,14 +40,14 @@ impl Builder { self } - pub fn instance(mut self, instance: InstanceRef) -> Self { + pub fn instance(mut self, instance: InstanceRef) -> Self { self.instance = Some(instance); self } } -impl Builder { - pub fn build(self) -> Result> { +impl Builder { + pub fn build(self) -> Result> { let runtimes = self.runtimes.context(MissingRuntimes)?; let instance = self.instance.context(MissingInstance)?; diff --git a/server/src/mysql/service.rs b/server/src/mysql/service.rs index 2b263cafa6..503766eada 100644 --- a/server/src/mysql/service.rs +++ b/server/src/mysql/service.rs @@ -2,7 +2,6 @@ use std::{net::SocketAddr, sync::Arc}; -use catalog::manager::Manager as CatalogManager; use common_util::runtime::JoinHandle; use log::{error, info}; use opensrv_mysql::AsyncMysqlIntermediary; @@ -19,20 +18,20 @@ use crate::{ }, }; -pub struct MysqlService { - instance: InstanceRef, +pub struct MysqlService { + instance: InstanceRef, runtimes: Arc, socket_addr: SocketAddr, join_handler: Option>, tx: Option>, } -impl MysqlService { +impl MysqlService { pub fn new( - instance: Arc>, + instance: Arc>, runtimes: Arc, socket_addr: SocketAddr, - ) -> MysqlService { + ) -> MysqlService { Self { instance, runtimes, @@ -43,7 +42,7 @@ impl MysqlService { } } -impl MysqlService { +impl MysqlService { pub async fn start(&mut self) -> Result<()> { let (tx, rx) = oneshot::channel(); @@ -66,7 +65,7 @@ impl MysqlService } async fn loop_accept( - instance: InstanceRef, + instance: InstanceRef, runtimes: Arc, socket_addr: SocketAddr, mut rx: Receiver<()>, diff --git a/server/src/mysql/worker.rs b/server/src/mysql/worker.rs index 68684f899c..10f674488b 100644 --- a/server/src/mysql/worker.rs +++ b/server/src/mysql/worker.rs @@ -2,7 +2,6 @@ use std::{marker::PhantomData, sync::Arc}; -use catalog::manager::Manager as CatalogManager; use log::{error, info}; use opensrv_mysql::{AsyncMysqlShim, ErrorKind, QueryResultWriter, StatementMetaWriter}; use query_engine::executor::Executor as QueryExecutor; @@ -22,19 +21,18 @@ use crate::{ }, }; -pub struct MysqlWorker { +pub struct MysqlWorker { generic_hold: PhantomData, - instance: Arc>, + instance: Arc>, 
runtimes: Arc, } -impl MysqlWorker +impl MysqlWorker where W: std::io::Write + Send + Sync, - C: CatalogManager + 'static, Q: QueryExecutor + 'static, { - pub fn new(instance: Arc>, runtimes: Arc) -> Self { + pub fn new(instance: Arc>, runtimes: Arc) -> Self { Self { generic_hold: PhantomData::default(), instance, @@ -44,10 +42,9 @@ where } #[async_trait::async_trait] -impl AsyncMysqlShim for MysqlWorker +impl AsyncMysqlShim for MysqlWorker where W: std::io::Write + Send + Sync, - C: CatalogManager + 'static, Q: QueryExecutor + 'static, { type Error = crate::mysql::error::Error; @@ -101,10 +98,9 @@ where } } -impl MysqlWorker +impl MysqlWorker where W: std::io::Write + Send + Sync, - C: CatalogManager + 'static, Q: QueryExecutor + 'static, { async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result { diff --git a/server/src/router.rs b/server/src/router.rs index aa687c714b..4bd39a9abf 100644 --- a/server/src/router.rs +++ b/server/src/router.rs @@ -6,7 +6,7 @@ use std::{ sync::Arc, }; -use ceresdbproto::storage::{Endpoint, Route, RouteRequest}; +use ceresdbproto_deps::ceresdbproto::storage::{Endpoint, Route, RouteRequest}; use log::info; use meta_client::{MetaClient, ShardId}; use serde_derive::Deserialize; @@ -24,7 +24,7 @@ pub trait Router { fn route(&self, schema: &str, req: RouteRequest) -> Result>; } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct PrefixRule { /// Schema name of the prefix. pub schema: String, @@ -34,7 +34,7 @@ pub struct PrefixRule { pub shard: ShardId, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct HashRule { /// Schema name of the prefix. pub schema: String, @@ -42,7 +42,7 @@ pub struct HashRule { pub shards: Vec, } -#[derive(Debug, Default, Deserialize)] +#[derive(Clone, Debug, Default, Deserialize)] pub struct RuleList { pub prefix_rules: Vec, pub hash_rules: Vec, } diff --git a/server/src/server.rs b/server/src/server.rs index 1a89354467..8acc8d70cc 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -4,9 +4,11 @@ use std::sync::Arc; -use catalog::manager::Manager as CatalogManager; +use catalog::manager::ManagerRef; +use cluster::ClusterRef; use df_operator::registry::FunctionRegistryRef; use grpcio::Environment; +use log::warn; use query_engine::executor::Executor as QueryExecutor; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::{EngineRuntimes, TableEngineRef}; @@ -58,46 +60,84 @@ pub enum Error { #[snafu(display("Failed to start grpc service, err:{}", source))] StartGrpcService { source: crate::grpc::Error }, + + #[snafu(display("Failed to start cluster, err:{}", source))] + StartCluster { source: cluster::Error }, } define_result!(Error); // TODO(yingwen): Consider a config manager /// Server -pub struct Server { - http_service: Service, +pub struct Server { + http_service: Service, rpc_services: RpcServices, - mysql_service: mysql::MysqlService, + mysql_service: mysql::MysqlService, + instance: InstanceRef, + cluster: Option, } -impl Server { - pub fn stop(mut self) { +impl Server { + pub async fn stop(mut self) { self.rpc_services.shutdown(); self.http_service.stop(); self.mysql_service.shutdown(); + + if let Some(cluster) = &self.cluster { + cluster.stop().await.expect("Failed to stop cluster"); + } } pub async fn start(&mut self) -> Result<()> { + if let Some(cluster) = &self.cluster { + cluster.start().await.context(StartCluster)?; + } + + self.create_default_schema_if_not_exists().await; + + self.mysql_service .start() .await
.context(StartMysqlService)?; - self.rpc_services.start().await.context(StartGrpcService) + self.rpc_services.start().await.context(StartGrpcService)?; + + Ok(()) + } + + async fn create_default_schema_if_not_exists(&self) { + let catalog_mgr = &self.instance.catalog_manager; + let default_catalog = catalog_mgr + .catalog_by_name(catalog_mgr.default_catalog_name()) + .expect("Failed to retrieve default catalog") + .expect("Default catalog doesn't exist"); + + if default_catalog + .schema_by_name(catalog_mgr.default_schema_name()) + .expect("Failed to retrieve default schema") + .is_none() + { + warn!("Default schema doesn't exist, creating it"); + default_catalog + .create_schema(catalog_mgr.default_schema_name()) + .await + .expect("Failed to create default schema"); + } + } } #[must_use] -pub struct Builder { +pub struct Builder { config: Config, runtimes: Option>, - catalog_manager: Option, + catalog_manager: Option, query_executor: Option, table_engine: Option, function_registry: Option, limiter: Limiter, + cluster: Option, } -impl Builder { +impl Builder { pub fn new(config: Config) -> Self { Self { config, @@ -107,6 +147,7 @@ impl Builder { table_engine: None, function_registry: None, limiter: Limiter::default(), + cluster: None, } } @@ -115,7 +156,7 @@ impl Builder { self } - pub fn catalog_manager(mut self, val: C) -> Self { + pub fn catalog_manager(mut self, val: ManagerRef) -> Self { self.catalog_manager = Some(val); self } @@ -140,8 +181,13 @@ impl Builder { self } + pub fn cluster(mut self, cluster: ClusterRef) -> Self { + self.cluster = Some(cluster); + self + } + /// Build and run the server - pub fn build(self) -> Result> { + pub fn build(self) -> Result> { // Build runtimes let runtimes = self.runtimes.context(MissingRuntimes)?; @@ -191,7 +237,7 @@ impl Builder { .meta_client_config(meta_client_config) .env(env) .runtimes(runtimes) - .instance(instance) + .instance(instance.clone()) .route_rules(self.config.route_rules) .build() .context(BuildGrpcService)?; @@ -200,6 +246,8 @@ impl Builder { http_service, rpc_services, mysql_service, + instance, + cluster: self.cluster, }; Ok(server) } diff --git a/sql/Cargo.toml b/sql/Cargo.toml index 1e0119198c..79e63721f2 100644 --- a/sql/Cargo.toml +++ b/sql/Cargo.toml @@ -15,6 +15,7 @@ test = [] # In alphabetical order arrow_deps = { path = "../arrow_deps" } catalog = { path = "../catalog" } +ceresdbproto_deps = { path = "../ceresdbproto_deps" } common_types = { path = "../common_types"} common_util = { path = "../common_util" } df_operator = { path = "../df_operator" } @@ -23,7 +24,6 @@ paste = "1.0" snafu = { version ="0.6.10", features = ["backtraces"]} sqlparser = "0.19.0" table_engine = { path = "../table_engine" } -ceresdbproto = { git = "https://github.com/CeresDB/ceresdbproto.git", rev = "2c10152d021cd5a26b9c870cdede6a0317adca3d" } regex = "1" [dev-dependencies] diff --git a/sql/src/frontend.rs b/sql/src/frontend.rs index f45e6def4d..a99133b0a9 100644 --- a/sql/src/frontend.rs +++ b/sql/src/frontend.rs @@ -4,7 +4,7 @@ use std::{convert::TryInto, sync::Arc}; -use ceresdbproto::prometheus::PrometheusQueryRequest; +use ceresdbproto_deps::ceresdbproto::prometheus::PrometheusQueryRequest; use common_types::request_id::RequestId; use snafu::{ResultExt, Snafu}; use table_engine::table; diff --git a/sql/src/planner.rs b/sql/src/planner.rs index 6f78a89929..d24d778f34 100644 --- a/sql/src/planner.rs +++ b/sql/src/planner.rs @@ -868,7 +868,9 @@ mod tests { InsertPlan { table: MemoryTable { name: "test_table", - id: TableId(100, 0, 100), + id:
TableId( + 100, + ), schema: Schema { num_key_columns: 2, timestamp_index: 1, @@ -1044,7 +1046,9 @@ mod tests { DescribeTablePlan { table: MemoryTable { name: "test_table", - id: TableId(100, 0, 100), + id: TableId( + 100, + ), schema: Schema { num_key_columns: 2, timestamp_index: 1, @@ -1111,7 +1115,9 @@ mod tests { AlterTablePlan { table: MemoryTable { name: "test_table", - id: TableId(100, 0, 100), + id: TableId( + 100, + ), schema: Schema { num_key_columns: 2, timestamp_index: 1, @@ -1191,7 +1197,9 @@ mod tests { AlterTablePlan { table: MemoryTable { name: "test_table", - id: TableId(100, 0, 100), + id: TableId( + 100, + ), schema: Schema { num_key_columns: 2, timestamp_index: 1, @@ -1264,7 +1272,9 @@ mod tests { ShowCreatePlan { table: MemoryTable { name: "test_table", - id: TableId(100, 0, 100), + id: TableId( + 100, + ), schema: Schema { num_key_columns: 2, timestamp_index: 1, diff --git a/sql/src/promql/convert.rs b/sql/src/promql/convert.rs index b11b60e5e9..eb9bf45463 100644 --- a/sql/src/promql/convert.rs +++ b/sql/src/promql/convert.rs @@ -13,7 +13,7 @@ use arrow_deps::datafusion::{ }, sql::planner::ContextProvider, }; -use ceresdbproto::prometheus::{ +use ceresdbproto_deps::ceresdbproto::prometheus::{ Expr as ExprPb, Filter as FilterPb, FilterType as FilterPbType, Operand as OperandPb, Selector as PbSelector, SubExpr as PbSubExpr, SubExpr_OperatorType, }; diff --git a/sql/src/provider.rs b/sql/src/provider.rs index 2e89b46d6c..05c32d7074 100644 --- a/sql/src/provider.rs +++ b/sql/src/provider.rs @@ -14,7 +14,7 @@ use arrow_deps::{ }, datafusion_expr::TableSource, }; -use catalog::manager::Manager; +use catalog::manager::ManagerRef; use common_types::request_id::RequestId; use df_operator::{registry::FunctionRegistry, scalar::ScalarUdf, udaf::AggregateUdf}; use snafu::{ResultExt, Snafu}; @@ -78,14 +78,14 @@ pub trait MetaProvider { /// - Other meta data like default catalog and schema are needed // TODO(yingwen): Maybe support schema searching instead of using a fixed // default schema -pub struct CatalogMetaProvider<'a, M> { - pub manager: &'a M, +pub struct CatalogMetaProvider<'a> { + pub manager: ManagerRef, pub default_catalog: &'a str, pub default_schema: &'a str, pub function_registry: &'a (dyn FunctionRegistry + Send + Sync), } -impl<'a, M: Manager> MetaProvider for CatalogMetaProvider<'a, M> { +impl<'a> MetaProvider for CatalogMetaProvider<'a> { fn default_catalog_name(&self) -> &str { self.default_catalog } diff --git a/src/adapter.rs b/src/adapter.rs new file mode 100644 index 0000000000..b13cceb3cb --- /dev/null +++ b/src/adapter.rs @@ -0,0 +1,151 @@ +use async_trait::async_trait; +use catalog::{ + manager::ManagerRef as CatalogManagerRef, + schema::{CloseOptions, CloseTableRequest, NameRef, OpenOptions, OpenTableRequest, SchemaRef}, +}; +use catalog_impls::{SchemaIdAlloc, TableIdAlloc}; +use cluster::TableManipulator; +use log::debug; +use meta_client_v2::{types::DropTableRequest, MetaClientRef}; +use table_engine::{ + engine::TableEngineRef, + table::{SchemaId, TableId}, + ANALYTIC_ENGINE_TYPE, +}; + +pub struct SchemaIdAllocAdapter(pub MetaClientRef); + +#[async_trait] +impl SchemaIdAlloc for SchemaIdAllocAdapter { + type Error = meta_client_v2::Error; + + async fn alloc_schema_id<'a>(&self, schema_name: NameRef<'a>) -> Result { + self.0 + .alloc_schema_id(cluster::AllocSchemaIdRequest { + name: schema_name.to_string(), + }) + .await + .map(|resp| SchemaId::from(resp.id)) + } +} +pub struct TableIdAllocAdapter(pub MetaClientRef); + +#[async_trait] +impl 
TableIdAlloc for TableIdAllocAdapter { + type Error = meta_client_v2::Error; + + async fn alloc_table_id<'a>( + &self, + schema_name: NameRef<'a>, + table_name: NameRef<'a>, + ) -> Result { + self.0 + .alloc_table_id(cluster::AllocTableIdRequest { + schema_name: schema_name.to_string(), + name: table_name.to_string(), + }) + .await + .map(|v| TableId::from(v.id)) + } + + async fn invalidate_table_id<'a>( + &self, + schema_name: NameRef<'a>, + table_name: NameRef<'a>, + table_id: TableId, + ) -> Result<(), Self::Error> { + self.0 + .drop_table(DropTableRequest { + schema_name: schema_name.to_string(), + name: table_name.to_string(), + id: table_id.as_u64(), + }) + .await + } +} + +pub struct TableManipulatorImpl { + pub catalog_manager: CatalogManagerRef, + pub table_engine: TableEngineRef, +} + +impl TableManipulatorImpl { + fn catalog_schema_by_name( + &self, + schema_name: &str, + ) -> Result<(NameRef, Option), Box> { + let default_catalog_name = self.catalog_manager.default_catalog_name(); + let default_catalog = self + .catalog_manager + .catalog_by_name(default_catalog_name) + .map_err(Box::new)? + .unwrap(); + let schema = default_catalog + .schema_by_name(schema_name) + .map_err(Box::new)?; + Ok((default_catalog_name, schema)) + } +} + +#[async_trait] +impl TableManipulator for TableManipulatorImpl { + async fn open_table( + &self, + schema_name: &str, + table_name: &str, + table_id: u64, + ) -> Result<(), Box> { + let (default_catalog_name, schema) = self.catalog_schema_by_name(schema_name)?; + let schema = schema.unwrap(); + let table_id = TableId::from(table_id); + let req = OpenTableRequest { + catalog_name: default_catalog_name.to_string(), + schema_name: schema_name.to_string(), + schema_id: schema.id(), + table_name: table_name.to_string(), + table_id, + engine: ANALYTIC_ENGINE_TYPE.to_string(), + }; + let opts = OpenOptions { + table_engine: self.table_engine.clone(), + }; + let table = schema.open_table(req, opts).await.map_err(Box::new)?; + debug!( + "Finish opening table:{}-{}, catalog:{}, schema:{}, really_opened:{}", + table_name, + table_id, + default_catalog_name, + schema_name, + table.is_some() + ); + Ok(()) + } + + async fn close_table( + &self, + schema_name: &str, + table_name: &str, + table_id: u64, + ) -> Result<(), Box> { + let (default_catalog_name, schema) = self.catalog_schema_by_name(schema_name)?; + let schema = schema.unwrap(); + let table_id = TableId::from(table_id); + let req = CloseTableRequest { + catalog_name: default_catalog_name.to_string(), + schema_name: schema_name.to_string(), + schema_id: schema.id(), + table_name: table_name.to_string(), + table_id, + engine: ANALYTIC_ENGINE_TYPE.to_string(), + }; + let opts = CloseOptions { + table_engine: self.table_engine.clone(), + }; + schema.close_table(req, opts).await.map_err(Box::new)?; + debug!( + "Finish closing table:{}-{}, catalog:{}, schema:{}", + table_name, table_id, default_catalog_name, schema_name + ); + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index eb2b6ae18a..bb3642e6c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,5 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -//! 
ceresdb - +mod adapter; pub mod setup; mod signal_handler; diff --git a/src/setup.rs b/src/setup.rs index e33594a015..6701c07eed 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -8,24 +8,29 @@ use analytic_engine::{ self, setup::{EngineBuilder, ReplicatedEngineBuilder, RocksEngineBuilder}, }; -use catalog_impls::{table_based::TableBasedManager, CatalogManagerImpl}; +use catalog_impls::{table_based::TableBasedManager, volatile, CatalogManagerImpl}; +use cluster::cluster_impl::ClusterImpl; use common_util::runtime; use df_operator::registry::FunctionRegistryImpl; use log::info; use logger::RuntimeLevel; -use query_engine::executor::ExecutorImpl; +use meta_client_v2::meta_impl; +use query_engine::executor::{Executor, ExecutorImpl}; use server::{ - config::{Config, RuntimeConfig}, + config::{Config, DeployMode, RuntimeConfig}, server::Builder, table_engine::{MemoryTableEngine, TableEngineProxy}, }; -use table_engine::engine::EngineRuntimes; +use table_engine::engine::{EngineRuntimes, TableEngineRef}; use tracing_util::{ self, tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation}, }; -use crate::signal_handler; +use crate::{ + adapter::{SchemaIdAllocAdapter, TableIdAllocAdapter, TableManipulatorImpl}, + signal_handler, +}; /// Setup log with given `config`, returns the runtime log level switch. pub fn setup_log(config: &Config) -> RuntimeLevel { @@ -48,10 +53,7 @@ fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime { .thread_name(name) .enable_all() .build() - .unwrap_or_else(|e| { - //TODO(yingwen) replace panic with fatal - panic!("Failed to create runtime, err:{}", e); - }) + .expect("Failed to create runtime") } fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes { @@ -92,9 +94,7 @@ where let analytic = analytic_engine_builder .build(analytic_config, runtimes.clone()) .await - .unwrap_or_else(|e| { - panic!("Failed to setup analytic engine, err:{}", e); - }); + .expect("Failed to setup analytic engine"); // Create table engine proxy let engine_proxy = Arc::new(TableEngineProxy { @@ -102,43 +102,87 @@ where analytic: analytic.clone(), }); - // Create catalog manager, use analytic table as backend - let catalog_manager = CatalogManagerImpl::new( - TableBasedManager::new(analytic, engine_proxy.clone()) - .await - .unwrap_or_else(|e| { - panic!("Failed to create catalog manager, err:{}", e); - }), - ); - // Init function registry. 
let mut function_registry = FunctionRegistryImpl::new(); - function_registry.load_functions().unwrap_or_else(|e| { - panic!("Failed to create function registry, err:{}", e); - }); + function_registry + .load_functions() + .expect("Failed to create function registry"); let function_registry = Arc::new(function_registry); // Create query executor let query_executor = ExecutorImpl::new(); - // Build and start server - let mut server = Builder::new(config) + let builder = Builder::new(config.clone()) .runtimes(runtimes.clone()) - .catalog_manager(catalog_manager) .query_executor(query_executor) - .table_engine(engine_proxy) - .function_registry(function_registry) - .build() - .unwrap_or_else(|e| { - panic!("Failed to create server, err:{}", e); - }); - server.start().await.unwrap_or_else(|e| { - panic!("Failed to start server,, err:{}", e); - }); + .table_engine(engine_proxy.clone()) + .function_registry(function_registry); + + let builder = match config.deploy_mode { + DeployMode::Standalone => build_in_standalone_mode(builder, analytic, engine_proxy).await, + DeployMode::Cluster => { + build_in_cluster_mode(&config, builder, &runtimes, engine_proxy).await + } + }; + + // Build and start server + let mut server = builder.build().expect("Failed to create server"); + server.start().await.expect("Failed to start server"); // Wait for signal signal_handler::wait_for_signal(); // Stop server - server.stop(); + server.stop().await; +} + +async fn build_in_cluster_mode( + config: &Config, + builder: Builder, + runtimes: &EngineRuntimes, + engine_proxy: TableEngineRef, +) -> Builder { + let meta_client = meta_impl::build_meta_client( + config.cluster.meta_client.clone(), + config.cluster.node.clone(), + runtimes.meta_runtime.clone(), + ) + .expect("Failed to build meta client"); + + let catalog_manager = { + let schema_id_alloc = SchemaIdAllocAdapter(meta_client.clone()); + let table_id_alloc = TableIdAllocAdapter(meta_client.clone()); + Arc::new(volatile::ManagerImpl::new(schema_id_alloc, table_id_alloc).await) + }; + + let cluster = { + let table_manipulator = Arc::new(TableManipulatorImpl { + catalog_manager: catalog_manager.clone(), + table_engine: engine_proxy, + }); + let cluster_impl = ClusterImpl::new( + meta_client, + table_manipulator, + config.cluster.clone(), + runtimes.meta_runtime.clone(), + ) + .unwrap(); + Arc::new(cluster_impl) + }; + + builder.catalog_manager(catalog_manager).cluster(cluster) +} + +async fn build_in_standalone_mode( + builder: Builder, + table_engine: TableEngineRef, + engine_proxy: TableEngineRef, +) -> Builder { + let table_based_manager = TableBasedManager::new(table_engine, engine_proxy.clone()) + .await + .expect("Failed to create catalog manager"); + + // Create catalog manager, use analytic table as backend + let catalog_manager = Arc::new(CatalogManagerImpl::new(Arc::new(table_based_manager))); + builder.catalog_manager(catalog_manager) } diff --git a/src/signal_handler.rs b/src/signal_handler.rs index 39ad1733f4..b23deceae0 100644 --- a/src/signal_handler.rs +++ b/src/signal_handler.rs @@ -12,10 +12,8 @@ mod details { use signal_hook::{consts::TERM_SIGNALS, iterator::Signals}; pub fn wait_for_signal() { - let mut sigs = Signals::new(TERM_SIGNALS).unwrap_or_else(|e| { - // TODO(yingwen): Log here - panic!("Failed to register signal handlers, err:{}", e); - }); + let mut sigs = Signals::new(TERM_SIGNALS).expect("Failed to register signal handlers"); + for signal in &mut sigs { if TERM_SIGNALS.contains(&signal) { info!("Received signal {}, stopping server...",
signal); diff --git a/system_catalog/src/lib.rs b/system_catalog/src/lib.rs index b56b1a912a..98f4f97de3 100644 --- a/system_catalog/src/lib.rs +++ b/system_catalog/src/lib.rs @@ -2,6 +2,8 @@ //! System catalog implementations +#![feature(const_option)] + use std::{ collections::HashMap, fmt::Debug, @@ -30,21 +32,22 @@ pub mod sys_catalog_table; pub mod tables; /// Schema id of the sys catalog schema (`system/public`). -pub const SYSTEM_SCHEMA_ID: SchemaId = SchemaId::from_u16(1); +pub const SYSTEM_SCHEMA_ID: SchemaId = SchemaId::from_u32(1); /// Table name of the `sys_catalog`. pub const SYS_CATALOG_TABLE_NAME: &str = "sys_catalog"; /// Table sequence of the `sys_catalog` table, always set to 1 pub const SYS_CATALOG_TABLE_SEQ: TableSeq = TableSeq::from_u32(1); /// Table id of the `sys_catalog` table. -pub const SYS_CATALOG_TABLE_ID: TableId = TableId::new(SYSTEM_SCHEMA_ID, SYS_CATALOG_TABLE_SEQ); +pub const SYS_CATALOG_TABLE_ID: TableId = + TableId::with_seq(SYSTEM_SCHEMA_ID, SYS_CATALOG_TABLE_SEQ).unwrap(); /// Table name of the `tables` table. pub const TABLES_TABLE_NAME: &str = "tables"; /// Table sequence of the `tables` table. pub const TABLES_TABLE_SEQ: TableSeq = TableSeq::from_u32(2); /// Table id of the `tables` table. -pub const TABLES_TABLE_ID: TableId = TableId::new(SYSTEM_SCHEMA_ID, TABLES_TABLE_SEQ); +pub const TABLES_TABLE_ID: TableId = TableId::with_seq(SYSTEM_SCHEMA_ID, TABLES_TABLE_SEQ).unwrap(); // NOTE: The MAX_SYSTEM_TABLE_ID should be updated if any new system table is // added. diff --git a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs index 12644d109b..402bb7a33e 100644 --- a/system_catalog/src/sys_catalog_table.rs +++ b/system_catalog/src/sys_catalog_table.rs @@ -873,19 +873,15 @@ impl CreateSchemaRequest { } } -impl TryFrom for CreateSchemaRequest { - type Error = Error; - - fn try_from(entry: SchemaEntry) -> Result { - let schema_id = SchemaId::new(entry.schema_id).context(InvalidSchemaId { - id: entry.schema_id, - })?; +impl From for CreateSchemaRequest { + fn from(entry: SchemaEntry) -> Self { + let schema_id = SchemaId::from(entry.schema_id); - Ok(Self { + Self { catalog_name: entry.catalog_name, schema_name: entry.schema_name, schema_id, - }) + } } } @@ -1002,7 +998,7 @@ fn decode_one_request(key: &[u8], value: &[u8]) -> Result { } KeyType::CreateSchema => { let entry = SchemaEntry::parse_from_bytes(value).context(DecodeEntryPb)?; - DecodedRequest::CreateSchema(CreateSchemaRequest::try_from(entry)?) 
+ DecodedRequest::CreateSchema(CreateSchemaRequest::from(entry)) } KeyType::TableEntry => { let entry = TableEntry::parse_from_bytes(value).context(DecodeEntryPb)?; diff --git a/system_catalog/src/tables.rs b/system_catalog/src/tables.rs index dff209dd17..31749ae63d 100644 --- a/system_catalog/src/tables.rs +++ b/system_catalog/src/tables.rs @@ -5,7 +5,7 @@ use std::fmt::{Debug, Formatter}; use async_trait::async_trait; -use catalog::{manager::Manager, schema::SchemaRef, CatalogRef}; +use catalog::{manager::ManagerRef, schema::SchemaRef, CatalogRef}; use common_types::{ column_schema, datum::{Datum, DatumKind}, @@ -82,12 +82,12 @@ fn tables_schema() -> Schema { .unwrap() } -pub struct Tables { +pub struct Tables { schema: Schema, - catalog_manager: M, + catalog_manager: ManagerRef, } -impl Debug for Tables { +impl Debug for Tables { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("SysTables") .field("schema", &self.schema) @@ -95,8 +95,8 @@ impl Debug for Tables { } } -impl Tables { - pub fn new(catalog_manager: M) -> Self { +impl Tables { + pub fn new(catalog_manager: ManagerRef) -> Self { Self { schema: tables_schema(), catalog_manager, @@ -117,7 +117,7 @@ impl Tables { } #[async_trait] -impl SystemTable for Tables { +impl SystemTable for Tables { fn name(&self) -> &str { TABLES_TABLE_NAME } diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 1256de2af8..b7a2e2501e 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -137,28 +137,11 @@ pub const DEFAULT_READ_PARALLELISM: usize = 8; pub struct SchemaId(u32); impl SchemaId { - /// Bits of schema id. - const BITS: u32 = 24; - /// 24 bits mask (0xffffff) - const MASK: u32 = (1 << Self::BITS) - 1; - /// Max schema id. - pub const MAX: SchemaId = SchemaId(Self::MASK); /// Min schema id. pub const MIN: SchemaId = SchemaId(0); - /// Create a new schema id from u32, return None if `id` is invalid. - pub fn new(id: u32) -> Option { - // Only need to check max as min is 0. - if id <= SchemaId::MAX.0 { - Some(Self(id)) - } else { - None - } - } - - // It is safe to convert u16 into schema id. - pub const fn from_u16(id: u16) -> Self { - Self(id as u32) + pub const fn from_u32(id: u32) -> Self { + Self(id) } /// Convert the schema id into u32. @@ -176,7 +159,13 @@ impl PartialEq for SchemaId { impl From for SchemaId { fn from(id: u16) -> SchemaId { - SchemaId::from_u16(id) + Self(id as u32) + } +} + +impl From for SchemaId { + fn from(id: u32) -> SchemaId { + Self(id) } } @@ -205,8 +194,8 @@ impl TableSeq { } // It is safe to convert u32 into table seq. - pub const fn from_u32(id: u32) -> Self { - Self(id as u64) + pub const fn from_u32(seq: u32) -> Self { + Self(seq as u64) } /// Convert the table sequence into u64. @@ -218,7 +207,16 @@ impl TableSeq { impl From for TableSeq { fn from(id: u32) -> TableSeq { - TableSeq::from_u32(id) + TableSeq(id as u64) + } +} + +impl From for TableSeq { + /// Get the sequence part of the table id. + fn from(table_id: TableId) -> TableSeq { + let seq_part = table_id.0 & TableSeq::MASK; + + TableSeq(seq_part) } } @@ -226,7 +224,7 @@ impl From for TableSeq { /// /// Table id is constructed via schema id (24 bits) and a table sequence (40 /// bits). 
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] pub struct TableId(u64); impl TableId { @@ -234,33 +232,21 @@ impl TableId { pub const MIN: TableId = TableId(0); /// Create table id from raw u64 number. - pub const fn from_raw(id: u64) -> Self { + pub const fn new(id: u64) -> Self { Self(id) } /// Create a new table id from `schema_id` and `table_seq`. - pub const fn new(schema_id: SchemaId, table_seq: TableSeq) -> Self { + /// + /// Return `None` if `schema_id` is invalid. + pub const fn with_seq(schema_id: SchemaId, table_seq: TableSeq) -> Option { let schema_id_data = schema_id.0 as u64; let schema_id_part = schema_id_data << TableSeq::BITS; - let table_id_data = schema_id_part | table_seq.0; - - Self(table_id_data) - } - - /// Get the schema id part of the table id. - #[inline] - pub fn schema_id(&self) -> SchemaId { - let schema_id_part = self.0 >> TableSeq::BITS; - - SchemaId(schema_id_part as u32) - } - - /// Get the sequence part of the table id. - #[inline] - pub fn table_seq(&self) -> TableSeq { - let seq_part = self.0 & TableSeq::MASK; - - TableSeq(seq_part) + if (schema_id_part >> TableSeq::BITS) != schema_id_data { + None + } else { + Some(Self(schema_id_part | table_seq.0)) + } } /// Convert table id into u64. @@ -272,19 +258,7 @@ impl TableId { impl From for TableId { fn from(id: u64) -> TableId { - TableId(id) - } -} - -impl fmt::Debug for TableId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "TableId({}, {}, {})", - self.0, - self.schema_id().as_u32(), - self.table_seq().as_u64() - ) + TableId::new(id) } } @@ -490,9 +464,10 @@ impl SchemaIdGenerator { } pub fn alloc_schema_id(&self) -> Option { + // TODO: consider the case where schema id overflows. let last = self.last_schema_id.fetch_add(1, Ordering::Relaxed); - SchemaId::new(last + 1) + Some(SchemaId::from(last + 1)) } } @@ -520,6 +495,7 @@ impl TableSeqGenerator { } pub fn alloc_table_seq(&self) -> Option { + // TODO: consider the case where table sequence overflows. let last = self.last_table_seq.fetch_add(1, Ordering::Relaxed); TableSeq::new(last + 1) @@ -606,7 +582,6 @@ mod tests { #[test] fn test_schema_id() { assert_eq!(0, SchemaId::MIN.as_u32()); - assert_eq!(0xffffff, SchemaId::MAX.as_u32()); } #[test]
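For reference, a minimal sketch of how the reworked `TableId` packing above behaves, assuming only the APIs this patch introduces in `table_engine::table` (`SchemaId::from_u32`, `TableSeq::from_u32`, `TableId::with_seq`, `TableId::new`, `TableId::as_u64`); the concrete id values are illustrative:

    use table_engine::table::{SchemaId, TableId, TableSeq};

    fn main() {
        // The schema id occupies the high 24 bits, the table sequence the low 40 bits.
        let schema_id = SchemaId::from_u32(1);
        let table_seq = TableSeq::from_u32(2);

        // `with_seq` packs both parts and returns `None` when the schema id
        // would not survive the shift into the high bits.
        let table_id = TableId::with_seq(schema_id, table_seq).unwrap();
        assert_eq!(table_id.as_u64(), (1u64 << 40) | 2); // 1099511627778

        // A schema id needing more than 24 bits is rejected, not truncated.
        assert!(TableId::with_seq(SchemaId::from_u32(1 << 24), table_seq).is_none());

        // The renamed raw constructor (`from_raw` -> `new`) round-trips the packed value.
        assert_eq!(TableId::new(table_id.as_u64()), table_id);
    }

Returning `Option` from `with_seq` surfaces the 24-bit schema id limit at the call site, which is why the `SYS_CATALOG_TABLE_ID` and `TABLES_TABLE_ID` constants above pair it with `.unwrap()` under the `const_option` feature.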