From 61b7b74859a65e1e2221119aec951de290defc21 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 7 Sep 2022 11:40:33 +0800 Subject: [PATCH] refactor: remove `SchemaIdAlloc` and `TableIdAlloc` (#238) * tiny tidies Signed-off-by: Ruihang Xia * remove to IdAlloc_s Signed-off-by: Ruihang Xia * run cargo fmt Signed-off-by: Ruihang Xia Signed-off-by: Ruihang Xia --- Cargo.lock | 2 + catalog/src/lib.rs | 1 - catalog_impls/Cargo.toml | 2 + catalog_impls/src/lib.rs | 28 --- catalog_impls/src/volatile.rs | 94 ++++--- cluster/src/cluster_impl.rs | 14 +- docs/guides/src/dev/crate-deps.dot | 2 + docs/guides/src/dev/crate-deps.svg | 378 +++++++++++++++-------------- meta_client/src/meta_impl.rs | 4 +- src/adapter.rs | 59 +---- src/setup.rs | 11 +- 11 files changed, 255 insertions(+), 340 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9067b91162..0fd7495092 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -734,9 +734,11 @@ dependencies = [ "analytic_engine", "async-trait", "catalog", + "cluster", "common_types 0.1.0", "common_util", "log", + "meta_client", "server", "snafu 0.6.10", "system_catalog", diff --git a/catalog/src/lib.rs b/catalog/src/lib.rs index 90799b9205..caa765bb3a 100644 --- a/catalog/src/lib.rs +++ b/catalog/src/lib.rs @@ -38,7 +38,6 @@ pub enum Error { define_result!(Error); /// Catalog manage schemas -// TODO(yingwen): Maybe use async trait? // TODO(yingwen): Provide a context // TODO(yingwen): Catalog id? #[async_trait] diff --git a/catalog_impls/Cargo.toml b/catalog_impls/Cargo.toml index 0a19927db1..20fc457fce 100644 --- a/catalog_impls/Cargo.toml +++ b/catalog_impls/Cargo.toml @@ -12,9 +12,11 @@ workspace = true # Workspace dependencies, in alphabetical order async-trait = "0.1.53" catalog = { path = "../catalog" } +cluster = { path = "../cluster" } common_types = { path = "../common_types" } common_util = { path = "../common_util" } log = "0.4" +meta_client = { path = "../meta_client" } snafu = { version ="0.6.10", features = ["backtraces"]} system_catalog = { path = "../system_catalog" } table_engine = { path = "../table_engine" } diff --git a/catalog_impls/src/lib.rs b/catalog_impls/src/lib.rs index bb4adc0af7..da2548bf6b 100644 --- a/catalog_impls/src/lib.rs +++ b/catalog_impls/src/lib.rs @@ -2,7 +2,6 @@ use std::sync::Arc; -use async_trait::async_trait; use catalog::{ consts::SYSTEM_CATALOG, manager::{Manager, ManagerRef}, @@ -10,7 +9,6 @@ use catalog::{ CatalogRef, }; use system_catalog::{tables::Tables, SystemTableAdapter}; -use table_engine::table::{SchemaId, TableId}; use crate::system_tables::{SystemTables, SystemTablesBuilder}; @@ -57,29 +55,3 @@ impl Manager for CatalogManagerImpl { self.user_catalog_manager.all_catalogs() } } - -#[async_trait] -pub trait SchemaIdAlloc: Send + Sync { - type Error: std::error::Error + Send + Sync + 'static; - async fn alloc_schema_id<'a>( - &self, - schema_name: NameRef<'a>, - ) -> std::result::Result; -} - -#[async_trait] -pub trait TableIdAlloc: Send + Sync { - type Error: std::error::Error + Send + Sync + 'static; - async fn alloc_table_id<'a>( - &self, - schema_name: NameRef<'a>, - table_name: NameRef<'a>, - ) -> std::result::Result; - - async fn invalidate_table_id<'a>( - &self, - schema_name: NameRef<'a>, - table_name: NameRef<'a>, - table_id: TableId, - ) -> std::result::Result<(), Self::Error>; -} diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs index b458a252bb..68789087d1 100644 --- a/catalog_impls/src/volatile.rs +++ b/catalog_impls/src/volatile.rs @@ -22,29 +22,22 @@ use catalog::{ }; use common_types::schema::SchemaName; use log::{debug, info}; +use meta_client::MetaClientRef; use snafu::{ensure, ResultExt}; -use table_engine::table::{SchemaId, TableRef}; +use table_engine::table::{SchemaId, TableId, TableRef}; use tokio::sync::Mutex; -use crate::{SchemaIdAlloc, TableIdAlloc}; - /// ManagerImpl manages multiple volatile catalogs. -pub struct ManagerImpl { - catalogs: HashMap>>, - schema_id_alloc: Arc, - table_id_alloc: Arc, +pub struct ManagerImpl { + catalogs: HashMap>, + meta_client: MetaClientRef, } -impl ManagerImpl -where - S: SchemaIdAlloc + 'static, - T: TableIdAlloc + 'static, -{ - pub async fn new(schema_id_alloc: S, table_id_alloc: T) -> Self { +impl ManagerImpl { + pub async fn new(meta_client: MetaClientRef) -> Self { let mut manager = ManagerImpl { catalogs: HashMap::new(), - table_id_alloc: Arc::new(table_id_alloc), - schema_id_alloc: Arc::new(schema_id_alloc), + meta_client, }; manager.maybe_create_default_catalog().await; @@ -53,11 +46,7 @@ where } } -impl Manager for ManagerImpl -where - S: SchemaIdAlloc + 'static, - T: TableIdAlloc + 'static, -{ +impl Manager for ManagerImpl { fn default_catalog_name(&self) -> NameRef { consts::DEFAULT_CATALOG } @@ -80,11 +69,7 @@ where } } -impl ManagerImpl -where - S: SchemaIdAlloc, - T: TableIdAlloc + 'static, -{ +impl ManagerImpl { async fn maybe_create_default_catalog(&mut self) { // Try to get default catalog, create it if not exists. if self.catalogs.get(consts::DEFAULT_CATALOG).is_none() { @@ -94,12 +79,11 @@ where }; } - async fn create_catalog(&mut self, catalog_name: String) -> Arc> { + async fn create_catalog(&mut self, catalog_name: String) -> Arc { let catalog = Arc::new(CatalogImpl { name: catalog_name.clone(), schemas: RwLock::new(HashMap::new()), - schema_id_alloc: self.schema_id_alloc.clone(), - table_id_alloc: self.table_id_alloc.clone(), + meta_client: self.meta_client.clone(), }); self.catalogs.insert(catalog_name, catalog.clone()); @@ -113,21 +97,16 @@ where /// The schema and table id are allocated (and maybe stored) by other components /// so there is no recovering work for all the schemas and tables during /// initialization. -struct CatalogImpl { +struct CatalogImpl { /// Catalog name name: String, /// All the schemas belonging to the catalog. schemas: RwLock>, - schema_id_alloc: Arc, - table_id_alloc: Arc, + meta_client: MetaClientRef, } #[async_trait] -impl Catalog for CatalogImpl -where - S: SchemaIdAlloc, - T: TableIdAlloc + 'static, -{ +impl Catalog for CatalogImpl { fn name(&self) -> NameRef { &self.name } @@ -147,14 +126,17 @@ where } let schema_id = self - .schema_id_alloc - .alloc_schema_id(name) + .meta_client + .alloc_schema_id(cluster::AllocSchemaIdRequest { + name: name.to_string(), + }) .await .map_err(|e| Box::new(e) as _) .context(catalog::CreateSchema { catalog: &self.name, schema: name, - })?; + }) + .map(|resp| SchemaId::from(resp.id))?; let mut schemas = self.schemas.write().unwrap(); if schemas.get(name).is_some() { @@ -165,7 +147,7 @@ where self.name.to_string(), name.to_string(), schema_id, - self.table_id_alloc.clone(), + self.meta_client.clone(), )); schemas.insert(name.to_string(), schema); @@ -191,7 +173,7 @@ where /// /// The tables belonging to the schema won't be recovered during initialization /// and will be opened afterwards. -struct SchemaImpl { +struct SchemaImpl { /// Catalog name catalog_name: String, /// Schema name @@ -201,15 +183,15 @@ struct SchemaImpl { /// Guard for creating/dropping table create_table_mutex: Mutex<()>, schema_id: SchemaId, - table_id_alloc: Arc, + meta_client: MetaClientRef, } -impl SchemaImpl { +impl SchemaImpl { fn new( catalog_name: String, schema_name: String, schema_id: SchemaId, - table_id_alloc: Arc, + meta_client: MetaClientRef, ) -> Self { Self { catalog_name, @@ -217,7 +199,7 @@ impl SchemaImpl { tables: RwLock::new(HashMap::new()), create_table_mutex: Mutex::new(()), schema_id, - table_id_alloc, + meta_client, } } @@ -265,7 +247,7 @@ impl SchemaImpl { } #[async_trait] -impl Schema for SchemaImpl { +impl Schema for SchemaImpl { fn name(&self) -> NameRef { &self.schema_name } @@ -306,14 +288,18 @@ impl Schema for SchemaImpl { } let table_id = self - .table_id_alloc - .alloc_table_id(&request.schema_name, &request.table_name) + .meta_client + .alloc_table_id(cluster::AllocTableIdRequest { + schema_name: request.schema_name.to_string(), + name: request.table_name.to_string(), + }) .await .map_err(|e| Box::new(e) as _) .context(schema::AllocateTableId { schema: &self.schema_name, table: &request.table_name, - })?; + }) + .map(|v| TableId::from(v.id))?; let request = request.into_engine_create_request(table_id); @@ -367,9 +353,13 @@ impl Schema for SchemaImpl { .await .context(DropTable)?; - // invalidate the table id after table is dropped in engine. - self.table_id_alloc - .invalidate_table_id(&schema_name, &table_name, table.id()) + // Request CeresMeta to drop this table. + self.meta_client + .drop_table(cluster::DropTableRequest { + schema_name: schema_name.to_string(), + name: table_name.to_string(), + id: table.id().as_u64(), + }) .await .map_err(|e| Box::new(e) as _) .context(schema::InvalidateTableId { diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index 6cbfcfa0a9..2ad3952f96 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -12,7 +12,7 @@ use meta_client::{ types::{ ActionCmd, GetNodesRequest, GetShardTablesRequest, RouteTablesRequest, RouteTablesResponse, }, - EventHandler, MetaClient, + EventHandler, MetaClientRef, }; use snafu::{OptionExt, ResultExt}; use tokio::{ @@ -25,7 +25,7 @@ use crate::{ table_manager::{ShardTableInfo, TableManager}, topology::ClusterTopology, Cluster, ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, Result, StartMetaClient, - TableManipulator, + TableManipulator, TableManipulatorRef, }; /// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`]. @@ -43,8 +43,8 @@ pub struct ClusterImpl { impl ClusterImpl { pub fn new( - meta_client: Arc, - table_manipulator: Arc, + meta_client: MetaClientRef, + table_manipulator: TableManipulatorRef, config: ClusterConfig, runtime: Arc, ) -> Result { @@ -102,8 +102,8 @@ impl ClusterImpl { struct Inner { table_manager: TableManager, - meta_client: Arc, - table_manipulator: Arc, + meta_client: MetaClientRef, + table_manipulator: TableManipulatorRef, #[allow(dead_code)] topology: RwLock, } @@ -166,7 +166,7 @@ impl EventHandler for Inner { impl Inner { fn new( - meta_client: Arc, + meta_client: MetaClientRef, table_manipulator: Arc, ) -> Result { Ok(Self { diff --git a/docs/guides/src/dev/crate-deps.dot b/docs/guides/src/dev/crate-deps.dot index 2b28e8460a..397816c26b 100644 --- a/docs/guides/src/dev/crate-deps.dot +++ b/docs/guides/src/dev/crate-deps.dot @@ -34,6 +34,8 @@ digraph G { catalog_impls -> catalog catalog_impls -> system_catalog catalog_impls -> table_engine + catalog_impls -> cluster + catalog_impls -> meta_client cluster -> analytic_engine cluster -> catalog diff --git a/docs/guides/src/dev/crate-deps.svg b/docs/guides/src/dev/crate-deps.svg index fdb029b1b1..6e875080d0 100644 --- a/docs/guides/src/dev/crate-deps.svg +++ b/docs/guides/src/dev/crate-deps.svg @@ -1,403 +1,415 @@ - - + G - + arrow_deps - -arrow_deps + +arrow_deps analytic_engine - -analytic_engine + +analytic_engine analytic_engine->arrow_deps - - + + proto - -proto + +proto analytic_engine->proto - - + + table_engine - -table_engine + +table_engine analytic_engine->table_engine - - + + wal - -wal + +wal analytic_engine->wal - - + + - + table_engine->arrow_deps - - + + - + table_engine->proto - - + + catalog - -catalog + +catalog catalog->table_engine - - + + catalog_impls - -catalog_impls + +catalog_impls catalog_impls->table_engine - - + + catalog_impls->catalog - - + + system_catalog - -system_catalog + +system_catalog catalog_impls->system_catalog - - + + + + + +cluster + +cluster + + + +catalog_impls->cluster + + + + + +meta_client + +meta_client + + + +catalog_impls->meta_client + + - + system_catalog->arrow_deps - - + + - + system_catalog->proto - - + + - + system_catalog->table_engine - - + + - + system_catalog->catalog - - - - - -cluster - -cluster + + - + cluster->analytic_engine - - + + - + cluster->catalog - - - - - -meta_client - -meta_client + + - + cluster->meta_client - - + + interpreters - -interpreters + +interpreters - + interpreters->arrow_deps - - + + - + interpreters->table_engine - - + + - + interpreters->catalog - - + + sql - -sql + +sql - + interpreters->sql - - + + df_operator - -df_operator + +df_operator - + interpreters->df_operator - - + + query_engine - -query_engine + +query_engine - + interpreters->query_engine - - + + - + sql->arrow_deps - - + + - + sql->table_engine - - + + - + sql->catalog - - + + - + sql->df_operator - - + + - + df_operator->arrow_deps - - + + - + query_engine->arrow_deps - - + + - + query_engine->table_engine - - + + - + query_engine->sql - - + + - + query_engine->df_operator - - + + server - -server + +server - + server->arrow_deps - - + + - + server->analytic_engine - - + + - + server->table_engine - - + + - + server->catalog - - + + - + server->system_catalog - - + + - + server->meta_client - - + + - + server->interpreters - - + + - + server->sql - - + + - + server->df_operator - - + + - + server->query_engine - - + + ceresdb - -ceresdb + +ceresdb - + ceresdb->analytic_engine - - + + - + ceresdb->table_engine - - + + - + ceresdb->catalog - - + + - + ceresdb->catalog_impls - - + + - + ceresdb->df_operator - - + + - + ceresdb->query_engine - - + + - + ceresdb->server - - + + diff --git a/meta_client/src/meta_impl.rs b/meta_client/src/meta_impl.rs index efe46c28c1..8f9b249584 100644 --- a/meta_client/src/meta_impl.rs +++ b/meta_client/src/meta_impl.rs @@ -37,7 +37,7 @@ use crate::{ }, EventHandler, EventHandlerRef, FailAllocSchemaId, FailAllocTableId, FailDropTable, FailGetGrpcClient, FailGetTables, FailHandleEvent, FailRouteTables, FailSendHeartbeat, - FetchActionCmd, InitHeartBeatStream, MetaClient, MetaRpc, Result, + FetchActionCmd, InitHeartBeatStream, MetaClient, MetaClientRef, MetaRpc, Result, }; #[derive(Debug, Deserialize, Clone)] @@ -564,7 +564,7 @@ pub fn build_meta_client( config: MetaClientConfig, node_meta_info: NodeMetaInfo, runtime: Arc, -) -> Result> { +) -> Result { let meta_client = MetaClientImpl::new(config, node_meta_info, runtime)?; Ok(Arc::new(meta_client)) } diff --git a/src/adapter.rs b/src/adapter.rs index a7b73a1399..28ae6aa9c4 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -3,66 +3,9 @@ use catalog::{ manager::ManagerRef as CatalogManagerRef, schema::{CloseOptions, CloseTableRequest, NameRef, OpenOptions, OpenTableRequest, SchemaRef}, }; -use catalog_impls::{SchemaIdAlloc, TableIdAlloc}; use cluster::TableManipulator; use log::debug; -use meta_client::{types::DropTableRequest, MetaClientRef}; -use table_engine::{ - engine::TableEngineRef, - table::{SchemaId, TableId}, - ANALYTIC_ENGINE_TYPE, -}; - -pub struct SchemaIdAllocAdapter(pub MetaClientRef); - -#[async_trait] -impl SchemaIdAlloc for SchemaIdAllocAdapter { - type Error = meta_client::Error; - - async fn alloc_schema_id<'a>(&self, schema_name: NameRef<'a>) -> Result { - self.0 - .alloc_schema_id(cluster::AllocSchemaIdRequest { - name: schema_name.to_string(), - }) - .await - .map(|resp| SchemaId::from(resp.id)) - } -} -pub struct TableIdAllocAdapter(pub MetaClientRef); - -#[async_trait] -impl TableIdAlloc for TableIdAllocAdapter { - type Error = meta_client::Error; - - async fn alloc_table_id<'a>( - &self, - schema_name: NameRef<'a>, - table_name: NameRef<'a>, - ) -> Result { - self.0 - .alloc_table_id(cluster::AllocTableIdRequest { - schema_name: schema_name.to_string(), - name: table_name.to_string(), - }) - .await - .map(|v| TableId::from(v.id)) - } - - async fn invalidate_table_id<'a>( - &self, - schema_name: NameRef<'a>, - table_name: NameRef<'a>, - table_id: TableId, - ) -> Result<(), Self::Error> { - self.0 - .drop_table(DropTableRequest { - schema_name: schema_name.to_string(), - name: table_name.to_string(), - id: table_id.as_u64(), - }) - .await - } -} +use table_engine::{engine::TableEngineRef, table::TableId, ANALYTIC_ENGINE_TYPE}; pub struct TableManipulatorImpl { pub catalog_manager: CatalogManagerRef, diff --git a/src/setup.rs b/src/setup.rs index b708e80e62..06a9d2a526 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -34,10 +34,7 @@ use tracing_util::{ tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation}, }; -use crate::{ - adapter::{SchemaIdAllocAdapter, TableIdAllocAdapter, TableManipulatorImpl}, - signal_handler, -}; +use crate::{adapter::TableManipulatorImpl, signal_handler}; /// Setup log with given `config`, returns the runtime log level switch. pub fn setup_log(config: &Config) -> RuntimeLevel { @@ -158,11 +155,7 @@ async fn build_in_cluster_mode( ) .expect("fail to build meta client"); - let catalog_manager = { - let schema_id_alloc = SchemaIdAllocAdapter(meta_client.clone()); - let table_id_alloc = TableIdAllocAdapter(meta_client.clone()); - Arc::new(volatile::ManagerImpl::new(schema_id_alloc, table_id_alloc).await) - }; + let catalog_manager = { Arc::new(volatile::ManagerImpl::new(meta_client.clone()).await) }; let cluster = { let table_manipulator = Arc::new(TableManipulatorImpl {