diff --git a/catalog_impls/src/cluster.rs b/catalog_impls/src/cluster.rs new file mode 100644 index 0000000000..c7f3427847 --- /dev/null +++ b/catalog_impls/src/cluster.rs @@ -0,0 +1,202 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::sync::Arc; + +use async_trait::async_trait; +use catalog::{ + consts, + manager::Manager, + schema::{ + self, CloseOptions, CloseTableRequest, CreateOptions, CreateTableRequest, DropOptions, + DropTableRequest, NameRef, OpenOptions, OpenTableRequest, Schema, SchemaRef, + }, + Catalog, CatalogRef, +}; +use cluster::{cluster_impl::ClusterImpl, table_manager::TableManager}; +use table_engine::table::{SchemaId, TableRef}; + +/// Catalog/Schema manager in the cluster mode. +/// +/// This just redirects requests to [ClusterImpl] (which implements +/// [cluster::Cluster]) for orphan rule. +pub struct ManagerImpl(ClusterImpl); + +impl Manager for ManagerImpl { + fn default_catalog_name(&self) -> NameRef { + consts::DEFAULT_CATALOG + } + + fn default_schema_name(&self) -> NameRef { + consts::DEFAULT_SCHEMA + } + + fn catalog_by_name(&self, name: NameRef) -> catalog::manager::Result> { + let catalog = self.0.table_manager().get_catalog_name(name).map(|name| { + Arc::new(CatalogImpl { + name, + table_manager: self.0.table_manager().clone(), + }) as _ + }); + + Ok(catalog) + } + + fn all_catalogs(&self) -> catalog::manager::Result> { + let catalogs = self + .0 + .table_manager() + .get_all_catalog_names() + .into_iter() + .map(|name| { + Arc::new(CatalogImpl { + name, + table_manager: self.0.table_manager().clone(), + }) as _ + }) + .collect(); + + Ok(catalogs) + } +} + +pub struct CatalogImpl { + /// Catalog name + name: String, + table_manager: TableManager, +} + +#[async_trait] +impl Catalog for CatalogImpl { + /// Get the catalog name + fn name(&self) -> NameRef { + &self.name + } + + /// Find schema by name + fn schema_by_name(&self, name: NameRef) -> catalog::Result> { + let schema = self + .table_manager + .get_schema_id(&self.name, name) + .map(|id| { + Arc::new(SchemaImpl { + catalog_name: self.name.clone(), + schema_name: name.to_string(), + id: id.into(), + table_manager: self.table_manager.clone(), + }) as _ + }); + + Ok(schema) + } + + #[allow(unused_variables)] + async fn create_schema<'a>(&'a self, name: NameRef<'a>) -> catalog::Result<()> { + todo!() + } + + /// All schemas + fn all_schemas(&self) -> catalog::Result> { + let schemas = self + .table_manager + .get_all_schema_infos(&self.name) + .into_iter() + .map(|info| { + Arc::new(SchemaImpl { + catalog_name: self.name.clone(), + schema_name: info.name, + id: info.id.into(), + table_manager: self.table_manager.clone(), + }) as _ + }) + .collect(); + + Ok(schemas) + } +} + +pub struct SchemaImpl { + /// Catalog name + catalog_name: String, + /// Schema name + schema_name: String, + /// Schema id + id: SchemaId, + table_manager: TableManager, +} + +#[async_trait] +impl Schema for SchemaImpl { + /// Get schema name. + fn name(&self) -> NameRef { + &self.schema_name + } + + /// Get schema id + fn id(&self) -> SchemaId { + self.id + } + + /// Find table by name. + fn table_by_name(&self, name: NameRef) -> schema::Result> { + let table = self + .table_manager + .table_by_name(&self.catalog_name, &self.schema_name, name); + + Ok(table) + } + + /// Create table according to `request`. + #[allow(unused_variables)] + async fn create_table( + &self, + request: CreateTableRequest, + opts: CreateOptions, + ) -> schema::Result { + todo!() + } + + /// Drop table according to `request`. + /// + /// Returns true if the table is really dropped. + #[allow(unused_variables)] + async fn drop_table( + &self, + request: DropTableRequest, + opts: DropOptions, + ) -> schema::Result { + todo!() + } + + /// Open the table according to `request`. + /// + /// Return None if table does not exist. + #[allow(unused_variables)] + async fn open_table( + &self, + request: OpenTableRequest, + opts: OpenOptions, + ) -> schema::Result> { + todo!() + } + + /// Close the table according to `request`. + /// + /// Return false if table does not exist. + #[allow(unused_variables)] + async fn close_table( + &self, + request: CloseTableRequest, + opts: CloseOptions, + ) -> schema::Result<()> { + todo!() + } + + /// All tables + fn all_tables(&self) -> schema::Result> { + let tables = self + .table_manager + .get_all_table_ref(&self.catalog_name, &self.schema_name); + + Ok(tables) + } +} diff --git a/catalog_impls/src/lib.rs b/catalog_impls/src/lib.rs index da2548bf6b..893f9ab232 100644 --- a/catalog_impls/src/lib.rs +++ b/catalog_impls/src/lib.rs @@ -12,6 +12,7 @@ use system_catalog::{tables::Tables, SystemTableAdapter}; use crate::system_tables::{SystemTables, SystemTablesBuilder}; +pub mod cluster; mod system_tables; pub mod table_based; pub mod volatile; diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index 2ad3952f96..b49fe46059 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -21,10 +21,8 @@ use tokio::{ }; use crate::{ - config::ClusterConfig, - table_manager::{ShardTableInfo, TableManager}, - topology::ClusterTopology, - Cluster, ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, Result, StartMetaClient, + config::ClusterConfig, table_manager::TableManager, topology::ClusterTopology, Cluster, + ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, Result, StartMetaClient, TableManipulator, TableManipulatorRef, }; @@ -98,6 +96,10 @@ impl ClusterImpl { fn error_wait_lease(&self) -> Duration { self.config.meta_client.lease.0 / 2 } + + pub fn table_manager(&self) -> &TableManager { + &self.inner.table_manager + } } struct Inner { @@ -142,17 +144,9 @@ impl EventHandler for Inner { Ok(()) } - ActionCmd::CreateTableCmd(cmd) => self - .table_manager - .add_shard_table(ShardTableInfo::from(cmd)) - .map_err(|e| Box::new(e) as _), - ActionCmd::DropTableCmd(cmd) => { - warn!("Drop table, schema:{}, table:{}", cmd.schema_name, cmd.name); - - self.table_manager.drop_table(&cmd.schema_name, &cmd.name); - Ok(()) - } - ActionCmd::MetaNoneCmd(_) + ActionCmd::CreateTableCmd(_) => todo!(), + ActionCmd::DropTableCmd(_) + | ActionCmd::MetaNoneCmd(_) | ActionCmd::MetaCloseCmd(_) | ActionCmd::MetaSplitCmd(_) | ActionCmd::MetaChangeRoleCmd(_) => { diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index 4983b6c5e6..ebc7428a70 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -14,7 +14,7 @@ use snafu::{Backtrace, Snafu}; pub mod cluster_impl; pub mod config; -mod table_manager; +pub mod table_manager; // FIXME: Remove this lint ignore derive when topology about schema tables is // finished. #[allow(dead_code)] diff --git a/cluster/src/table_manager.rs b/cluster/src/table_manager.rs index 70b78a34a9..e598aaf2cf 100644 --- a/cluster/src/table_manager.rs +++ b/cluster/src/table_manager.rs @@ -1,24 +1,24 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. use std::{ - collections::{BTreeMap, HashMap}, - sync::RwLock, + collections::{BTreeMap, HashMap, HashSet}, + sync::{Arc, RwLock}, }; +use catalog::consts::{self, DEFAULT_CATALOG}; use common_types::{ - schema::{SchemaId, SchemaName}, - table::{TableId, TableName}, + schema::{CatalogName, SchemaId, SchemaName}, + table::TableId, }; use meta_client::types::{CreateTableCmd, ShardId, ShardInfo, ShardTables, TableInfo}; -use snafu::OptionExt; +use table_engine::table::TableRef; -use crate::{Result, ShardNotFound}; +use crate::Result; #[derive(Debug, Clone)] -#[allow(dead_code)] -struct SchemaInfo { - name: SchemaName, - id: SchemaId, +pub struct SchemaInfo { + pub name: SchemaName, + pub id: SchemaId, } #[derive(Debug, Clone)] @@ -47,9 +47,9 @@ impl From<&CreateTableCmd> for ShardTableInfo { /// relationships: /// * one shard -> multiple tables /// * one schema -> multiple tables -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct TableManager { - inner: RwLock, + inner: Arc>, } impl TableManager { @@ -57,41 +57,126 @@ impl TableManager { self.inner.read().unwrap().get_shards_infos() } - pub fn add_shard_table(&self, shard_table: ShardTableInfo) -> Result<()> { - self.inner.write().unwrap().add_shard_table(shard_table) + pub fn add_shard_table(&self, table_info: TableInfo, table: TableRef) -> Result<()> { + self.inner.write().unwrap().add_table(table_info, table); + + Ok(()) + } + + pub fn update_table_info(&self, shard_table: &HashMap) { + self.inner.write().unwrap().update_table_info(shard_table) + } + + pub fn get_catalog_name(&self, catalog: &str) -> Option { + if self + .inner + .read() + .unwrap() + .catalog_infos + .contains_key(catalog) + { + Some(catalog.to_string()) + } else { + None + } } - pub fn drop_table(&self, schema_name: &str, table_name: &str) { + pub fn get_all_catalog_names(&self) -> Vec { self.inner - .write() + .read() .unwrap() - .drop_table(schema_name, table_name) + .catalog_infos + .keys() + .cloned() + .collect() } - pub fn update_table_info(&self, shard_table: &HashMap) { - self.inner.write().unwrap().update_table_info(shard_table) + pub fn get_schema_id(&self, catalog: &str, schema: &str) -> Option { + self.inner + .read() + .unwrap() + .catalog_infos + .get(catalog)? + .get(schema) + .map(|info| info.id) } - #[allow(dead_code)] - pub fn get_schema_id(&self, schema_name: &str) -> Option { - self.inner.read().unwrap().get_schema_id(schema_name) + pub fn get_all_schema_infos(&self, catalog: &str) -> Vec { + if let Some(schemas) = self.inner.read().unwrap().catalog_infos.get(catalog) { + schemas.values().cloned().collect() + } else { + vec![] + } } - #[allow(dead_code)] - pub fn get_table_id(&self, schema_name: &str, table_name: &str) -> Option { + // todo: should accept schema id as param instead of schema name? + pub fn get_all_table_ref(&self, catalog: &str, schema: &str) -> Vec { + let schema_id = if let Some(id) = self.inner.read().unwrap().get_schema_id(schema) { + id + } else { + return vec![]; + }; + self.inner .read() .unwrap() - .get_table_id(schema_name, table_name) + .tables_by_token + .iter() + .filter_map(|(token, table)| { + if token.catalog == catalog && token.schema == schema_id { + Some(table.clone()) + } else { + None + } + }) + .collect() + } + + pub fn table_by_name(&self, catalog: &str, schema: &str, table_name: &str) -> Option { + let schema_id = self.inner.read().unwrap().get_schema_id(schema)?; + + self.inner + .read() + .unwrap() + .tables_by_token + .iter() + .find(|(token, table)| { + token.catalog == catalog && token.schema == schema_id && table.name() == table_name + }) + .map(|(_, table)| table.clone()) } } #[derive(Debug, Default)] struct Inner { + // shard infos shard_infos: HashMap, - schema_infos: HashMap, - // TODO: maybe another mapping (shard -> table) is necessary. - tables: BTreeMap>, + + // catalog/schema infos + catalog_infos: HashMap>, + + // table handles + tables_by_token: BTreeMap, + #[allow(dead_code)] + tokens_by_shard: HashMap>, +} + +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Debug, Default)] +struct TableToken { + // todo: change this to CatalogId + catalog: String, + schema: SchemaId, + table: TableId, +} + +impl TableToken { + fn from_table_info(info: TableInfo) -> Self { + Self { + catalog: DEFAULT_CATALOG.to_string(), + schema: info.schema_id, + table: info.id, + } + } } impl Inner { @@ -108,26 +193,21 @@ impl Inner { }; self.shard_infos.insert(*shard_id, shard_info.clone()); for table in &shard_tables.tables { - self.schema_infos + self.catalog_infos + .get_mut(consts::DEFAULT_CATALOG) + .unwrap() .entry(table.schema_name.clone()) .or_insert(SchemaInfo { name: table.schema_name.clone(), id: table.schema_id, }); - self.tables - .entry(table.schema_name.clone()) - .or_insert_with(BTreeMap::new) - .insert( - table.name.clone(), - ShardTableInfo { - shard_id: shard_info.shard_id, - table_info: table.clone(), - }, - ); + + todo!() } } } + #[allow(dead_code)] fn find_shard_info(&self, shard_id: ShardId) -> Option { self.shard_infos .values() @@ -135,33 +215,22 @@ impl Inner { .cloned() } - fn add_shard_table(&mut self, shard_table: ShardTableInfo) -> Result<()> { - self.find_shard_info(shard_table.shard_id) - .context(ShardNotFound { - shard_id: shard_table.shard_id, - })?; - - self.tables - .entry(shard_table.table_info.schema_name.clone()) - .or_insert_with(BTreeMap::new) - .insert(shard_table.table_info.name.clone(), shard_table); - - Ok(()) + fn add_table(&mut self, table_info: TableInfo, table: TableRef) { + self.tables_by_token + .insert(TableToken::from_table_info(table_info), table); } - fn drop_table(&mut self, schema_name: &str, table_name: &str) { - self.tables - .get_mut(schema_name) - .map(|v| v.remove(table_name)); + #[allow(dead_code)] + fn drop_table(&mut self, table_info: TableInfo) { + self.tables_by_token + .remove(&TableToken::from_table_info(table_info)); } fn get_schema_id(&self, schema_name: &str) -> Option { - self.schema_infos.get(schema_name).map(|v| v.id) - } - - fn get_table_id(&self, schema_name: &str, table_name: &str) -> Option { - self.tables + self.catalog_infos + .get(consts::DEFAULT_CATALOG) + .unwrap() .get(schema_name) - .and_then(|schema| schema.get(table_name).map(|v| v.table_info.id)) + .map(|v| v.id) } } diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index abb3cc74c0..30e4ee24db 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -152,6 +152,7 @@ pub enum Error { }, } +pub type CatalogName = String; pub type SchemaId = u32; pub type SchemaName = String; // TODO: make these constants configurable